Add method for prematerializing Sink/Source/Flow etc. #17769

Closed
drewhk opened this Issue Jun 18, 2015 · 21 comments


drewhk commented Jun 18, 2015

For example

val (ref, source) = Source.actorRef[Int].preMaterialize(materializer)

This would return the materialized value and a non-reusable source wrapping a running Publisher obtained from the original Source.
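
For illustration, the proposed behaviour can be approximated with a small user-side helper. This is only a sketch: `preMaterialize` below is a hypothetical helper, not a library method, and the buffer size and overflow strategy are arbitrary choices. It assumes the Akka 2.4-era Scala API (`ActorMaterializer`). The helper runs the source into a `Publisher` and wraps that publisher in a new, single-use `Source`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

object PreMaterializeSketch {
  // Hypothetical helper: materializes `src` immediately and returns its
  // materialized value plus a non-reusable Source fed by the running stream.
  def preMaterialize[T, M](src: Source[T, M])(implicit mat: Materializer): (M, Source[T, NotUsed]) = {
    val (matValue, publisher) =
      src.toMat(Sink.asPublisher[T](fanout = false))(Keep.both).run()
    (matValue, Source.fromPublisher(publisher))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("prematerialize-sketch")
    implicit val mat: Materializer = ActorMaterializer()

    // The ActorRef is usable right away; the Source can be handed to a client.
    val (ref, source) = preMaterialize(Source.actorRef[Int](16, OverflowStrategy.fail))
    ref ! 1
    ref ! 2

    // Elements sent before a subscriber arrives wait in the actor's buffer.
    source.take(2).runForeach(println)
      .onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

The returned `Source` can be run only once because it wraps a non-fanout publisher. That is the trade-off against ordinary, reusable sources.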

loicdescotte Jun 24, 2015

Use case example :

  • I'm writing a library that provides an Akka Stream Source.
  • I need to push elements dynamically into this source (via an actor)
  • but I don't know what clients will do with the source when they get it
    • maybe transform it with map, merge it with another source, split it to several sources...

So I need to materialize the actor to be able to send elements into the source.

I could just do

val source = Source.actorRef[Item](Int.MaxValue, OverflowStrategy.fail)
val actorRef = Flow[Item].to(Sink.xxx).runWith(source)

But I don't want to consume the source elements directly by sending them into a Sink, as the source will be used later by my lib clients.

In this case, preMaterialize method can help.

dotta added a commit to dotta/playframework that referenced this issue Aug 4, 2015

Use Akka Streams for WS streamed response body handling. Fix #4689
One thing I didn't manage to do in this commit is reimplementing
`NingWS.executeStream` using Akka Stream. The problem is that I can't find a
convenient solution for attaching a listener to the Ning AsyncHttpClient, so
that the received (response) bytes can be pushed into an Akka Stream `Source`.

For instance, I've tried to use `Source.actorRef` as follows:

val (ref, publisher) = Source.actorRef[NingAsyncHandler].toMat(Sink.publisher)(Keep.both).run()
val source = Source(publisher)

(note: `NingAsyncHandler` is a type extending `akka.actor.Actor`)

The above would return a Reactive Stream `Publisher` (i.e., `publisher`) and an
`ActorRef` that I hoped I could use to feed the bytes to the `publisher` when
the Ning `AsyncHandler.onBodyPartReceived` method is called on the listener.

Unfortunately, this doesn't quite work because the type of `publisher` is
`Publisher[NingAsyncHandler]`, and `source` is of type `Source[NingAsyncHandler,
Unit]`.  While, what we would want is to have a `source` of type
`Source[ByteArray,_]`.

Furthermore, when using `Source.actorRef` we need to materialize the stream,
which isn't ideal here. Interestingly, about this last point there is actually
a ticket akka/akka#17769 that would make it possible to avoid the stream
materialization. Though, even if this is fixed, I still don't see how to create
a properly typed source (see paragraph above).

I'm now wondering if it is possible to implement the desired behavior with Akka
Stream, or if we need to fall back to the Reactive Stream API (which, of course,
I'd very much like to avoid).

Cc-ing @ktoso and @2m from the akka team, as they might be interested in having
a look and providing feedback.

@drewhk drewhk modified the milestones: streams-2.0, streams-backlog Dec 8, 2015

loicdescotte Feb 5, 2016

Is it still needed in Akka Streams 2? Do we have something like a channel to push elements dynamically into a source?

drewhk Feb 5, 2016

Member

Source.queue?
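
As a rough sketch of what `Source.queue` offers (assuming the Akka 2.4-era Scala API; the object name, `collectTwo`, the buffer size, and the overflow strategy are illustrative choices, not from this thread): materializing the stream yields a queue handle, and elements can then be pushed into the running stream with `offer`.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SourceQueueSketch {
  // Illustrative helper: pushes two elements through a queue-backed stream
  // and returns what the sink collected.
  def collectTwo(a: String, b: String)(implicit mat: Materializer): Seq[String] = {
    // Running the stream yields the queue (left) and the sink's result (right).
    val (queue, collected) =
      Source.queue[String](10, OverflowStrategy.fail)
        .take(2) // complete after two elements so the example terminates
        .toMat(Sink.seq)(Keep.both)
        .run()

    // offer returns a Future; waiting on each one keeps the offers sequenced.
    Await.result(queue.offer(a), 3.seconds)
    Await.result(queue.offer(b), 3.seconds)

    Await.result(collected, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("source-queue-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    println(collectTwo("a", "b"))
    system.terminate()
  }
}
```

Note that the queue only exists once the stream has been run, so on its own this does not give a caller an unmaterialized `Source` to compose further.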

loicdescotte Feb 5, 2016

@drewhk Indeed it is exactly what I was looking for! Many thanks :)
I guess the issue can be closed?

loicdescotte Feb 6, 2016

Sorry, please ignore my previous comment. This could still be useful, for example if we need to get a reference to a source from a SourceQueue before it's materialized.
Just one use case: Play Framework now uses Akka Streams, and it can automatically stream elements from a source to the browser using Status(httpCode).chunked(source), for example Ok.chunked(source).
So Play defines its own Sink, and it can be useful to be able to push elements into a source before it's materialized by Play.

@rkuhn rkuhn removed the 1 - triaged label Mar 9, 2016

qpiny Apr 2, 2016

If I understand correctly, Status(httpCode) should not ask for a Source but should be a Sink.
Then you can get the actorRef by doing:

val actorRef = source.runWith(Status(httpCode))

loicdescotte Apr 4, 2016

Hi @qpiny ,
Sorry my message was certainly not very clear.
I need to materialize the queue to get a source, I'm not trying to get an actor.

With Play I need a source to use this method. But it is just a use case example.

qpiny Apr 4, 2016

Hi,
I understand. I agree, a solution to get the "prematerialized" value of a source would be useful, even if it breaks source reusability.
I also use Play with streams, and I'm annoyed that the materialized value is inaccessible.
For your specific use case, you can construct a queue with a poll method that returns a future of the next element and make a source using

Source.unfoldAsync(queue) { q ⇒ q.poll.map(e ⇒ Some((q, e))) }

loicdescotte Apr 4, 2016

@qpiny right now I'm solving this like this :

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer("tick"))

    Ok.chunked(Source.fromPublisher(pub))
  }

@patriknw patriknw added the t:stream label Apr 4, 2016

patriknw Apr 4, 2016

Member

@drewhk is this still relevant?

drewhk Apr 5, 2016

Member

Relevant it is, but I am hesitant to put this into the stream library. This is something users should deal with IMHO and write their own helper if needed. There is too much opportunity of self-foot-blasting.

loicdescotte Apr 5, 2016

@drewhk so your advice is to use Sink.asPublisher as a bridge in our util libs?
Thanks :)

drewhk Apr 5, 2016

Member

Hm, there is an alternative:

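def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
   val p = Promise[M]()
   // Complete the promise on materialization and pass the value through unchanged,
   // so the returned Source keeps the materialized type M
   (src.mapMaterializedValue { m => p.trySuccess(m); m }, p.future)
}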

The above does not do any prematerialization and gives back the materialized value in a Future which completes once the Source has been materialized.
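
A runnable sketch of this behaviour outside of Play may help. It assumes the Akka 2.4-era Scala API, and the object name, `demo`, and the queue parameters are illustrative. The helper is restated here with type parameters and with the callback returning `m`, so that the result is still a `Source[T, M]`. The future stays pending until the returned source is run.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object PeekMatValueSketch {
  // Returns a new Source plus a Future of its materialized value.
  // The callback passes `m` through so the Source keeps its original type.
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]()
    (src.mapMaterializedValue { m => p.trySuccess(m); m }, p.future)
  }

  def demo()(implicit mat: Materializer): String = {
    val (peeked, futureQueue) =
      peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    // Nothing has been run yet, so the future is still pending.
    assert(!futureQueue.isCompleted)

    // Running the returned Source (not the original one) triggers the callback.
    val firstElement = peeked.runWith(Sink.head)

    // The queue is now available and can feed the running stream.
    val queue = Await.result(futureQueue, 3.seconds)
    Await.result(queue.offer("hello"), 3.seconds)
    Await.result(firstElement, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("peek-mat-value-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    println(demo())
    system.terminate()
  }
}
```

In contrast to prematerializing through a publisher, the stream here is started by whoever runs the returned `Source`, so the future completes only at that point.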

loicdescotte Apr 5, 2016

@drewhk Thanks it's great I'll try to play with that!

loicdescotte Apr 6, 2016

@drewhk
I did not manage to solve my source queue issue with your tip:

//simple play action
def sourceQueueAction = Action.async {
      val queueSource = Source.queue[String](10, OverflowStrategy.fail)
      val futureQueue = peekMatValue(queueSource)

      futureQueue.map{ queue =>
        println("queue is ready")
        val tick = Source.tick(0 second, 1 second, "tick")
        tick.runForeach(t => queue.offer("tick"))
        //Ok.chunked takes a source to stream data on browser
        Ok.chunked(queueSource)
      }

}


def peekMatValue[T,M](src: Source[T, M]): Future[M] = {
     val p = Promise[M]
     src.mapMaterializedValue { m =>
       println("queue is materialized")
       p.trySuccess(m)
     }
     p.future
}

Neither of the two println calls ever prints. It seems that by the time the queue is materialized, it's too late to send messages. Or maybe it's never materialized?
The previous example with the intermediate publisher was working (see #17769 (comment))

Please tell me if this is not the right place to discuss this...
Thanks :)

qpiny Apr 6, 2016

Hi,
It doesn't work because the callback in peekMatValue is only called when the source is started (connected to a sink), but you only return the source once the materialized-value future has completed.
The following code should work (not tested):

def sourceQueueAction = Action {
      val queueSource = Source.queue[String](10, OverflowStrategy.fail)
      val futureQueue = peekMatValue(queueSource)

      futureQueue.map { queue =>
        println("queue is ready")
        val tick = Source.tick(0 second, 1 second, "tick")
        tick.runForeach(t => queue.offer("tick"))
      }
      Ok.chunked(queueSource)
}

loicdescotte Apr 6, 2016

@qpiny not working better :(
thanks anyway :)

qpiny Apr 6, 2016

Right!
Sources are immutable. You must use the source returned by mapMaterializedValue in Ok.chunked.
The working code (tested) is:

def sourceQueueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map { queue =>
      Source.tick(0.second, 1.second, "tick")
        .map(println)
        .runForeach { _ => queue.offer("tick"); () }
    }
    Ok.chunked(queueSource)

  }

  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      println("queue is materialized")
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }

loicdescotte Apr 7, 2016

@qpiny @drewhk awesome it's working !!
Many thanks :) :)

In my opinion the issue can be closed...

loicdescotte Apr 7, 2016

@qpiny @drewhk
Just to understand the difference between the 2 versions, if I'm not using the source from the mapMaterializedValue but the original source does it mean that my sourceQueue is not linked to the queue?
I don't really understand the reason behind this....

Thanks :)

drewhk Apr 7, 2016

Member

The Source is an immutable description of the thing that will be executed; it is not running yet (apart from a few exceptions, like when you prematerialize something and re-wrap it in a Source). Every modification to this description results in a new description, which is a new Source instance, i.e. the original Source is not modified.

In fact, you can usually (and should) design your code to expose these reusable components/descriptions such as Sources, Sinks, Flows, etc. They can be cached, reused arbitrarily many times, and safely composed in any way possible. Think of them as composable, immutable factories.
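
A minimal sketch of this point, assuming the Akka 2.4-era Scala API (the object name, `demo`, and the counter are illustrative): `mapMaterializedValue` returns a new `Source`, so a callback attached that way never fires when the original `Source` is the one being run.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._

object ImmutableBlueprintSketch {
  // Returns the callback count after running the original source,
  // and again after running the derived one.
  def demo()(implicit mat: Materializer): (Int, Int) = {
    val calls = new AtomicInteger(0)

    val base = Source.single("x")
    // `observed` is a new description; `base` itself is left untouched.
    val observed = base.mapMaterializedValue { m => calls.incrementAndGet(); m }

    // Running the original: the callback is not part of this blueprint.
    Await.result(base.runWith(Sink.ignore), 3.seconds)
    val afterBase = calls.get()

    // Running the derived source: the callback fires during materialization.
    Await.result(observed.runWith(Sink.ignore), 3.seconds)
    val afterObserved = calls.get()

    (afterBase, afterObserved)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("immutable-blueprint-sketch")
    implicit val mat: Materializer = ActorMaterializer()
    println(demo())
    system.terminate()
  }
}
```

This is the difference between the two versions discussed above: passing the original queue source to `Ok.chunked` runs a blueprint that has no promise-completing callback attached.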

@drewhk drewhk closed this Apr 7, 2016