Skip to content

Commit

Permalink
Fixes #21665: When we receive a lot of inventories, inotify \"OVERFLO…
Browse files Browse the repository at this point in the history
…W\" is not handled
  • Loading branch information
fanf committed Aug 31, 2022
1 parent 7c81697 commit 46d075e
Showing 1 changed file with 87 additions and 49 deletions.
Expand Up @@ -37,30 +37,30 @@

package com.normation.rudder.inventory

import java.nio.file.ClosedWatchServiceException
import java.util.concurrent.TimeUnit
import better.files._

import com.normation.errors.IOResult
import com.normation.errors.RudderError
import com.normation.inventory.domain.InventoryProcessingLogger

import com.normation.zio.ZioRuntime
import scala.concurrent.ExecutionContext

import zio._
import zio.syntax._
import zio.duration._
import com.normation.zio._
import better.files._
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.TrueFileFilter
import org.joda.time.DateTime

import zio.clock.Clock
import java.io.FileNotFoundException
import java.io.InputStream
import java.nio.file
import java.nio.file.ClosedWatchServiceException
import java.nio.file.StandardWatchEventKinds
import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionContext

import zio._
import zio.clock.Clock
import zio.duration._
import zio.syntax._
import com.normation.box.IOManaged
import com.normation.errors.IOResult
import com.normation.errors.RudderError
import com.normation.zio._
import com.normation.zio.ZioRuntime


