New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can merging of multiple sources be prioritized? #22864

Merged
merged 15 commits into from Jun 28, 2017

Conversation

Projects
None yet
6 participants
@arpanchaudhury
Contributor

arpanchaudhury commented May 7, 2017

This is a fan-in junction which leverages the capability on scala.util.Random to prioritize data coming from multiple sources. This is quite similar to MergePreferred except that it is fair and tries to consume data in the ratio of provided priorities.

Usage

val elementCount = 10000
val s1 = Source.fromIterator(() => Seq.fill(elementCount)(1).iterator)
val s2 = Source.fromIterator(() => Seq.fill(elementCount)(2).iterator)
val s3 = Source.fromIterator(() => Seq.fill(elementCount)(3).iterator)
val combinedSource = Source.combine(s1, s2, s3)(MergePrioritized(_, immutable.Seq(6, 3, 1)))

In the above example three sources s1, s2 and s3 are merged with ratio 6 :: 3 :: 1. For infinite stream of data pushed through these sources MergePrioritized maintains the ratio while merging.

NB: If data pushed through is small the ratio might not be maintained exactly.

In this pull request I have added MergePrioritized stage.

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci May 7, 2017

Collaborator

Can one of the repo owners verify this patch?

Collaborator

akka-ci commented May 7, 2017

Can one of the repo owners verify this patch?

@raboof

This comment has been minimized.

Show comment
Hide comment
@raboof

raboof May 7, 2017

Member

OK TO TEST

Member

raboof commented May 7, 2017

OK TO TEST

@akka-ci akka-ci added validating tested and removed validating labels May 7, 2017

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci May 7, 2017

Collaborator

Test PASSed.

Collaborator

akka-ci commented May 7, 2017

Test PASSed.

@johanandren

This comment has been minimized.

Show comment
Hide comment
@johanandren

johanandren May 7, 2017

Member

Hi @arpanchaudhury, while we appreciate PRs, and this particular one might be interesting, please follow the contributor guide (https://github.com/akka/akka/blob/master/CONTRIBUTING.md) and start with a ticket so we can first discuss your idea, and then first after that move on to creating a PR. That way there is no risk that you put effort into making a PR that is then not merged because of being a bad fit in akka-streams core.

Member

johanandren commented May 7, 2017

Hi @arpanchaudhury, while we appreciate PRs, and this particular one might be interesting, please follow the contributor guide (https://github.com/akka/akka/blob/master/CONTRIBUTING.md) and start with a ticket so we can first discuss your idea, and then first after that move on to creating a PR. That way there is no risk that you put effort into making a PR that is then not merged because of being a bad fit in akka-streams core.

@arpanchaudhury

This comment has been minimized.

Show comment
Hide comment
@arpanchaudhury

arpanchaudhury May 7, 2017

Contributor

Creating an issue as suggested. #22865

Contributor

arpanchaudhury commented May 7, 2017

Creating an issue as suggested. #22865

@arpanchaudhury arpanchaudhury changed the title from Can Merging of multiple sources be prioritized? to Can merging of multiple sources be prioritized? May 7, 2017

@akka-ci akka-ci added validating and removed tested labels May 8, 2017

@akka-ci akka-ci added validating tested and removed validating labels May 8, 2017

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci May 8, 2017

Collaborator

Test PASSed.

Collaborator

akka-ci commented May 8, 2017

Test PASSed.

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci May 8, 2017

Collaborator

Test PASSed.

Collaborator

akka-ci commented May 8, 2017

Test PASSed.

@akka-ci akka-ci added the tested label May 8, 2017

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci May 8, 2017

Collaborator

Test PASSed.

Collaborator

akka-ci commented May 8, 2017

Test PASSed.

@akka-ci akka-ci added validating and removed tested labels May 10, 2017

@akka-ci akka-ci added tested and removed validating labels Jun 8, 2017

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Jun 8, 2017

Collaborator

Test PASSed.

Collaborator

akka-ci commented Jun 8, 2017

Test PASSed.

@arpanchaudhury

This comment has been minimized.

Show comment
Hide comment
@arpanchaudhury

arpanchaudhury Jun 8, 2017

Contributor

PR Updated 😃

Contributor

arpanchaudhury commented Jun 8, 2017

PR Updated 😃

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Jun 8, 2017

Collaborator

Test PASSed.

Collaborator

akka-ci commented Jun 8, 2017

Test PASSed.

@patriknw

looking good, but I think the javadsl has to be changed

Show outdated Hide outdated akka-docs/src/main/paradox/java/stream/stream-graphs.md
(ones / twos).round shouldEqual 2
(ones / threes).round shouldEqual 6
(twos / threes).round shouldEqual 3

This comment has been minimized.

@patriknw

patriknw Jun 28, 2017

Member

so this could fail if we are unlucky, I guess the risk is small enough

@patriknw

patriknw Jun 28, 2017

Member

so this could fail if we are unlucky, I guess the risk is small enough

This comment has been minimized.

@arpanchaudhury

arpanchaudhury Jun 28, 2017

Contributor

I will increase no of elements in each source to 20k, currently its 10k. Just saw the test failed in CI.

@arpanchaudhury

arpanchaudhury Jun 28, 2017

Contributor

I will increase no of elements in each source to 20k, currently its 10k. Just saw the test failed in CI.

Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala
Show outdated Hide outdated akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Jun 28, 2017

Collaborator

Test FAILed.

Collaborator

akka-ci commented Jun 28, 2017

Test FAILed.

@akka-ci

This comment has been minimized.

Show comment
Hide comment
@akka-ci

akka-ci Jun 28, 2017

Collaborator

Test PASSed.

Collaborator

akka-ci commented Jun 28, 2017

Test PASSed.

@patriknw

LGTM

@patriknw patriknw merged commit bfb8f16 into akka:master Jun 28, 2017

2 checks passed

Jenkins PR Validation Test PASSed. 8138 tests run, 463 skipped, 0 failed.
Details
typesafe-cla-validator All users have signed the CLA
Details

@arpanchaudhury arpanchaudhury deleted the arpanchaudhury:wip-source-priority-merge branch Jun 28, 2017

richardimaoka added a commit to richardimaoka/akka that referenced this pull request Jul 1, 2017

Can merging of multiple sources be prioritized? (akka#22864)
* Adding MergePrioritized graph stage

* Adding MergePrioritized in akka docs

* Adding suggested documentation of MergePrioritized in akka docs

* Adding thread safe random number generator ThreadLocalRandom

* fixing documentation for MergePrioritized

* Refactoring selectNextElement() in MergePrioritized for less memory allocations

* Removing extra parameter in MergePrioritized and using SplittableRandom

* Refactoring GraphMergePrioritizedSpec

* Changes for paradox migration

* Optimized a few methods in MergePrioritized (akka#22864)

* increased no of elements in source in GraphMergePrioritizedSpec (akka#22864)

richardimaoka added a commit to richardimaoka/akka that referenced this pull request Jul 22, 2017

Can merging of multiple sources be prioritized? (akka#22864)
* Adding MergePrioritized graph stage

* Adding MergePrioritized in akka docs

* Adding suggested documentation of MergePrioritized in akka docs

* Adding thread safe random number generator ThreadLocalRandom

* fixing documentation for MergePrioritized

* Refactoring selectNextElement() in MergePrioritized for less memory allocations

* Removing extra parameter in MergePrioritized and using SplittableRandom

* Refactoring GraphMergePrioritizedSpec

* Changes for paradox migration

* Optimized a few methods in MergePrioritized (akka#22864)

* increased no of elements in source in GraphMergePrioritizedSpec (akka#22864)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment