add Flow.lazyInit mirroring Sink.lazyInit #24427

Closed
rkuhn opened this Issue Jan 26, 2018 · 7 comments

Collaborator

rkuhn commented Jan 26, 2018

Sink.lazyInit allows the construction of a Sink in response to the beginning of the stream. Flows can be constructed lazily with the same method, for example by closing them with a Sink.actorRef and wiring that up in .mapMaterializedValue with a fresh Source, using Sink.lazyInit for the first part. It would be nice to move this ceremony and the associated runtime overhead from the users into the library, with obvious optimization opportunities.
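To make the requested behaviour concrete, here is a minimal sketch of the "build the stage when the first element arrives" idea. It is a model over plain Scala iterators, not Akka Streams code: `LazyInitModel`, `Stage` and this `lazyInit` are hypothetical names made up for illustration, and materialized values and the fallback are left out.

```scala
object LazyInitModel {
  // Hypothetical stand-in for a flow: just a function from input to output.
  type Stage[I, O] = I => O

  // Defers building the stage until the first element is pulled.
  def lazyInit[I, O](factory: I => Stage[I, O])(in: Iterator[I]): Iterator[O] = {
    var stage: Option[Stage[I, O]] = None
    in.map { elem =>
      val s = stage.getOrElse {
        val created = factory(elem) // runs once, with the first element
        stage = Some(created)
        created
      }
      s(elem) // the first element is also processed by the new stage
    }
  }

  // Counts factory invocations to show the stage is built only once.
  var factoryCalls = 0
  val factory: Int => Stage[Int, Int] = first => {
    factoryCalls += 1
    x => x + first
  }

  val out: List[Int] = lazyInit(factory)(Iterator(10, 1, 2)).toList
}
```

In this model the factory sees the first element (10) and every element, including the first, then passes through the stage it returned.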

An open question might be whether it is possible to switch out flows while the stream is running, similar to RxJava’s switchMap operator, but this may be challenging to the point of uselessness due to the multi-threaded Akka Streams runtime environment.

@rkuhn rkuhn added the t:stream label Jan 26, 2018

ktoso Jan 26, 2018

Member

Sounds useful.

The switch one is absolutely possible to do; in fact, the interpreter enables this pretty easily. Let’s keep this ticket about the init one, though. :)

gosubpl Jan 26, 2018

Contributor

On it.

ktoso Jan 29, 2018

Member

Thanks guys :)

@ktoso ktoso modified the milestones: 2.4.21, 2.5.10 Jan 29, 2018

hepin1989 Jan 30, 2018

Contributor

FYI, RxJava's switchIfEmpty is useful too.

gosubpl Feb 9, 2018

Contributor

What would be the preferred API? Do we need the Future Factory

def lazyInit[I, O, M](flowFactory: I ⇒ Future[Flow[I, O, M]], fallback: () ⇒ M): Flow[I, O, Future[M]]

or is the standard immediate factory enough?

def lazyInit[I, O, M](flowFactory: I ⇒ Flow[I, O, M], fallback: () ⇒ M): Flow[I, O, M]

The implementation could use the Future-based variant, with an adapter exposing the immediate interface that just wraps the result in Future.successful().

The strong point for having the immediate API anyway is that in most usages the Flow has signature Flow[I, O, NotUsed], so there is no place for future there.

I don't know, though, whether the 'future' version has any advantage over the 'immediate' one, since the factory already delays initialisation anyway.
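The adapter between the two variants could look roughly like the sketch below. It uses only the Scala standard library: the type parameter `F` stands in for `Flow[I, O, M]`, and `FactoryAdapter` and `toFutureFactory` are hypothetical names, not part of any Akka API.

```scala
import scala.concurrent.Future

object FactoryAdapter {
  // Lifts an immediate factory into the Future-returning shape.
  // F stands in for Flow[I, O, M]; no Akka types are needed for the idea.
  def toFutureFactory[I, F](immediate: I => F): I => Future[F] =
    elem => Future.successful(immediate(elem))

  // Example: an immediate factory producing a placeholder String instead of a Flow.
  val lifted: Int => Future[String] =
    toFutureFactory((i: Int) => s"flow-for-$i")
}
```

Because `Future.successful` creates an already-completed future, the lifted factory adds no asynchronous boundary of its own.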

I also think that we can get close to the switchMap functionality by using splitWhen and concatSubstreams:

      val probe = Source(List(1, 1, 2, 1, 1, 4, 1, 1))
        .splitWhen(_ % 2 == 0)
        .via(new LazyFlow[Int, Int, NotUsed](e => Flow.fromFunction[Int, Int](i => i * e), () => NotUsed))
        .concatSubstreams
        .runWith(TestSink.probe[Int](system))

      probe.request(8).expectNext(1, 1, 4, 2, 2, 16, 4, 4)
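
The expected values in the test above can be checked with a small model on plain Scala collections. This is not Akka Streams code: it assumes that splitWhen starts a new substream at each element matching the predicate, and that the lazily created flow multiplies every element of a substream by that substream's first element. `SplitWhenModel` and its members are hypothetical names.

```scala
object SplitWhenModel {
  // Starts a new chunk at every element for which the predicate holds.
  def splitWhen[A](xs: List[A])(p: A => Boolean): List[List[A]] =
    xs.foldLeft(List.empty[List[A]]) {
      case (Nil, x)         => List(List(x))     // very first element opens a chunk
      case (acc, x) if p(x) => List(x) :: acc    // predicate matched: new chunk
      case (cur :: rest, x) => (x :: cur) :: rest // otherwise extend current chunk
    }.map(_.reverse).reverse

  val chunks: List[List[Int]] =
    splitWhen(List(1, 1, 2, 1, 1, 4, 1, 1))(_ % 2 == 0)

  // Per chunk, the "lazy flow" is built from the first element e and maps i => i * e.
  val out: List[Int] = chunks.flatMap { chunk =>
    val e = chunk.head
    chunk.map(_ * e)
  }
}
```

The chunks are `[1, 1]`, `[2, 1, 1]` and `[4, 1, 1]`, so the multipliers are 1, 2 and 4, and concatenating the results gives the sequence the probe expects.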

patriknw Feb 10, 2018

Member

@gosubpl I think it should be Future[Flow], to be consistent with Sink.lazyInit, and it's more flexible without adding much more internal complexity. I think Future.successful is simple enough for end users if they don't need the future.

Incidentally I was also looking for this feature the other day, for using in Artery. In other words I'm very interested in getting this done and looking forward to the PR.

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 11, 2018

gosubpl Feb 11, 2018

Contributor

Almost there. Expect PR real soon.

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 11, 2018

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 20, 2018

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 20, 2018

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 20, 2018

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 21, 2018

patriknw added a commit that referenced this issue Feb 21, 2018

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 22, 2018

gosubpl added a commit to gosubpl/akka that referenced this issue Feb 22, 2018

@patriknw patriknw closed this Feb 22, 2018

@patriknw patriknw added this to the 2.5.10 milestone Feb 22, 2018

@ktoso ktoso removed the 3 - in progress label Feb 23, 2018
