New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream cookbook #16498
Stream cookbook #16498
Conversation
Test FAILed. |
currentLoad ~> collector.left | ||
reportTicks ~> collector.right | ||
|
||
collector.out ~> sink |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any zipping with time would need some kind of (in the text of the docs) hints about buffer sizes etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thats the plan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Looks good so far! |
if (buffer.isEmpty) ctx.pull() | ||
else { | ||
val emit = buffer.take(chunkSize) | ||
buffer = buffer.drop(chunkSize) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a splitAt
?
lots of awesome stuff here! |
Test FAILed. |
5dee3a9
to
6051f27
Compare
Now added docs |
there is a flaky test case still, and missing the Java part |
|
||
We express our solution as a :class:`StatefulStage` because it has support for emitting multiple elements easily | ||
through its ``emit(iterator, ctx)`` method. Since an incoming ByteString chunk might contain multiple lines (frames) | ||
this feature comes handy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks
looking very good |
LGTM after minor updates and perhaps adding the example Will mentioned (rate limiting multiple streams) |
Added a rate limiter |
Test FAILed. |
} | ||
|
||
// get a stream of word counts | ||
val counts: Source[(String, Int)] = countedWords.mapAsync(identity) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can deadlock
3a18f86
to
3a8b662
Compare
Test PASSed. |
…rewhk Stream cookbook
NOT FOR MERGE
Just a quick preview, WIP. Feel free to recommend new recipes.