New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BoundedSourceQueue API #29770
BoundedSourceQueue API #29770
Conversation
4cac2d6
to
0cfda82
Compare
0cfda82
to
e746bc5
Compare
Test FAILed. |
1 similar comment
Test FAILed. |
Test FAILed. |
3a48a4a
to
66da0be
Compare
Test FAILed. |
66da0be
to
44aead2
Compare
Test FAILed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good overall!
akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSourceSpec.scala
Outdated
Show resolved
Hide resolved
new Source( | ||
scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers).mapMaterializedValue(_.asJava)) | ||
overflowStrategy match { | ||
case _: OverflowStrategies.DropNew => queue[T](bufferSize).mapMaterializedValue(_.asJavaSourceQueue()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure that we want to change the queue implementation for DropNew? It's probably much better, but the change in behavior could impact users. I think we should consider deprecating this version of Source.queue
instead and promote the new method instead. The deprecation could come in a separate PR (possibly later release) since we probably want to provide a good alternative replacement for at least Backpressure
strategy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll revert this change.
In the future I suggest we deprecate these two overloads altogether (with a backpressure and "fail" strategy alternative). We can't deprecate OverflowStrategies.DropNew
because it's used in Source.actorRef
as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and we might not need a fail strategy because that can be a decision for the caller (and implemented together with a KillSwitch)
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good, some small things noted.
akka-stream-tests/src/test/scala/akka/stream/scaladsl/BoundedQueueSourceSpec.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/BoundedQueueSource.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/BoundedQueueSource.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing this work, @seglo! I mostly looked at the documentation. I guess my biggest non-cosmetic suggestion is to rename BoundedQueueSource
to BoundedSourceQueue
...
@@ -1,15 +1,35 @@ | |||
# Source.queue | |||
|
|||
Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. | |||
Materialize a `SourceQueue` or `BoundedSourceQueue` onto which elements can be pushed synchronously or asynchronously for emitting from the source. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a suggestion of giving an overview about the options right away (I found the distinction between synchronous
and asynchronous
not really descriptive as a user coming to this page):
Materialize a `SourceQueue` or `BoundedSourceQueue` onto which elements can be pushed synchronously or asynchronously for emitting from the source. | |
Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source. | |
The `BoundedSourceQueue` is an optimized variant of the `SourceQueue` with `OverflowStrategy.dropNew`. The `BoundedSourceQueue` will give immediate, synchronous feedback whether an element was accepted or not and is therefore recommended for situations where overload and dropping elements is expected and needs to be handled quickly. | |
In contrast, the `SourceQueue` offers more variety of `OverflowStrategies` but feedback is only asynchronously provided through a @scala[`Future`]@java[`CompletionStage`] value. In cases where elements need to be discarded quickly at times of overload to avoid out-of-memory situations, delivering feedback asynchronously can itself become a problem. This happens if elements come in faster than the feedback can be delivered in which case the feedback mechanism itself is part of the reason that an out-of-memory situation arises. | |
In summary, prefer `BoundedSourceQueue` over `SourceQueue` with `OverflowStrategy.dropNew` especially in high-load scenarios. Use `SourceQueue` if you need one of the other `OverflowStrategies`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially had more content above the fold like you suggested, but a docs post-processing step didn't like it. I'll take another look to see if I can make that work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like a rule was explicitly created to keep the description one line (to keep it short).
https://github.com/akka/akka/blob/master/project/StreamOperatorsIndexGenerator.scala#L244-L248
I like your description though, so I'll try to make it work with the rule intact.
/** | ||
* A queue of the given size that gives immediate feedback whether an element could be enqueued or not. | ||
*/ | ||
trait BoundedQueueSource[T] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not BoundedSourceQueue
? After all, it's not a source but a queue and that would also be consistent with the existing naming? (I just realized that I confused those also above in my docs suggestions).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw. I think it would be fine if a BoundedQueueSource(Stage)
would provide a BoundedSourceQueue
. After all these are two opposite ends of looking at the component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed BoundedQueueSource
to BoundedSourceQueue
. I changed the stage too just to make it consistent.
I originally used that convention, but I changed it to what it is now for some reason. I'll make it 👍 |
52d28f1
to
e2d1484
Compare
@johanandren @jrudolph Thanks for the thorough review. I accommodated almost all of your feedback. I'll be away next week, but feel free to push commits if necessary to have this completed this sprint. |
Test FAILed. |
e2d1484
to
779161d
Compare
Test PASSed. |
Given that you are away next week, and @jrudolph is away today, let's merge this and follow up with fixes if J has any more feedback when back next week. |
Sounds good. Thanks @johanandren 👍 |
Based on @jrudolph's FastDroppingQueue implementation in #29574
A new queue implementation that drops new elements immediately when the buffer is full. Does not use async callbacks like
Source.queue
withOverflowStrategy.dropNew
, which can still result in OOM errors (#25798).Other "drop" scenarios (
dropTail
,dropHead
) may be supported by this implementation if their use can be justified.