New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes #16773: Batch of new nodes can overflow rudder server with inventories #3367
Fixes #16773: Batch of new nodes can overflow rudder server with inventories #3367
Conversation
PR updated with a new commit |
PR updated with a new commit |
PR updated with a new commit |
inv <- saveInventoryBuffer.take | ||
// deduplicate. TakeAll is not blocking | ||
all <- saveInventoryBuffer.takeAll | ||
_ <- saveInventoryBuffer.offerAll(all.filterNot { case (f, _) => inv._1.name == f.name }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't there a risk that we loose inventory ?
Doc says:
- For Sliding Queue: uses
Sliding
Strategy - If there is room in the queue, it places the values otherwise it removes the old elements and
- enqueues the new ones. Always returns true.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and it contradict the comment up there: if we reoffer old inventories, they become new
* inventory. | ||
* As only a small structure is saved (SaveInventory is a name and two function to create stream from file), buffer can be big. | ||
* When queue is full, we prefer to drop old inventories to new ones. In the worst case, they will be catched up by the | ||
* scheduler. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can still be surprising. Why not using a bounded queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to have a non blocking, RAM-bounbed structure. If there is a bug somewhere in the code (which is far from impossible for concurrent and async processing which involves inotify and moving files around. A blocking structure can lead to thread accumulation or even deadlock, while here, in the worst case we loose an event (and so an inventory processing). But it's easy to set up workaround for that, which is: SchedulerMissedNotify
.
def safeMove[T](chunk: =>T): IOResult[Unit] = { | ||
Task.effect{chunk ; ()}.catchAll { | ||
case ex: NoSuchFileException => // ignore | ||
InventoryProcessingLogger.debug(s"Ignored exception '${ex.getClass.getSimpleName} ${ex.getMessage}'. The was file was correctly handled.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message is not very explicit
"The file '${inventory.pathAsString}' was correctly handled" maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arf :)
PR updated with a new commit |
This PR is not mergeable to upper versions. |
OK, squash merging this PR |
6eb9284
to
282148b
Compare
https://issues.rudder.io/issues/16773
Add a
saveInventoryBuffer
that stays between inotify watcher (in charge of the complexe logic of making sense of inotify events and waiting for signature) and the part that [parse inventory, check signature, forward to backend] and is heavy on CPU and RAM.That buffer can be big (chose 1024 without much rationnal safe that it's big and a power of 2, as asked in ZQueue comment).
The main benefits are:
Only the code path for inventories coming from file system has the buffer, because for REST, the API await the pre-processing status. So we need to wait for signature check etc, and so that part must be synchronous. A better API would be to just move uploaded inventory/signature to
/var/rudder/inventories/incoming
(or a dedicated incoming directory) and from that point, let them follow the usual path - but it's an API change which will need a major version for change.Other important code changes:
InventoryFileWatcher.scala
,sendToProcessor
was renamed tosendToProcessorBlocking
and moved out ofprocessFile
.processFile
now send files to process to the buffer.InventoryProcessor.scala
, the backend waiting queue,queue
was renamed toblockingQueue
and as the name implies, it is now a BLOCKING queue. Its default size is reduced inconfigurations.properties.sample
because now, the only interest of a big queue would be for people using A LOT rest endpoint,blockingQueue
in the REST API code path to keep previous, non-blocking behavior.