Add support for suspendable Streams #2515

Closed
ewildgoose opened this issue Jul 11, 2014 · 11 comments

Comments

@ewildgoose

Streams offer an interesting way to bottle up a closure, effectively hiding state, and then consume the output later. However, this is limited to consuming the entire stream at a later point; in some cases it might be useful to be able to stop and resume in sections.

The feature request is first-class support in the language for Stream suspension; however, I don't have a concrete suggestion for how that might look. My requirements are fairly "toy" at present, but my main use would be to pull items one by one from a stream, e.g.:

s = Stream.cycle([1, 2, 3])
{first, s} = take_one(s)   # take_one is hypothetical; no such function exists yet

# do something with first

{next, s} = take_one(s)

Perhaps an "iterator" type which could wrap a stream? The iterator could implement Enumerable, so the interface would feel much like using Enum functions, but you would also get back a continuation (the iterator) that lets you continue if you wish.
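For readers wondering whether "take one and get a continuation back" is possible at all: the Enumerable protocol already supports suspension through its `{:suspend, acc}` reducer return value. The snippet below is a minimal sketch of that mechanism, not a proposed API. The `step` reducer is an illustrative name, and the example assumes the enumerable never runs out (a finite one would eventually return `{:done, acc}` instead).

```elixir
# Reducer that suspends after every element and hands the element back
# as the accumulator. `step` is a name chosen for this sketch only.
step = fn item, _acc -> {:suspend, item} end

stream = Stream.cycle([1, 2, 3])

# The first call starts the reduction and stops after one element.
{:suspended, first, cont} = Enumerable.reduce(stream, {:cont, nil}, step)

# Each continuation is a one-argument function that accepts the next command.
{:suspended, second, cont} = cont.({:cont, nil})
{:suspended, third, _cont} = cont.({:cont, nil})
```

Calling a continuation with `{:halt, acc}` instead of `{:cont, acc}` ends the reduction early.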

@jcspencer

Stream.pause(stream) and Stream.resume(stream) would be good for stopping and resuming streams, and functions such as {:ok, item} = Stream.next(stream) and a matching {:ok, item} = Stream.prev(stream) would be good for taking one element from a Stream.

Just my thoughts on how this could be tackled.

@ewildgoose
Author

I'm not sure I can see how .pause and .resume would work in practice.

Consider:
items = Enum.take(stream, 10)

I want the stream to pause after taking those 10 items, so somehow I need to obtain a continuation on the state of the stream after those 10 items have been pulled. That is, I need both the 10 items and a continuation on the current state.

@josevalim
Member

@ewildgoose exactly, it would always need to return the continuation. I haven't thought properly about this issue yet, as it will only be tackled after 1.0, but one idea that came to mind is to have an Enum.next(enum_or_continuation, 100) that always returns {entries, continuation}. We would at least need an Enum.halt(continuation) for closing a continuation too.
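A rough sketch of how the `{entries, continuation}` shape could be built on the existing Enumerable protocol follows. `NextSketch`, `next/2` and `halt/1` are hypothetical stand-ins for the proposed `Enum.next` and `Enum.halt`; nothing here exists in Elixir itself. It also assumes that any one-argument function passed in is a continuation returned by an earlier call, and it uses `:done` as a marker for an exhausted source.

```elixir
defmodule NextSketch do
  # Hypothetical stand-in for the proposed Enum.next/2.
  # Returns {entries, continuation}; continuation is :done once exhausted.
  def next(:done, _n), do: {[], :done}
  def next(source, 0), do: {[], source}

  def next(source, n) when is_integer(n) and n > 0 do
    case continuation(source).({:cont, {[], n}}) do
      {:suspended, {items, 0}, cont} -> {Enum.reverse(items), cont}
      {_done_or_halted, {items, _left}} -> {Enum.reverse(items), :done}
    end
  end

  # Hypothetical stand-in for the proposed Enum.halt/1.
  def halt(:done), do: :ok

  def halt(source) do
    continuation(source).({:halt, {[], 0}})
    :ok
  end

  # Assumption: a one-argument function is a continuation from a previous call.
  defp continuation(cont) when is_function(cont, 1), do: cont

  defp continuation(enum) do
    fn command -> Enumerable.reduce(enum, command, &step/2) end
  end

  # Collect items and suspend as soon as the requested count is reached.
  defp step(item, {items, 1}), do: {:suspend, {[item | items], 0}}
  defp step(item, {items, left}), do: {:cont, {[item | items], left - 1}}
end

{entries, cont} = NextSketch.next(Stream.cycle([1, 2, 3]), 4)
{more, cont} = NextSketch.next(cont, 2)
:ok = NextSketch.halt(cont)
```

Because the requested count travels in the accumulator, each call can ask for a different number of entries from the same continuation.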

@ewildgoose
Author

This is a half-formed idea, but what about an iterator wrapper? It could act as a modifier for how Enum works, while using the native Enum implementation under the bonnet. So:

i = Iterator.new(some_stream)
{values, i} = Iterator.take(i, 10)
{more_values, i} = Iterator.take_while(i, some_condition)
Iterator.halt(i)

So "Iterator" is an implementation for Enum
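One wrinkle with the wrapper idea is `take_while`: to learn that the condition has failed, the iterator has to pull one element too many. The sketch below is one possible way to handle that, using a small push-back buffer. `IteratorSketch` and all of its functions are hypothetical and only mirror the calls in the comment above; it does not implement the Enumerable protocol.

```elixir
defmodule IteratorSketch do
  # Hypothetical module; not part of Elixir's standard library.
  # cont: one-argument continuation (nil once exhausted)
  # buffer: items pulled from the source but not yet handed out
  defstruct cont: nil, buffer: []

  def new(enum) do
    step = fn item, _acc -> {:suspend, item} end
    %__MODULE__{cont: fn command -> Enumerable.reduce(enum, command, step) end}
  end

  def take(%__MODULE__{} = it, n) when is_integer(n) and n >= 0, do: do_take(it, n, [])

  def take_while(%__MODULE__{} = it, pred), do: do_take_while(it, pred, [])

  def halt(%__MODULE__{cont: nil}), do: :ok

  def halt(%__MODULE__{cont: cont}) do
    cont.({:halt, nil})
    :ok
  end

  defp do_take(it, 0, acc), do: {Enum.reverse(acc), it}

  defp do_take(it, n, acc) do
    case next(it) do
      {:ok, item, it} -> do_take(it, n - 1, [item | acc])
      {:done, it} -> {Enum.reverse(acc), it}
    end
  end

  defp do_take_while(it, pred, acc) do
    case next(it) do
      {:ok, item, it} ->
        if pred.(item) do
          do_take_while(it, pred, [item | acc])
        else
          # Push the rejected item back so the next call sees it again.
          {Enum.reverse(acc), %{it | buffer: [item | it.buffer]}}
        end

      {:done, it} ->
        {Enum.reverse(acc), it}
    end
  end

  # Serve from the buffer first, then from the suspended reduction.
  defp next(%__MODULE__{buffer: [item | rest]} = it), do: {:ok, item, %{it | buffer: rest}}
  defp next(%__MODULE__{cont: nil} = it), do: {:done, it}

  defp next(%__MODULE__{cont: cont} = it) do
    case cont.({:cont, nil}) do
      {:suspended, item, next_cont} -> {:ok, item, %{it | cont: next_cont}}
      _done_or_halted -> {:done, %{it | cont: nil}}
    end
  end
end

i = IteratorSketch.new(Stream.iterate(1, &(&1 + 1)))
{values, i} = IteratorSketch.take(i, 3)
{more_values, i} = IteratorSketch.take_while(i, &(&1 < 6))
{after_values, i} = IteratorSketch.take(i, 1)
:ok = IteratorSketch.halt(i)
```

Returning `{values, iterator}` from every call is what makes the API awkward to pipe, which is the trade-off raised later in the thread.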

@josevalim
Member

I would like to avoid a new module but it is a possibility, definitely. It depends on how large the surface API will be.

@ewildgoose
Author

Remember I'm very fresh to Elixir. Do you have a suggestion on a different API?

I think Streams have the potential to offer very different ways to build Elixir applications (vs lists). It feels closer to pure FP styles (e.g. Haskell), it has some future possibilities for interesting multi-processing ideas, and to my inexperienced eye the laziness seems to enable more general solutions without paying a processing penalty up front.

I think some of the more experienced folks should offer their options... Perhaps also canvassing for use cases would help? Remembering I'm probably not structuring my apps optimally, how about:

  • Parsing headers (email, http, etc). Parse enough to satisfy current requirements, parse the rest if we need them later?
  • Parsing MIME structures (erk), or perhaps XML/JSON. Want to defer potentially expensive operations until required.
  • My current use is solving Euler problems. I currently desire to stuff a queue with Stream.cycle(...) items, then every time my item gets to the front of the queue I want the next item out of the cycle and then it goes back in the queue.

I think it's worth trying to get this API right and not rushing it (but equally, please don't forget about it...).

@ewildgoose
Author

Perhaps I'm on the wrong track.

a) For Enum our basic function is reduce, which arguably passes back its continuation as an end result.
b) For Stream our basic function is transform, but this does NOT pass back its accumulator at the end.

Perhaps the API for Stream is wrong? Perhaps Stream.transform should work the same as Enum.reduce? Alternatively perhaps Stream.transform should be built from some other function which returns the continuation AND has a public API? (I think at present the implementation building blocks of transform are all private?)

I think perhaps this is a better route?
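A small comparison may make the difference concrete. The eager functions return their final accumulator, while `Stream.transform/3` only emits elements and keeps its accumulator internal. The variable names are illustrative only.

```elixir
# Enum.reduce/3 returns the final accumulator.
sum = Enum.reduce([1, 2, 3], 0, fn x, acc -> x + acc end)

# Enum.map_reduce/3 returns both the mapped list and the final accumulator.
{eager_sums, final_acc} =
  Enum.map_reduce([1, 2, 3], 0, fn x, acc -> {x + acc, x + acc} end)

# Stream.transform/3 threads an accumulator too, but the caller only ever
# sees the emitted elements; the accumulator is never handed back.
lazy_sums =
  [1, 2, 3]
  |> Stream.transform(0, fn x, acc -> {[x + acc], x + acc} end)
  |> Enum.to_list()

# sum == 6, eager_sums == [1, 3, 6], final_acc == 6, lazy_sums == [1, 3, 6]
```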

@hamiltop
Contributor

Here's my approach to solving the problem. It's a little side-effect heavy, but that's just because of the CPS approach I took.

https://github.com/hamiltop/streamz/blob/master/lib/streamz.ex#L90

The alternative to accepting a function is to return a tuple of {result, continuation}, which sort of breaks the liberal use of the pipe operator.

@holsee

holsee commented Sep 24, 2014

TL;DR: Why can't Stream be a high-level concept that lets any push-based data conform to the same contract, be it a file stream, a TCP stream, a generator, an enum, etc.?

One of the nicest approaches to this problem I have seen is the notion of subscriptions: the decision to produce stays in the hands of the producer, and consumption is a concept detached from the processing of the stream. When you don't wish to consume anymore, you simply unsubscribe. This can be made intelligent based on a defined contract. As a rough example, in stream |> take(5) |> consume the take could handle the unsubscription.

This led me to read some of the code in stream.ex to see how it is done. My first impression was that this is built on Enumerable and an Iterator, so achieving what I suggested could be tricky with things as they are today.

Some thoughts around taking streams to the next level...

I love how elegantly the continuation is achieved, but to me this is still back to front (and I could be wrong): the description "Successive values are generated by calling next_fun on the previous value" tells me we can pause by not taking the next item, right? This reminds me of code I wrote years ago which turned Enumerable collections into push-based collections. I always felt I was missing a step, and then these guys came along in 2009 and blew my mind with this video.

Should/could stream become push?

Now for a stream to be push, the producer would have to be calling the consumer and operators would be functions which are applied via a map function over the source stream (or 'flat_map' over multiple streams) and the decision to produce would stay firmly in the hands of the producer. No blocking waiting on next_fun to return the next item.

So operators would have a signature taking the source 'stream' and a function/1, and would no longer work with an iterator as we know it. Instead there would be subscribers which provide functions for on_next, on_error, on_completed, or something along those lines.

Providing the flexibility to treat collections of data as time-dependent "streams" which can be composed and transformed with set operators is a wonderful concept. Stream comes close as it stands today, but this thread highlights one of its shortcomings.

Streams would need to defer any execution until something is listening/subscribed, and we would potentially need a contract, possibly defined out of band, between the final consumer and the producer. Such a mechanism would also enable suspension, and even back pressure to be communicated down to the producer(s), which is very cool, and I'm sure we could find a nice idiomatic way to achieve this.
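To make the subscriber idea a little more tangible, here is a minimal, synchronous sketch in which a source is just a function that pushes items into a subscriber's callbacks. Everything in it (`PushSketch`, `from_enum/1`, `map/2`, `subscribe/2`, and the subscriber map with `on_next` and `on_completed` keys) is hypothetical, and `on_error` is left out to keep it short. It is not a port of Rx.

```elixir
defmodule PushSketch do
  # Hypothetical names throughout; not an existing Elixir or Rx API.

  # A source is a function that, given a subscriber, pushes every item to it.
  def from_enum(enum) do
    fn %{on_next: on_next, on_completed: on_completed} ->
      Enum.each(enum, on_next)
      on_completed.()
    end
  end

  # An operator wraps a source and returns a new source. Here the wrapped
  # subscriber transforms each item before forwarding it downstream.
  def map(source, fun) do
    fn subscriber ->
      source.(%{subscriber | on_next: fn item -> subscriber.on_next.(fun.(item)) end})
    end
  end

  # Nothing runs until someone subscribes.
  def subscribe(source, subscriber), do: source.(subscriber)
end

parent = self()

[1, 2, 3]
|> PushSketch.from_enum()
|> PushSketch.map(&(&1 * 2))
|> PushSketch.subscribe(%{
  on_next: fn item -> send(parent, {:next, item}) end,
  on_completed: fn -> send(parent, :completed) end
})
```

In this form the producer drives the loop, so there is no way for the consumer to pause it; that would need some signal flowing back from the subscriber.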

I guess the questions remain...
Is Stream the place to house this? It is nice and lightweight as it is and does its job well.

And the fact remains I might be way off the point (feel free to call me on this I don't claim to be an authority in this area).

Do we want it to grow to become something like an 'Observable' in Rx, or provide a standard contract to allow "user land" implementations to slot in with a tight core of set operators provided as they are today on Stream?

Would a port of Rx to Elixir be something to think about?

The correct level of abstraction?

Stepping back from this a bit... at what level of abstraction would such problems be best solved on this platform? Each time I think this low level in Elixir or Erlang I get told to think at a higher level.. so here is my attempt.

Should we be thinking in terms of manipulating primitives such as streams (as they are today) in this way (i.e. adding suspension or making it a push stream) or should we be thinking about separate processes and data streaming between them with the ability to communicate out of band through message passing?

The idea of suspension and back pressure seem closely related to me.

Excuse my potential naivety, I'm certainly no expert in Elixir or Erlang. I would be interested in knowing if there is any precedent for push-based streams.

@hamiltop
Contributor

@holsee Here's some good background on what we've done along those lines:

https://groups.google.com/forum/#!msg/elixir-lang-talk/E_m_gwtoON4/Tue8FFE1Tm8J

To expand a bit, a push based model is how we do GenEvent.stream. Consider the following code:

{:ok, manager} = GenEvent.start_link()
stream = GenEvent.stream(manager)

stream
|> Stream.map(&(1 + &1))
|> Stream.take(100)
|> Enum.map(fn el -> IO.puts(el) end)

The manager sends data to the current process (calling Enum.map creates a subscription) and blocks until an ack is received. Once 100 events have been received, it unsubscribes. A suspendable stream could be built on top of a GenEvent fairly easily by manually subscribing and processing events as needed.
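The send, block, ack, unsubscribe cycle described above can be sketched with plain processes. This is not how GenEvent is implemented; `AckSketch`, its functions, and the message shapes (`{:event, pid, item}`, `{:done, pid}`, `:ack`, `:unsubscribe`) are all assumptions made for illustration. The consumer suspends the producer simply by not acknowledging yet.

```elixir
defmodule AckSketch do
  # Hypothetical protocol, for illustration only:
  #   producer -> subscriber: {:event, producer_pid, item} | {:done, producer_pid}
  #   subscriber -> producer: :ack | :unsubscribe

  def start_producer(enum, subscriber) do
    spawn_link(fn ->
      Enum.reduce_while(enum, :ok, fn item, :ok ->
        send(subscriber, {:event, self(), item})

        # Block until the subscriber has dealt with this item.
        receive do
          :ack -> {:cont, :ok}
          :unsubscribe -> {:halt, :ok}
        end
      end)

      send(subscriber, {:done, self()})
    end)
  end

  # Wait for the next event. The producer stays blocked until ack/1 is called.
  def await(producer) do
    receive do
      {:event, ^producer, item} -> {:ok, item}
      {:done, ^producer} -> :done
    end
  end

  def ack(producer), do: send(producer, :ack)
  def unsubscribe(producer), do: send(producer, :unsubscribe)
end

producer = AckSketch.start_producer(Stream.cycle([1, 2, 3]), self())

{:ok, first} = AckSketch.await(producer)
# The producer is paused here; nothing more arrives until we acknowledge.
AckSketch.ack(producer)

{:ok, second} = AckSketch.await(producer)
AckSketch.unsubscribe(producer)
:done = AckSketch.await(producer)
```

Since the producer never sends more than one unacknowledged item, the same handshake doubles as a very coarse form of back pressure.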

@josevalim
Member

Closing this for now. I haven't seen further use cases that require suspendable streams. Meanwhile, we are working on GenRouter, which will move streaming to processes.
