Skip to content

Commit

Permalink
Fixes #17507: inventory watcher should ignore uuid.hive file
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceMacBuche authored and fanf committed May 26, 2020
1 parent 6ecab83 commit c3b5961
Showing 1 changed file with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ final class Watchers(incoming: FileMonitor, updates: FileMonitor) {
}
object Watchers {

val inventoryExtentions = "gz" :: "xml" :: "ocs" :: "sign" :: Nil
def hasValidInventoryExtension(file : File) = {
val ext = file.extension(includeDot = false, includeAll = false).getOrElse("")
inventoryExtentions.contains(ext)
}
def apply(incoming: File, updates: File, checkProcess: File => Unit): Watchers = {
def newWatcher(directory: File): FileMonitor = {
new FileMonitor(directory, recursive = false) {
val maxDepth = 0 // see parent class for logic, needed to minimize change in overriden `process` method
val maxDepth = 0 // see parent class for logic, needed to minimize change in overridden `process` method

private var stopRequired = false

Expand All @@ -98,10 +103,10 @@ object Watchers {
override def onCreate(file: File, count: Int): Unit = onModify(file, count)
override def onModify(file: File, count: Int): Unit = {
// we want to avoid processing some file
val ext = file.extension(includeDot = false, includeAll = false).getOrElse("")
if( ext == "gz" || ext == "xml" || ext == "ocs" || ext == "sign") {
if( hasValidInventoryExtension(file) ) {
checkProcess(file)
} else {
val ext = file.extension(includeDot = false, includeAll = false).getOrElse("")
InventoryProcessingLogger.logEffect.debug(s"watcher ignored file ${file.name} (unrecognized extension: '${ext}')")
}
}
Expand Down Expand Up @@ -169,14 +174,14 @@ class SchedulerMissedNotify(
){

/*
* List all files that need to be rehad - simply all files older than period
* List all files that need to be add again - simply all files older than period
*/
def listFiles(d: Duration): UIO[List[File]] = {
IOResult.effect{
val ageLimit = DateTime.now().minusMillis(d.toMillis.toInt)
val filter = (f:File) => (
(f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))
)
val filter = (f:File) => {
Watchers.hasValidInventoryExtension(f) && (f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))
}
directories.flatMap(_.collectChildren(filter).toList)
}.catchAll(err =>
InventoryProcessingLogger.error(s"Error when looking for old inventories that weren't processed: ${err.fullMsg}")*>
Expand Down Expand Up @@ -218,7 +223,7 @@ class InventoryFileWatcher(
if(dir.isDirectory && dir.isWriteable) {
InventoryProcessingLogger.logEffect.debug(s"${name} inventories directory [ok]: ${dir.pathAsString}")
} else {
InventoryProcessingLogger.logEffect.error(s"${name} inventories directory: ${dir.pathAsString} is not writable. Please check existense and file permission.")
InventoryProcessingLogger.logEffect.error(s"${name} inventories directory: ${dir.pathAsString} is not writable. Please check existence and file permission.")
}
}

Expand All @@ -236,7 +241,7 @@ class InventoryFileWatcher(
logDirPerm(failed, "Failed")

implicit val scheduler = ZioRuntime.unsafeRun(ZIO.access[Blocking](_.get.blockingExecutor.asEC).provide(ZioRuntime.environment))
// the cron that look for old files (or existing ones on startup/watcher retart)
// the cron that look for old files (or existing ones on startup/watcher restart)
val cronForMissed = new SchedulerMissedNotify(
incoming :: updated :: Nil
, files => ZIO.foreach_(files)(fileProcessor.addFilePure)
Expand Down Expand Up @@ -273,13 +278,13 @@ class InventoryFileWatcher(
ref.update(opt =>
opt match {
case None => //ok
InventoryProcessingLogger.info(s"Stoping incoming inventory watcher ignored (already stoped).")*>
InventoryProcessingLogger.info(s"Stopping incoming inventory watcher ignored (already stopped).")*>
None.succeed

case Some((w, f)) =>
(for {
_ <- ZIO.collectAll(w.stop() :: f.interrupt :: Nil)
_ <- InventoryProcessingLogger.info(s"Incoming inventory watcher stoped")
_ <- InventoryProcessingLogger.info(s"Incoming inventory watcher stopped")
} yield None).catchAll(err =>
InventoryProcessingLogger.error(s"Error when trying to stop incoming inventories file watcher. Reported exception was: ${err.fullMsg}.") *>
// in all case, remove the previous watcher, it's most likely in a dead state
Expand Down Expand Up @@ -338,10 +343,10 @@ class ProcessFile(
val toBeProcessed = ZioRuntime.unsafeRun(zio.RefM.make(Map.empty[File, Fiber[RudderError, Unit]]))

/*
* We need a queue of add file / file written even to delimitate when a file should be
* We need a queue of add file / file written even to delimit when a file should be
* processed.
* We are going to receive a streem of "write file xxx" event, and updating the map accordingly
* (reseting the task). Once the task is started, we want to free space in the map, and so enqueue
* We are going to receive a stream of "write file xxx" event, and updating the map accordingly
* (resetting the task). Once the task is started, we want to free space in the map, and so enqueue
* a "file written" event.
*/
type WatchQueue = ZQueue[Any, Nothing, Any, Nothing, WatchEvent, WatchEvent]
Expand All @@ -358,6 +363,7 @@ class ProcessFile(

case WatchEvent.Mod(file) =>
// look if the file is already here. If so, interrupt. In all case, create a new task.

(toBeProcessed.update { s =>
val newMap = {
// the new task must :
Expand Down Expand Up @@ -405,7 +411,7 @@ class ProcessFile(
* timeout and process, else do nothing.
*/
def processFile(file: File, locks: zio.Ref[Set[String]]): ZIO[Any, RudderError, Unit] = {
// the part that deals with sending to processor and then deplacing files
// the part that deals with sending to processor and then moving files
// where they belong
import zio.{Task => ZioTask}

Expand Down Expand Up @@ -444,7 +450,7 @@ class ProcessFile(
val prog = for {
_ <- InventoryProcessingLogger.trace(s"Processing new file: ${file.pathAsString}")
// We need to only try to do things on fully-written file.
// The canocic way seems to be to try to get a write lock on the file and see if
// The canonical way seems to be to try to get a write lock on the file and see if
// it works. We assume that once we successfully get the lock, it means that
// the file is written (so we can immediately release it).
done <- if(file.name.endsWith(".gz")) {
Expand Down

0 comments on commit c3b5961

Please sign in to comment.