Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Initial implementation of distributed and broadcast #1235

Merged
merged 7 commits into from Sep 25, 2018

Conversation

pchlupacek
Copy link
Contributor

@pchlupacek pchlupacek commented Aug 22, 2018

@mpilquist, @SystemFw , @djspiewak, @ChristopherDavenport, @pchiusano , @AdamChlupacek

This is WIP that implements concurrency primitives, that shall allow user easily work with concurrent streams.

Two primitives are introduced:

  • Stream.distribute - allows to distribute items in round robin mode to multiple consumers
  • Stream.broadcast- allows to broadcast items to multiple consumers.

Specifically, the distribute can allow very simple yet powerful fanout implementation and programs like this:

def source: Stream[F, Int] = ???

source.distributeThrough(maxConcurrent=10) { _.sum }.sum.compile.last.unsafeRunSync

The previous allows to distribute source through 10 workers, sum their results and then sum results of all these workers, all in 10 concurrent workers.

The implementation is very rough now, it needs cleanup and testing.

I would love to have feedback from you what do you think, discuss signatures, naming etc.

If you agree I shall move forward, then I would make tests, do cleanup and so on.

else {
this.broadcast[F2].flatMap { source =>
def subscribe(pipe: Pipe[F2, O, O2]) =
source.pull.uncons1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why operate on individual elements here (uncons1) instead of chunks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure on this one. I had it implemented with chunks, essentially saying that if you need to operate on single elements, rechunk to single element chunks. Then, I sort of for consistency did same what we have on queue, so operate on element and leave chunking to user.

I have no strong preference here, whether chunking operation has to be explicit or implicit.

@mpilquist
Copy link
Member

Looks really promising @pchlupacek!

I need a bit more time to absorb this stuff. Naming wise, what do you think of following akka-stream's lead here and renaming distribute to balance? https://doc.akka.io/docs/akka/2.5.3/scala/stream/stream-graphs.html

@@ -0,0 +1,218 @@
package fs2.async.mutable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer this type (and Distribute) in either fs2 package or fs2.async package. The immutable / mutable split confuses folks in my experience.

For 1.0, we could get rid of those packages even. immutable.Signal would become async.ReadableSignal or something and mutable.Signal would become WritableSignal or something, then perhaps Signal mixes in both interfaces?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with getting rid of them. Even if we want to keep the sub typing, it could be ReadOnlySignal or something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I would like to consider either remove Topic, or simplify behaviour to be similar with Broadcast

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially the idea would be to define topic with broadcast and queue...

Copy link
Collaborator

@SystemFw SystemFw Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I knew you were going to get there :P

I guess I'll have to study what Broadcast does more closely, but I feel Broadcast doesn't subsume Topic just as Stream doesn't subsume Queue. You need to be able to "push" elements to a Stream --> Queue, you need to be able to push elements to multiple streams --> Topic, and not in every scenario the pushing comes from a stream (sometimes it's just an F)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially the idea would be to define topic with broadcast and queue...

Ah, so not removing. In which case yeah, definitely up for consideration I think :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SystemFw the problem with topic is that in binds too much stuff together, giving you really low control over whats going on in there. For example, it is unclear how the publish with F should behave when there are no consumers, or the consumers are waiting to complete the last batch.

I hope we would be able somehow fix that, w/o introducing awkward API. In worst case we can leave it as it is, but that would have two distinct implementations of similar behaviour.

Don't get me wrong in our production code we rely on topic, but I believe we can do much better than current implementation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem with topic is that in binds too much stuff together, giving you really low control over whats going on in there

I agree

ut that would have two distinct implementations of similar behaviour.

I agree

Don't get me wrong in our production code we rely on topic, but I believe we can do much better than current implementation.

I agree :)

Thanks for clarifying, we're on the same page

@SystemFw
Copy link
Collaborator

Haven't yet had time to properly review those, but my gut feeling is that I'm going to like them :P

@pchlupacek
Copy link
Contributor Author

@mpilquist @SystemFw this is ready for review now. The failures seems to be unrelated

}

/**
* Like `distribute` but instead of providing stream as source, it runs each stream through
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/distribute/apply/ ?

* may send elements for processing to another machine.
*
* Note that resulting stream will not emit single value (Unit). If you need to emit unit values from your sinks, consider
* using `distributeThrough`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/distribute/balance/

@mpilquist
Copy link
Member

👍 Looks great. I'll likely do another minor ScalaDoc consistency/editing pass if that's okay but will do that after merge. I'm good with merging once @SystemFw takes a look.

Copy link
Collaborator

@SystemFw SystemFw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow, this is really great stuff. Excited to dive more into this and the new PubSub scheme in my own code :)

Will merge now, I agree we should make a pass at the scaladoc afterwards

@@ -253,14 +253,14 @@ private[fs2] object PubSub {
}

def subscribe(selector: Selector): F[Boolean] =
state.modify { ps =>
update { ps =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not that important, but I'd rather update be called something else (if possible). I looked at it at first glance, and got confused by assuming if was Ref.update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes, feel free to rename it. For example updateAndLoop may be good idea...

@SystemFw SystemFw merged commit b403366 into series/1.0 Sep 25, 2018
@pchlupacek pchlupacek deleted the feature/fan-out branch September 25, 2018 10:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants