make Source.subscriber().to(Sink.publisher()) work #16923

Closed
rkuhn opened this issue Feb 24, 2015 · 22 comments

Comments

@rkuhn
Contributor

rkuhn commented Feb 24, 2015

Currently this is not supported by the materialization scheme: it will simply not do anything, since both ends of this connection are “lazy”. The materializer should detect this case and insert a dummy Processor.

@rkuhn rkuhn added the "2 - pick next" and "t:stream" labels Feb 24, 2015
@rkuhn rkuhn added this to the streams-1.0 milestone Feb 24, 2015
@drewhk
Member

drewhk commented Feb 24, 2015

Note: the test case should also include the variants:

Source.subscriber().withAttributes(...).to(...)

... and the same with Sink, since withAttributes introduces wrapping in the internal layout.

@rkuhn rkuhn modified the milestones: streams-1.x, streams-1.0-RC1 Mar 30, 2015
@mehmeteking

I plan to tackle this issue. If OK, on which branch should I work?

@ktoso
Member

ktoso commented Apr 6, 2015

Cool, thanks! If you'd like to give it a shot then branch off release-2.3-dev and check contributing.md for general development hints.

@viktorklang
Member

What's the status here?

@rkuhn
Contributor Author

rkuhn commented Jun 5, 2015 via email

@viktorklang
Member

File copying?

@drewhk
Member

drewhk commented Jun 6, 2015

What do you mean by file copy? How does it relate to Source.subscriber() and Sink.publisher()?

@viktorklang
Member

viktorklang commented Jun 6, 2015 via email

@drewhk
Member

drewhk commented Jun 6, 2015

I think there is a misunderstanding here. What you want works:

def copy(p: Publisher[ByteString], s: Subscriber[ByteString]) = Source(p).to(Sink(s)).run()

The ticket here is about the Sink and Source that materialize to a Publisher and a Subscriber, respectively. Source.subscriber.to(Sink.publisher) should translate to an identity Processor since it does nothing with the elements.

@viktorklang
Member

No, no misunderstanding, I am talking about a method defined outside of
Akka.

@drewhk
Member

drewhk commented Jun 6, 2015

But the code example you pasted is very easy to implement and has nothing to do with the bug discussed in this ticket. What is the real world example you think is problematic?

@viktorklang
Member

My point is that any generic code that hooks up Publishers to Subscribers
could be "vulnerable".
And hooking up a Publisher to a Subscriber is the entire point of RS, so it
should work, or do you disagree?

@drewhk
Member

drewhk commented Jun 6, 2015

But in this bug there is no hooking up of Publishers to Subscribers. As I showed in my example snippet above, that scenario works perfectly fine.

Source(publisher) is not the same as Source.subscriber(). The latter is in reality a symbolic notation to express that you want the Subscriber interface of the stage to the right, while Sink.publisher is a symbolic notation that you want to access the Publisher of the stage to the left. Neither of these modules (the sink and the source) introduces a new Processor; they always expose the Processors to the left or right. For example:

Source.subscriber.via(someProcessor).toMat(Sink.publisher)(Keep.both)

will materialize into a pair of Subscriber and Publisher, directly giving access to someProcessor (or, if it is a chain, to the leftmost Subscriber and the rightmost Publisher). When you say

Source.subscriber.toMat(Sink.publisher)(Keep.both)

then you have basically instructed the framework to take the Subscriber of the stage on the right, while that stage is itself instructed to take the Publisher of the stage to the left, so neither side has a real stage to expose. I.e. this does not work right now. Of course

Source.subscriber.via(Flow[T]).toMat(Sink.publisher)(Keep.both)

does work, since the empty Flow will materialize to an identity processor, and the materialized result will give you its Subscriber and Publisher interfaces.

This ticket is a real bug of course, but it is not trivial to fix, and is a really weird corner case.
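
For readers on later releases, the direct wiring this ticket asks for can be sketched as follows. This is a minimal sketch that assumes the Akka 2.6 API, where the thread's Source.subscriber and Sink.publisher are named Source.asSubscriber and Sink.asPublisher; the IdentityBridge object and its roundTrip helper are hypothetical names used only for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Future

// Hypothetical helper, not part of Akka: wires Source.asSubscriber directly
// to Sink.asPublisher and pushes elements through the resulting pair.
object IdentityBridge {
  def roundTrip(elems: immutable.Seq[Int])(implicit system: ActorSystem): Future[immutable.Seq[Int]] = {
    // Materializes to a (Subscriber[Int], Publisher[Int]) pair with no stage in between.
    val (subscriber, publisher) =
      Source.asSubscriber[Int]
        .toMat(Sink.asPublisher[Int](fanout = false))(Keep.both)
        .run()

    // Feed the Subscriber side from an ordinary Source.
    Source(elems).runWith(Sink.fromSubscriber(subscriber))

    // Drain the Publisher side into a collection.
    Source.fromPublisher(publisher).runWith(Sink.seq[Int])
  }
}
```

If the pair behaves like an identity Processor, as this ticket requests, the returned Future should complete with the same elements that were passed in.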

@drewhk
Member

drewhk commented Jun 6, 2015

I'm trying to fix it now; I might have a good solution...

@drewhk drewhk added the "3 - in progress" label and removed the "2 - pick next" label Jun 6, 2015
@drewhk drewhk modified the milestones: streams-1.0-RC4, streams-1.x Jun 6, 2015
drewhk added a commit to drewhk/akka that referenced this issue Jun 6, 2015
drewhk added a commit to drewhk/akka that referenced this issue Jun 15, 2015
drewhk pushed a commit that referenced this issue Jun 15, 2015
=str #16923: Inject identity between SubscriberSource and PublisherSink
@drewhk drewhk removed the "3 - in progress" label Jun 15, 2015
@drewhk drewhk closed this as completed Jun 15, 2015
@jroper
Contributor

jroper commented Jun 16, 2015

Could I check, does this fix also fix the NPE thrown when you run this against RC3:

Source(1 to 3).runWith(
  Sink.publisher[Int].mapMaterializedValue(p => Source(p).runFold(0)(_ + _))
)

@drewhk
Member

drewhk commented Jun 16, 2015

OK, please file a ticket. I think it is because the mapping functions on materialized values are evaluated during materialization, which is not safe in this case.

Btw, why do you need this? Why not

Source(1 to 3).runFold(...)

or

Source(1 to 3).to(Sink.fold(...)).run()

You only need Sink.publisher if you need access to the raw Publisher, which is usually at the end of the stream and only when you are working with other RS implementations that accept a Publisher. Even then, if they don't need a Publisher passed in but can provide a Subscriber instead, you are better off with

Source(1 to 3).to(Sink(thirdPartySubscriber)).run()

@jroper
Contributor

jroper commented Jun 16, 2015

My use case is actually to flatten a Future[Sink[E, Future[A]]] to a Sink[E, Future[A]]; the example code above was just trying to simplify it into a reproducible test. Here's my actual code:

val future: Future[Sink[T, Future[M]]] = ...
Sink.publisher[T].mapMaterialized { publisher =>
  future.recover {
    case error => Sink.cancelled[T].mapMaterialized(_ => Future.failed(error))
  } flatMap { sink =>
    Source(publisher).toMat(sink)(Keep.right).run()
  }
}

If you can see another way to do this (especially if it doesn't require a materialiser), I'm very interested to know.

@drewhk
Member

drewhk commented Jun 16, 2015

Hmm, why do you need a Future[Sink] in the first place? There is no flattening for Sinks, but it is possible to flatten Sources like

Source(sourceFuture).mapAsync(identity).flatten(FlattenStrategy.concat)

But the issue here is that this is not idiomatic: Source and Sink are already lazy, i.e. nothing executes until the stream runs. Can you give me the use case for Future[Sink]? I think it might be better just to create a new Sink that solves your problem.

Btw, in your case, if you want to still do the hacky way, just use fanoutPublisher:

Sink.fanoutPublisher(4, 4).mapMaterialized ...

The above will work, because it creates an actual actor. The problem with Sink.publisher is that it is just a marker that "I want to access the Publisher to my left", which is problematic during materialization since "to the left" might not have been started up yet. In other words, Sink.publisher does not create anything runnable, it always piggybacks on the existing stages.

@jroper
Contributor

jroper commented Jun 16, 2015

Play Framework actions return a Sink to consume the body - they essentially look like this:

RequestHeader => Sink[ByteString, Future[(ResponseHeader, Source[ByteString])]]

Now, let's say that body is being uploaded to S3 directly, so the Sink will be the HTTP client. But before we even make the request, we need to authenticate the user, and that authentication is asynchronous. So we'll have a Future of a Sink to consume the body, which means we need to flatten it to a Sink before we can consume it.

Of course, we could change the API to return a Future of a Sink, instead of just a Sink, but conceptually a Future of a Sink is just a Sink that won't do anything in response to its onSubscribe method until sometime in the future, so that feels like it would be making the API needlessly complex.
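
Later Akka releases address this use case more directly. As a sketch, assuming Akka 2.6 or newer (where Sink.futureSink exists) and Scala 2.12 or newer (for Future.flatten), the Future[Sink[E, Future[A]]] described above could be flattened without a manual materialization step. The FlattenFutureSink object and its flatten helper are hypothetical names introduced for illustration only.

```scala
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Hypothetical helper, not part of Akka or Play.
object FlattenFutureSink {
  // Sink.futureSink materializes to Future[Future[A]] here: the outer Future
  // completes once the inner sink is available and has been materialized.
  // Flattening it yields the Sink[E, Future[A]] shape asked for in the thread.
  def flatten[E, A](future: Future[Sink[E, Future[A]]]): Sink[E, Future[A]] =
    Sink.futureSink(future).mapMaterializedValue(_.flatten)
}
```

The resulting sink can be passed to runWith like any other Sink; this does not reproduce the 1.0-RC workaround discussed in this thread, it only shows the shape of the flattening with a newer API.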

@rkuhn
Contributor Author

rkuhn commented Jun 17, 2015

Thanks for this example, @jroper, makes perfect sense to me. So it’s Reactive Iteratees all over :-)

@viktorklang
Member

Reacteratees, folks! You heard it here first!

@drewhk
Member

drewhk commented Jun 17, 2015

@jroper OK, that makes sense. For now, just use the fanoutPublisher workaround I posted above; hopefully we will fix the original problem in the meantime.

ktoso pushed a commit to ktoso/akka that referenced this issue Jan 11, 2016
ktoso pushed a commit to ktoso/akka that referenced this issue Jan 11, 2016
ktoso pushed a commit to ktoso/akka that referenced this issue Jan 11, 2016
…-drewhk

=str akka#16923: Inject identity between SubscriberSource and PublisherSink