BoundedSourceQueue API#29770
Conversation
4cac2d6 to
0cfda82
Compare
0cfda82 to
e746bc5
Compare
|
Test FAILed. |
1 similar comment
|
Test FAILed. |
|
Test FAILed. |
3a48a4a to
66da0be
Compare
|
Test FAILed. |
66da0be to
44aead2
Compare
|
Test FAILed. |
| new Source( | ||
| scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers).mapMaterializedValue(_.asJava)) | ||
| overflowStrategy match { | ||
| case _: OverflowStrategies.DropNew => queue[T](bufferSize).mapMaterializedValue(_.asJavaSourceQueue()) |
There was a problem hiding this comment.
Are we sure that we want to change the queue implementation for DropNew? It's probably much better, but the change in behavior could impact users. I think we should consider deprecating this version of Source.queue instead and promote the new method instead. The deprecation could come in a separate PR (possibly later release) since we probably want to provide a good alternative replacement for at least Backpressure strategy.
There was a problem hiding this comment.
I'll revert this change.
In the future I suggest we deprecate these two overloads altogether (with a backpressure and "fail" strategy alternative). We can't deprecate OverflowStrategies.DropNew because it's used in Source.actorRef as well.
There was a problem hiding this comment.
and we might not need a fail strategy because that can be a decision for the caller (and implemented together with a KillSwitch)
|
Test PASSed. |
johanandren
left a comment
There was a problem hiding this comment.
Looking good, some small things noted.
| # Source.queue | ||
|
|
||
| Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. | ||
| Materialize a `SourceQueue` or `BoundedSourceQueue` onto which elements can be pushed synchronously or asynchronously for emitting from the source. |
There was a problem hiding this comment.
Here's a suggestion of giving an overview about the options right away (I found the distinction between synchronous and asynchronous not really descriptive as a user coming to this page):
| Materialize a `SourceQueue` or `BoundedSourceQueue` onto which elements can be pushed synchronously or asynchronously for emitting from the source. | |
| Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source. | |
| The `BoundedSourceQueue` is an optimized variant of the `SourceQueue` with `OverflowStrategy.dropNew`. The `BoundedSourceQueue` will give immediate, synchronous feedback whether an element was accepted or not and is therefore recommended for situations where overload and dropping elements is expected and needs to be handled quickly. | |
| In contrast, the `SourceQueue` offers more variety of `OverflowStrategies` but feedback is only asynchronously provided through a @scala[`Future`]@java[`CompletionStage`] value. In cases where elements need to be discarded quickly at times of overload to avoid out-of-memory situations, delivering feedback asynchronously can itself become a problem. This happens if elements come in faster than the feedback can be delivered in which case the feedback mechanism itself is part of the reason that an out-of-memory situation arises. | |
| In summary, prefer `BoundedSourceQueue` over `SourceQueue` with `OverflowStrategy.dropNew` especially in high-load scenarios. Use `SourceQueue` if you need one of the other `OverflowStrategies`. |
There was a problem hiding this comment.
I initially had more content above the fold like you suggested, but a docs post-processing step didn't like it. I'll take another look to see if I can make that work.
There was a problem hiding this comment.
It looks like a rule was explicitly created to keep the description one line (to keep it short).
https://github.com/akka/akka/blob/master/project/StreamOperatorsIndexGenerator.scala#L244-L248
I like your description though, so I'll try to make it work with the rule intact.
| /** | ||
| * A queue of the given size that gives immediate feedback whether an element could be enqueued or not. | ||
| */ | ||
| trait BoundedQueueSource[T] { |
There was a problem hiding this comment.
Why not BoundedSourceQueue? After all, it's not a source but a queue and that would also be consistent with the existing naming? (I just realized that I confused those also above in my docs suggestions).
There was a problem hiding this comment.
Btw. I think it would be fine if a BoundedQueueSource(Stage) would provide a BoundedSourceQueue. After all these are two opposite ends of looking at the component.
There was a problem hiding this comment.
I renamed BoundedQueueSource to BoundedSourceQueue. I changed the stage too just to make it consistent.
I originally used that convention, but I changed it to what it is now for some reason. I'll make it 👍 |
52d28f1 to
e2d1484
Compare
|
@johanandren @jrudolph Thanks for the thorough review. I accommodated almost all of your feedback. I'll be away next week, but feel free to push commits if necessary to have this completed this sprint. |
|
Test FAILed. |
e2d1484 to
779161d
Compare
|
Test PASSed. |
|
Given that you are away next week, and @jrudolph is away today, let's merge this and follow up with fixes if J has any more feedback when back next week. |
|
Sounds good. Thanks @johanandren 👍 |
Based on @jrudolph's FastDroppingQueue implementation in #29574
A new queue implementation that drops new elements immediately when the buffer is full. Does not use async callbacks like
Source.queuewithOverflowStrategy.dropNew, which can still result in OOM errors (#25798).Other "drop" scenarios (
dropTail,dropHead) may be supported by this implementation if their use can be justified.