/*
Expand Down Expand Up @@ -127,11 +127,10 @@ class InventoryFileWatcher(


// service for cleaning
val cleaner = new ProcessOldFiles(fileProcessor, maxOldInventoryAge, takeCareOfUnprocessedFileAfter, InventoryProcessingUtils.hasValidInventoryExtension)
val cleaner = new CheckExistingInventoryFilesImpl(fileProcessor, incoming :: updated :: Nil, maxOldInventoryAge, takeCareOfUnprocessedFileAfter, InventoryProcessingUtils.hasValidInventoryExtension)
// the cron that look for old files (or existing ones on startup/watcher restart)
val cronForMissed = new SchedulerMissedNotify(
incoming :: updated :: Nil
, cleaner.processOldFiles _
cleaner
, collectOldInventoriesFrequency
, ZioRuntime.environment
)
Expand All @@ -155,7 +154,7 @@ class InventoryFileWatcher(
opt.succeed

case None =>
val w = Watchers(incoming, updated, fileProcessor.addFile)
val w = Watchers(incoming, updated, fileProcessor, cleaner)
(for {
_ <- w.start()
// start scheduler for old file, it will take care of inventories
Expand Down Expand Up @@ -215,11 +214,26 @@ final class Watchers(incoming: FileMonitor, updates: FileMonitor) {
}
object Watchers {

def apply(incoming: File, updates: File, checkProcess: File => Unit): Watchers = {
def apply(incoming: File, updates: File, checkNew: HandleIncomingInventoryFile, checkOld: CheckExistingInventoryFiles): Watchers = {
def newWatcher(directory: File): FileMonitor = {
new FileMonitor(directory, recursive = false) {
private var stopRequired = false

// a one element queue to tempo overflow events
val tempoOverflow = ZioRuntime.unsafeRun(ZQueue.dropping[Unit](1))

// process overflow
val overflowFiber = ZioRuntime.unsafeRun((for {
_ <- tempoOverflow.take
// if we are overflowing, we got at least a couple hundred inventories. Wait one minute before continuing
_ <- InventoryProcessingLogger.info("Inotify watcher event overflow: waiting a minute before checking what inventories need to be processed")
_ <- UIO.unit.delay(1.minutes)
// clean-up other overflow that happened during that time
_ <- tempoOverflow.takeAll
_ <- checkOld.checkFilesOlderThan(0.milli)
} yield ()).forever.forkDaemon.provideLayer(ZioRuntime.layers))


/*
* when a file is written, depending about how it is handle, by what process, and at which rate,
* we can have form 1 create only to 1 create and lots of modify events.
Expand All @@ -233,13 +247,30 @@ object Watchers {
override def onModify(file: File, count: Int): Unit = {
// we want to avoid processing some file
if( InventoryProcessingUtils.hasValidInventoryExtension(file) ) {
checkProcess(file)
checkNew.addFile(file)
} else {
val ext = file.extension(includeDot = false, includeAll = false).getOrElse("")
InventoryProcessingLogger.logEffect.debug(s"watcher ignored file ${file.name} (unrecognized extension: '${ext}')")
}
}


override def onUnknownEvent(event: file.WatchEvent[_]): Unit = {
event.kind() match {
case StandardWatchEventKinds.OVERFLOW =>
tempoOverflow.offer(()).runNow

case _ =>
InventoryProcessingLogger.logEffect.debug(s"Inotify sent an unknown event: ${event.getClass.getName}: ${event.kind()} ; ${event.context()}")
}
}

override def onException(exception: Throwable): Unit = {
InventoryProcessingLogger.logEffect.error(s"Error with inotify inventory watcher. If new inventory are not immediately processed," +
s" you may need to restart watcher (see https://docs.rudder.io/api/#tag/Inventories/operation/fileWatcherRestart):" +
s"${exception.getClass.getName}: ${exception.getMessage}")
}

// When we call "stop" on the FileMonitor, it always throws a ClosedWatchServiceException.
// It seems to be because the "run" in start continue to try to execute its
// action on the stop watcher. We need to catch that.
Expand All @@ -263,6 +294,7 @@ object Watchers {
}

override def close() = {
ZioRuntime.unsafeRun(overflowFiber.interrupt)
stopRequired = true
service.close()
}
Expand Down Expand Up @@ -296,12 +328,17 @@ final case class FilteredFiles(
)

// A class that filter and add back inventory files
class ProcessOldFiles(
trait CheckExistingInventoryFiles {
def checkFilesOlderThan(d: Duration): UIO[Unit]
}

class CheckExistingInventoryFilesImpl(
fileProcessor: HandleIncomingInventoryFile
, directories: List[File]
, purgeAfter: Duration // time after which old inventories are deleted
, waitingSignatureTime: Duration
, hasValidExtension: File => Boolean
) {
) extends CheckExistingInventoryFiles {

def processOldFiles(files: List[File]): UIO[Unit] = {
val now = DateTime.now()
Expand Down Expand Up @@ -376,21 +413,10 @@ class ProcessOldFiles(
}
}
}
}

/*
* A class that periodically add inventories that we likely missed by inotify
*/
class SchedulerMissedNotify(
directories: List[File]
, addFiles : List[File] => UIO[Unit]
, period : Duration
, zclock : Clock
){

/*
* List all files that need to be added again - simply all files older than period.
* Be careful that files can disapear in the middle of a java.nio.Files.walk, so just
* Be careful that files can disappear in the middle of a java.nio.Files.walk, so just
* don't use that (or better files methods that use that)
* See: https://issues.rudder.io/issues/19268
*/
Expand All @@ -399,36 +425,48 @@ class SchedulerMissedNotify(
(for {
// if that fails, just exit
ageLimit <- IOResult.effect(DateTime.now().minusMillis(d.toMillis.toInt))
filter = (f:File) => if(f.exists && InventoryProcessingUtils.hasValidInventoryExtension(f) && (f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))) Some(f) else None
filter = (f: File) => if (f.exists && InventoryProcessingUtils.hasValidInventoryExtension(f) && (f.isRegularFile && ageLimit.isAfter(f.lastModifiedTime.toEpochMilli))) Some(f) else None
// if the listing fails, just exit
children <- ZIO.foreach(directories)(d => IOResult.effect(FileUtils.listFilesAndDirs(d.toJava, TrueFileFilter.TRUE, TrueFileFilter.TRUE).asScala))
// filter file by file. In case of error, just skip it. Speficically ignore FileNotFound (davfs temp files disapear)
// filter file by file. In case of error, just skip it. Specifically ignore FileNotFound (davfs temp files disapear)
filtered <- ZIO.foreach(children.flatten) { file =>
IO.effect(filter(File(file.toPath))).catchAll {
case _:FileNotFoundException => // just ignore
case _: FileNotFoundException => // just ignore
InventoryProcessingLogger.trace(s"Ignoring file '${file.toString}' when processing old inventories: " +
s"FileNotFoundException (likely if disapeared between directory listing and filtering)"
s"FileNotFoundException (likely it disappeared between directory listing and filtering)"
) *> UIO.effectTotal(None)
case ex: Throwable => // log and switch to the next
case ex: Throwable => // log and switch to the next
InventoryProcessingLogger.warn(s"Error when processing file in old inventories: '${file.toString}': ${ex.getMessage}") *>
UIO.effectTotal(None)
}
}
} yield (filtered.flatten)).catchAll(err =>
InventoryProcessingLogger.warn(s"Error when looking for old inventories that weren't processed: ${err.fullMsg}")*>
Nil.succeed
)
} yield (filtered.flatten)).catchAll(err =>
InventoryProcessingLogger.warn(s"Error when looking for old inventories that weren't processed: ${err.fullMsg}") *>
Nil.succeed
)
}

def checkFilesOlderThan(d: Duration): UIO[Unit] = for {
files <- listFiles(d)
_ <- ZIO.when(files.nonEmpty) {
InventoryProcessingLogger.debug(s"Found old inventories: ${files.map(_.pathAsString).mkString(", ")}")
}
_ <- addFiles(files)
} yield ()
}

/*
* A class that periodically add inventories that we likely missed by inotify
*/
class SchedulerMissedNotify(
checker: CheckExistingInventoryFiles
, period : Duration
, zclock : Clock
){
val schedule = {
def loop(d: Duration) = for {
files <- listFiles(d)
_ <- ZIO.when(files.nonEmpty) {
InventoryProcessingLogger.debug(s"Found old inventories: ${files.map(_.pathAsString).mkString(", ")}")
}
_ <- addFiles(files)
_ <- UIO.unit.delay(period)
_ <- checker.checkFilesOlderThan(d)
_ <- UIO.unit.delay(period)
} yield ()
(loop(Duration.Zero) *> loop(period).forever).forkDaemon.provide(zclock)
}
Expand Down

0 comments on commit 46d075e

Please sign in to comment.