From 46d075e37200879bb4cab560838d8e52f55b18a7 Mon Sep 17 00:00:00 2001 From: "Francois @fanf42 Armand" Date: Tue, 30 Aug 2022 12:37:28 +0200 Subject: [PATCH] Fixes #21665: When we receive a lot of inventories, inotify \"OVERFLOW\" is not handled --- .../inventory/InventoryFileWatcher.scala | 136 +++++++++++------- 1 file changed, 87 insertions(+), 49 deletions(-) diff --git a/webapp/sources/rudder/rudder-core/src/main/scala/com/normation/rudder/inventory/InventoryFileWatcher.scala b/webapp/sources/rudder/rudder-core/src/main/scala/com/normation/rudder/inventory/InventoryFileWatcher.scala index ccb66fa38b..e27ef9fbd8 100644 --- a/webapp/sources/rudder/rudder-core/src/main/scala/com/normation/rudder/inventory/InventoryFileWatcher.scala +++ b/webapp/sources/rudder/rudder-core/src/main/scala/com/normation/rudder/inventory/InventoryFileWatcher.scala @@ -37,30 +37,30 @@ package com.normation.rudder.inventory -import java.nio.file.ClosedWatchServiceException -import java.util.concurrent.TimeUnit -import better.files._ - -import com.normation.errors.IOResult -import com.normation.errors.RudderError import com.normation.inventory.domain.InventoryProcessingLogger -import com.normation.zio.ZioRuntime -import scala.concurrent.ExecutionContext - -import zio._ -import zio.syntax._ -import zio.duration._ -import com.normation.zio._ +import better.files._ import org.apache.commons.io.FileUtils import org.apache.commons.io.filefilter.TrueFileFilter import org.joda.time.DateTime -import zio.clock.Clock import java.io.FileNotFoundException import java.io.InputStream +import java.nio.file +import java.nio.file.ClosedWatchServiceException +import java.nio.file.StandardWatchEventKinds +import java.util.concurrent.TimeUnit +import scala.concurrent.ExecutionContext +import zio._ +import zio.clock.Clock +import zio.duration._ +import zio.syntax._ import com.normation.box.IOManaged +import com.normation.errors.IOResult +import com.normation.errors.RudderError +import com.normation.zio._ +import com.normation.zio.ZioRuntime /* @@ -127,11 +127,10 @@ class InventoryFileWatcher( // service for cleaning - val cleaner = new ProcessOldFiles(fileProcessor, maxOldInventoryAge, takeCareOfUnprocessedFileAfter, InventoryProcessingUtils.hasValidInventoryExtension) + val cleaner = new CheckExistingInventoryFilesImpl(fileProcessor, incoming :: updated :: Nil, maxOldInventoryAge, takeCareOfUnprocessedFileAfter, InventoryProcessingUtils.hasValidInventoryExtension) // the cron that look for old files (or existing ones on startup/watcher restart) val cronForMissed = new SchedulerMissedNotify( - incoming :: updated :: Nil - , cleaner.processOldFiles _ + cleaner , collectOldInventoriesFrequency , ZioRuntime.environment ) @@ -155,7 +154,7 @@ class InventoryFileWatcher( opt.succeed case None => - val w = Watchers(incoming, updated, fileProcessor.addFile) + val w = Watchers(incoming, updated, fileProcessor, cleaner) (for { _ <- w.start() // start scheduler for old file, it will take care of inventories @@ -215,11 +214,26 @@ final class Watchers(incoming: FileMonitor, updates: FileMonitor) { } object Watchers { - def apply(incoming: File, updates: File, checkProcess: File => Unit): Watchers = { + def apply(incoming: File, updates: File, checkNew: HandleIncomingInventoryFile, checkOld: CheckExistingInventoryFiles): Watchers = { def newWatcher(directory: File): FileMonitor = { new FileMonitor(directory, recursive = false) { private var stopRequired = false + // a one element queue to tempo overflow events + val tempoOverflow = ZioRuntime.unsafeRun(ZQueue.dropping[Unit](1)) + + // process overflow + val overflowFiber = ZioRuntime.unsafeRun((for { + _ <- tempoOverflow.take + // if we are overflowing, we got at least a couple hundred inventories. Wait one minute before continuing + _ <- InventoryProcessingLogger.info("Inotify watcher event overflow: waiting a minute before checking what inventories need to be processed") + _ <- UIO.unit.delay(1.minutes) + // clean-up other overflow that happened during that time + _ <- tempoOverflow.takeAll + _ <- checkOld.checkFilesOlderThan(0.milli) + } yield ()).forever.forkDaemon.provideLayer(ZioRuntime.layers)) + + /* * when a file is written, depending about how it is handle, by what process, and at which rate, * we can have form 1 create only to 1 create and lots of modify events. @@ -233,13 +247,30 @@ object Watchers { override def onModify(file: File, count: Int): Unit = { // we want to avoid processing some file if( InventoryProcessingUtils.hasValidInventoryExtension(file) ) { - checkProcess(file) + checkNew.addFile(file) } else { val ext = file.extension(includeDot = false, includeAll = false).getOrElse("") InventoryProcessingLogger.logEffect.debug(s"watcher ignored file ${file.name} (unrecognized extension: '${ext}')") } } + + override def onUnknownEvent(event: file.WatchEvent[_]): Unit = { + event.kind() match { + case StandardWatchEventKinds.OVERFLOW => + tempoOverflow.offer(()).runNow + + case _ => + InventoryProcessingLogger.logEffect.debug(s"Inotify sent an unknown event: ${event.getClass.getName}: ${event.kind()} ; ${event.context()}") + } + } + + override def onException(exception: Throwable): Unit = { + InventoryProcessingLogger.logEffect.error(s"Error with inotify inventory watcher. If new inventory are not immediately processed," + + s" you may need to restart watcher (see https://docs.rudder.io/api/#tag/Inventories/operation/fileWatcherRestart):" + + s"${exception.getClass.getName}: ${exception.getMessage}") + } + // When we call "stop" on the FileMonitor, it always throws a ClosedWatchServiceException. // It seems to be because the "run" in start continue to try to execute its // action on the stop watcher. We need to catch that. @@ -263,6 +294,7 @@ object Watchers { } override def close() = { + ZioRuntime.unsafeRun(overflowFiber.interrupt) stopRequired = true service.close() } @@ -296,12 +328,17 @@ final case class FilteredFiles( ) // A class that filter and add back inventory files -class ProcessOldFiles( +trait CheckExistingInventoryFiles { + def checkFilesOlderThan(d: Duration): UIO[Unit] +} + +class CheckExistingInventoryFilesImpl( fileProcessor: HandleIncomingInventoryFile + , directories: List[File] , purgeAfter: Duration // time after which old inventories are deleted , waitingSignatureTime: Duration , hasValidExtension: File => Boolean -) { +) extends CheckExistingInventoryFiles { def processOldFiles(files: List[File]): UIO[Unit] = { val now = DateTime.now() @@ -376,21 +413,10 @@ class ProcessOldFiles( } } } -} - -/* - * A class that periodically add inventories that we likely missed by inotify - */ -class SchedulerMissedNotify( - directories: List[File] - , addFiles : List[File] => UIO[Unit] - , period : Duration - , zclock : Clock -){ /* * List all files that need to be added again - simply all files older than period. - * Be careful that files can disapear in the middle of a java.nio.Files.walk, so just + * Be careful that files can disappear in the middle of a java.nio.Files.walk, so just * don't use that (or better files methods that use that) * See: https://issues.rudder.io/issues/19268 */ @@ -399,36 +425,48 @@ class SchedulerMissedNotify( (for { // if that fails, just exit ageLimit <- IOResult.effect(DateTime.now().minusMillis(d.toMillis.toInt)) - filter = (f:File) => if(f.exists && InventoryProcessingUtils.hasValidInventoryExtension(f) && (f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))) Some(f) else None + filter = (f: File) => if (f.exists && InventoryProcessingUtils.hasValidInventoryExtension(f) && (f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))) Some(f) else None // if the listing fails, just exit children <- ZIO.foreach(directories)(d => IOResult.effect(FileUtils.listFilesAndDirs(d.toJava, TrueFileFilter.TRUE, TrueFileFilter.TRUE).asScala)) - // filter file by file. In case of error, just skip it. Speficically ignore FileNotFound (davfs temp files disapear) + // filter file by file. In case of error, just skip it. Specifically ignore FileNotFound (davfs temp files disapear) filtered <- ZIO.foreach(children.flatten) { file => IO.effect(filter(File(file.toPath))).catchAll { - case _:FileNotFoundException => // just ignore + case _: FileNotFoundException => // just ignore InventoryProcessingLogger.trace(s"Ignoring file '${file.toString}' when processing old inventories: " + - s"FileNotFoundException (likely if disapeared between directory listing and filtering)" + s"FileNotFoundException (likely it disappeared between directory listing and filtering)" ) *> UIO.effectTotal(None) - case ex: Throwable => // log and switch to the next + case ex: Throwable => // log and switch to the next InventoryProcessingLogger.warn(s"Error when processing file in old inventories: '${file.toString}': ${ex.getMessage}") *> UIO.effectTotal(None) } } - } yield (filtered.flatten)).catchAll(err => - InventoryProcessingLogger.warn(s"Error when looking for old inventories that weren't processed: ${err.fullMsg}")*> - Nil.succeed - ) + } yield (filtered.flatten)).catchAll(err => + InventoryProcessingLogger.warn(s"Error when looking for old inventories that weren't processed: ${err.fullMsg}") *> + Nil.succeed + ) } + def checkFilesOlderThan(d: Duration): UIO[Unit] = for { + files <- listFiles(d) + _ <- ZIO.when(files.nonEmpty) { + InventoryProcessingLogger.debug(s"Found old inventories: ${files.map(_.pathAsString).mkString(", ")}") + } + _ <- addFiles(files) + } yield () +} +/* + * A class that periodically add inventories that we likely missed by inotify + */ +class SchedulerMissedNotify( + checker: CheckExistingInventoryFiles + , period : Duration + , zclock : Clock +){ val schedule = { def loop(d: Duration) = for { - files <- listFiles(d) - _ <- ZIO.when(files.nonEmpty) { - InventoryProcessingLogger.debug(s"Found old inventories: ${files.map(_.pathAsString).mkString(", ")}") - } - _ <- addFiles(files) - _ <- UIO.unit.delay(period) + _ <- checker.checkFilesOlderThan(d) + _ <- UIO.unit.delay(period) } yield () (loop(Duration.Zero) *> loop(period).forever).forkDaemon.provide(zclock) }