Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #17507: inventory watcher should ignore uuid.hive file #3007

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ final class Watchers(incoming: FileMonitor, updates: FileMonitor) {
}
object Watchers {

val inventoryExtentions = "gz" :: "xml" :: "ocs" :: "sign" :: Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be a set, it's done for that - but actually, for 4 elements, it's still special cased so it shouldn't matter at all.

def hasValidInventoryExtension(file : File) = {
val ext = file.extension(includeDot = false, includeAll = false).getOrElse("")
inventoryExtentions.contains(ext)
}
def apply(incoming: File, updates: File, checkProcess: File => Unit): Watchers = {
def newWatcher(directory: File): FileMonitor = {
new FileMonitor(directory, recursive = false) {
val maxDepth = 0 // see parent class for logic, needed to minimize change in overriden `process` method
val maxDepth = 0 // see parent class for logic, needed to minimize change in overridden `process` method

private var stopRequired = false

Expand All @@ -98,10 +103,10 @@ object Watchers {
override def onCreate(file: File, count: Int): Unit = onModify(file, count)
override def onModify(file: File, count: Int): Unit = {
// we want to avoid processing some file
val ext = file.extension(includeDot = false, includeAll = false).getOrElse("")
if( ext == "gz" || ext == "xml" || ext == "ocs" || ext == "sign") {
if( hasValidInventoryExtension(file) ) {
checkProcess(file)
} else {
val ext = file.extension(includeDot = false, includeAll = false).getOrElse("")
InventoryProcessingLogger.logEffect.debug(s"watcher ignored file ${file.name} (unrecognized extension: '${ext}')")
}
}
Expand Down Expand Up @@ -169,14 +174,14 @@ class SchedulerMissedNotify(
){

/*
* List all files that need to be rehad - simply all files older than period
* List all files that need to be add again - simply all files older than period
*/
def listFiles(d: Duration): UIO[List[File]] = {
IOResult.effect{
val ageLimit = DateTime.now().minusMillis(d.toMillis.toInt)
val filter = (f:File) => (
(f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))
)
val filter = (f:File) => {
Watchers.hasValidInventoryExtension(f) && (f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))
}
directories.flatMap(_.collectChildren(filter).toList)
}.catchAll(err =>
InventoryProcessingLogger.error(s"Error when looking for old inventories that weren't processed: ${err.fullMsg}")*>
Expand Down Expand Up @@ -218,7 +223,7 @@ class InventoryFileWatcher(
if(dir.isDirectory && dir.isWriteable) {
InventoryProcessingLogger.logEffect.debug(s"${name} inventories directory [ok]: ${dir.pathAsString}")
} else {
InventoryProcessingLogger.logEffect.error(s"${name} inventories directory: ${dir.pathAsString} is not writable. Please check existense and file permission.")
InventoryProcessingLogger.logEffect.error(s"${name} inventories directory: ${dir.pathAsString} is not writable. Please check existence and file permission.")
}
}

Expand All @@ -236,7 +241,7 @@ class InventoryFileWatcher(
logDirPerm(failed, "Failed")

implicit val scheduler = ZioRuntime.unsafeRun(ZIO.access[Blocking](_.get.blockingExecutor.asEC).provide(ZioRuntime.environment))
// the cron that look for old files (or existing ones on startup/watcher retart)
// the cron that look for old files (or existing ones on startup/watcher restart)
val cronForMissed = new SchedulerMissedNotify(
incoming :: updated :: Nil
, files => ZIO.foreach_(files)(fileProcessor.addFilePure)
Expand Down Expand Up @@ -273,13 +278,13 @@ class InventoryFileWatcher(
ref.update(opt =>
opt match {
case None => //ok
InventoryProcessingLogger.info(s"Stoping incoming inventory watcher ignored (already stoped).")*>
InventoryProcessingLogger.info(s"Stopping incoming inventory watcher ignored (already stopped).")*>
None.succeed

case Some((w, f)) =>
(for {
_ <- ZIO.collectAll(w.stop() :: f.interrupt :: Nil)
_ <- InventoryProcessingLogger.info(s"Incoming inventory watcher stoped")
_ <- InventoryProcessingLogger.info(s"Incoming inventory watcher stopped")
} yield None).catchAll(err =>
InventoryProcessingLogger.error(s"Error when trying to stop incoming inventories file watcher. Reported exception was: ${err.fullMsg}.") *>
// in all case, remove the previous watcher, it's most likely in a dead state
Expand Down Expand Up @@ -338,10 +343,10 @@ class ProcessFile(
val toBeProcessed = ZioRuntime.unsafeRun(zio.RefM.make(Map.empty[File, Fiber[RudderError, Unit]]))

/*
* We need a queue of add file / file written even to delimitate when a file should be
* We need a queue of add file / file written even to delimit when a file should be
* processed.
* We are going to receive a streem of "write file xxx" event, and updating the map accordingly
* (reseting the task). Once the task is started, we want to free space in the map, and so enqueue
* We are going to receive a stream of "write file xxx" event, and updating the map accordingly
* (resetting the task). Once the task is started, we want to free space in the map, and so enqueue
* a "file written" event.
*/
type WatchQueue = ZQueue[Any, Nothing, Any, Nothing, WatchEvent, WatchEvent]
Expand All @@ -358,6 +363,7 @@ class ProcessFile(

case WatchEvent.Mod(file) =>
// look if the file is already here. If so, interrupt. In all case, create a new task.

(toBeProcessed.update { s =>
val newMap = {
// the new task must :
Expand Down Expand Up @@ -405,7 +411,7 @@ class ProcessFile(
* timeout and process, else do nothing.
*/
def processFile(file: File, locks: zio.Ref[Set[String]]): ZIO[Any, RudderError, Unit] = {
// the part that deals with sending to processor and then deplacing files
// the part that deals with sending to processor and then moving files
// where they belong
import zio.{Task => ZioTask}

Expand Down Expand Up @@ -444,7 +450,7 @@ class ProcessFile(
val prog = for {
_ <- InventoryProcessingLogger.trace(s"Processing new file: ${file.pathAsString}")
// We need to only try to do things on fully-written file.
// The canocic way seems to be to try to get a write lock on the file and see if
// The canonical way seems to be to try to get a write lock on the file and see if
// it works. We assume that once we successfully get the lock, it means that
// the file is written (so we can immediately release it).
done <- if(file.name.endsWith(".gz")) {
Expand Down