-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes #14870: Use ZIO for effect management in Rudder #2218
Fixes #14870: Use ZIO for effect management in Rudder #2218
Conversation
Can you make it one commit ?? |
@VinceMacBuche yes, once last bit of cleaning are done. |
ok You write down that you need one more commit, ok nd you right i did not read ... |
PR rebased |
a357906
to
e39c66a
Compare
// file is currently modified, avoid processing | ||
final case class Mod(file: File) extends WatchEvent | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we replace a variable map + monix interuptible task + do on finish with a queue which takes two even: one for "file modified", and one for "file need cleaning". We keep the same logic for map+interrutible task (fiber are always interreruptibles, and should be lighter than monix task) and we enqueue a "can be clean" even when the process actually starts.
|
||
case Some(ref) => | ||
ref := newTask(file).runAsync | ||
//start the process |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The place where we start the process that continuously look for event in the loop.
def saveInventory(info: SaveInventoryInfo): IOResult[InventoryProcessStatus] = { | ||
|
||
def saveWithSignature(report: InventoryReport, newInventoryStream: () => InputStream, newSignature: () => InputStream): IOResult[InventoryProcessStatus] = { | ||
ZIO.bracket(Task.effect(newInventoryStream()).mapError(SystemError(s"Error when reading inventory file '${report.name}'", _)))(is => Task.effect(is.close()).run) { inventoryStream => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
brackets are love
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your bracket is open, can you zip it ? (just joking with the french word "braguette", ignore that comment, please)
* We manage inventories buffering with a bounded queue. | ||
* In case of overflow, we reject new inventories. | ||
*/ | ||
val queue = ZioRuntime.unsafeRun(Queue.dropping[InventoryReport](maxQueueSize)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inventory queue was managed by an AtomicInt+appropriate locking. We now quand just use an async queue with 'dropping' strategy. In place of checking if the AtomicInt is too big, we just check if offer
returned true or false.
OK, merging this PR |
This PR superseed #2206.
The description of context is available here: https://issues.rudder.io/issues/14870
This is now ready.