Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow.delayWith allows custom delay for each element. #25000

Merged
merged 4 commits into from Nov 27, 2019

Conversation

@jbgi
Copy link
Contributor

jbgi commented Apr 27, 2018

Similar functionality already exist in contrib but does not handle any buffer. Goal is also to remove duplicate code and functionality between the two implementations. Part of the code is just copy from contrib implementation (DelayStrategy and tests)

For the record, my use case is is to implement retry queues: cf. https://blog.pragmatists.com/retrying-consumer-architecture-in-the-apache-kafka-939ac4cb851a - Each element is tagged with a retry timestamp; the DelayStrategy use it to determine the delay before retry.
I believe it is a common and important use case.

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Apr 27, 2018

Thank you for your pull request! After a quick sanity check one of the team will reply with 'OK TO TEST' to kick off our automated validation on Jenkins. This compiles the project, runs the tests, and checks for things like binary compatibility and source code formatting. When two team members have also manually reviewed and (perhaps after asking for some amendments) accepted your contribution, it should be good to be merged.

For more details about our contributing process, check out CONTRIBUTING.md - and feel free to ask!

Copy link
Member

johanandren left a comment

Seems useful and general enough that it could go directly in streams.

* It determines delay for each ongoing element invoking `DelayStrategy.nextDelay(elem: T): FiniteDuration`.
*
* Note that elements are not re-ordered: if an element is given a delay much shorter than it predecessor, it will still
* have to wait for the preceding element before being emitted.

This comment has been minimized.

Copy link
@johanandren

johanandren Apr 30, 2018

Member

👍 Good spelling this out!

* @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delayWith(delayStrategySupplier: () DelayStrategy[Out], strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] =

This comment has been minimized.

Copy link
@johanandren

johanandren Apr 30, 2018

Member

Default parameters are no fun to keep binary compatible, but I think that maybe it is important to be explicit with the fact that elements may get lost, so maybe just have no default/overload at all?

This comment has been minimized.

Copy link
@jbgi

jbgi Apr 30, 2018

Author Contributor

Agreed (especially since the default was lossy). removed default value in last commit.

@jbgi jbgi force-pushed the jbgi:delay-with branch from 95a328d to 337f8bf Apr 30, 2018
@raboof

This comment has been minimized.

Copy link
Member

raboof commented Apr 30, 2018

OK TO TEST

@jbgi jbgi force-pushed the jbgi:delay-with branch from 337f8bf to c2d42b4 Apr 30, 2018
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Apr 30, 2018

Test FAILed.

1 similar comment
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Apr 30, 2018

Test FAILed.

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented May 15, 2018

Test FAILed.

@johanandren

This comment has been minimized.

Copy link
Member

johanandren commented May 15, 2018

Hmm, I don't quite understand the error here, could it be the restructure of stream operator docs that is interfering with the docs of this? Can you rebase and see if that helps?

@jbgi

This comment has been minimized.

Copy link
Contributor Author

jbgi commented May 15, 2018

@johanandren as I understand a doc for this new method need to be added, somewhere?

@ktoso

This comment has been minimized.

Copy link
Member

ktoso commented May 15, 2018

Correct, each operator has to have a documentation page; So this one has to get [error] Caused by: java.io.FileNotFoundException: /localhome/jenkinsakka/workspace/pr-validator-per-commit-jenkins/akka-docs/src/main/paradox/stream/operators/Flow/delayWith.md (No such file or directory) and fill it in using the exact same style as other operators (copy any file from there, and change names / docs)

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Aug 22, 2018

Test FAILed.

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Aug 22, 2018

Test FAILed.

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 4, 2019

Test FAILed.

@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 4, 2019

Test FAILed.

@raboof raboof force-pushed the jbgi:delay-with branch from 11887ae to 972db4e Oct 4, 2019
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Oct 4, 2019

Test PASSed.

@raboof raboof force-pushed the jbgi:delay-with branch from 972db4e to 2f3234f Nov 25, 2019
@akka-ci

This comment has been minimized.

Copy link
Collaborator

akka-ci commented Nov 25, 2019

Test PASSed.

@raboof
raboof approved these changes Nov 26, 2019
@johanandren johanandren added this to the 2.6.1 milestone Nov 27, 2019
@johanandren johanandren merged commit 6d893fb into akka:master Nov 27, 2019
3 checks passed
3 checks passed
Jenkins PR Validation Test PASSed. 5172 tests run, 458 skipped, 0 failed.
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
typesafe-cla-validator All users have signed the CLA
Details
navaro1 added a commit to navaro1/akka that referenced this pull request Dec 17, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants
You can’t perform that action at this time.