Offer Flow from Source => Source function #26592

Open
derolf opened this issue Mar 21, 2019 · 6 comments
Labels
0 - new Ticket is unclear on its purpose or if it is valid or not t:stream

Comments

@derolf

derolf commented Mar 21, 2019

Assume we have a function of type

def func[A, B](in: Source[A, NotUsed]): Source[B, NotUsed]

Functions of this shape are generated, for example, by the akka-grpc framework.

We would like to have a method that constructs a Flow from func:

Flow.fromSource2Source[A, B](func: Source[A, NotUsed] => Source[B, NotUsed]): Flow[A, B, NotUsed]
@derolf derolf changed the title from "Offer Flow from () => Source => Source" to "Offer Flow from Source => Source function" Mar 21, 2019
@2m
Member

2m commented Mar 29, 2019

You can use flatMapConcat with such a function to achieve that:

val source = Source(0 to 5)
val duplicator: Int => Source[Int, NotUsed] = el => Source(List(el, el))
source.flatMapConcat(duplicator).runForeach(println)

https://scalafiddle.io/sf/v5ntXyA/0

@derolf
Author

derolf commented Mar 29, 2019

I already found a solution:

def SourceProcessor[In, Out](f : Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
  Flow[In].prefixAndTail(0).flatMapConcat { case (Nil, in) => f(in) }

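It uses prefixAndTail to hijack the underlying Source: with a prefix length of 0, the single emitted element is an empty prefix plus a Source carrying the whole upstream, which is then handed to f.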
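
For readers who want to try the pattern end to end, here is a minimal sketch of how it might be wired up and run. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer (on 2.5 an implicit ActorMaterializer would be needed as well). The names SourceProcessorExample, sourceProcessor, doubleAll and run are hypothetical and exist only for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SourceProcessorExample {

  // Same idea as the snippet above: take a prefix of length 0, so the one
  // emitted element is (empty prefix, Source carrying the whole upstream).
  // The prefix is ignored and the tail Source is passed to f.
  def sourceProcessor[In, Out](
      f: Source[In, NotUsed] => Source[Out, NotUsed]): Flow[In, Out, NotUsed] =
    Flow[In]
      .prefixAndTail(0)
      .flatMapConcat { case (_, tail) => f(tail) }

  // Hypothetical Source => Source function standing in for generated code.
  val doubleAll: Source[Int, NotUsed] => Source[Int, NotUsed] =
    _.map(_ * 2)

  // Runs 1, 2, 3 through the wrapped function and collects the results.
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6+, the ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("source-processor-example")
    try {
      Await.result(
        Source(1 to 3).via(sourceProcessor(doubleAll)).runWith(Sink.seq),
        3.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Matching on `(_, tail)` instead of `(Nil, in)` avoids depending on which empty collection the operator happens to emit for the prefix. Note that the tail Source can only be materialized once, so f should not run its argument more than one time.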

@2m
Member

2m commented Mar 29, 2019

Ahh, I misunderstood. Good that you found a solution.

@derolf
Author

derolf commented Mar 29, 2019

@2m Anyway, I appreciate your help! It took me half a day to figure out that “hack”...

@raboof
Member

raboof commented Mar 29, 2019

Good find, that is definitely not trivial! Perhaps we should have a convenience method for this somewhere, or at least mention it as a pattern somewhere in the docs?

@derolf
Author

derolf commented Mar 29, 2019

@raboof I had basically started writing my own GraphStage before I realized that I was just cloning prefixAndTail...

Yes, it could simply be added as FlowOps.via(func: (Source) => Source).
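
Until something like that exists in the library, the suggestion could be approximated in user code with an enrichment class. The sketch below is only an illustration of the idea, not an Akka API: SourceFunctionSyntax, SourceFunctionOps, viaSourceFunction and demo are hypothetical names, and Akka 2.6 or later is assumed for the implicit materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object SourceFunctionSyntax {

  // Hypothetical enrichment, not part of Akka: adds a via-like method to
  // Flow that accepts a Source => Source function.
  implicit class SourceFunctionOps[A, B, M](private val flow: Flow[A, B, M])
      extends AnyVal {

    def viaSourceFunction[C](
        f: Source[B, NotUsed] => Source[C, NotUsed]): Flow[A, C, M] =
      // Reuses the prefixAndTail(0) pattern from earlier in the thread.
      flow.via(Flow[B].prefixAndTail(0).flatMapConcat { case (_, tail) => f(tail) })
  }

  // Small demo: increment each element, then hand the stream to a
  // Source => Source function that renders the elements as strings.
  def demo(): Seq[String] = {
    // Assumption: Akka 2.6+, the ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("via-source-function")
    try {
      val flow = Flow[Int].map(_ + 1).viaSourceFunction(_.map(n => s"n=$n"))
      Await.result(Source(1 to 3).via(flow).runWith(Sink.seq), 3.seconds)
    } finally system.terminate()
  }
}
```

The enrichment is defined on Flow rather than Source because a Source can already be passed to the function directly; the wrapper is only needed where no upstream Source is in hand yet.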

@johanandren johanandren added the labels "0 - new" (Ticket is unclear on its purpose or if it is valid or not) and "t:stream" Apr 5, 2019