add Stream.split(collection, count) ⇒ {splitted_values, rest_of_stream} #2922

Closed
sunaku opened this issue Dec 5, 2014 · 3 comments


@sunaku
Contributor

sunaku commented Dec 5, 2014

Hello,

Please add support for a Stream.split(collection, count) function that returns {splitted_values, rest_of_stream} so that I can continue consuming a stream from where I left off, instead of having to start all over again from the beginning each time I want to consume more of the stream. For example:

```elixir
{[1, 2], rest_of_stream} = Stream.split(1..5, 2)
[3, 4, 5] = Enum.to_list(rest_of_stream)
```
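For reference, here is a minimal sketch of how the proposed behavior could be approximated on top of the Enumerable protocol's suspend mechanism. `StreamSplitSketch` and its functions are hypothetical names, not part of Elixir. The sketch suspends the enumeration after every element and keeps the suspended continuation as the "cursor" for the rest.

```elixir
defmodule StreamSplitSketch do
  # Hypothetical helper, NOT part of Elixir's Stream module. It drives the
  # Enumerable protocol one element at a time by suspending after every
  # element, so each suspended continuation acts as a "cursor".

  def split(enum, count) when is_integer(count) and count >= 0 do
    # Reducer that suspends after each element, handing that element back.
    reducer = fn x, _acc -> {:suspend, x} end
    # Initial "continuation": starts reducing `enum` from the beginning.
    start = fn acc -> Enumerable.reduce(enum, acc, reducer) end
    take(start, count, [])
  end

  # Taken enough: the remaining continuation becomes a lazy stream.
  defp take(cont, 0, taken), do: {Enum.reverse(taken), Stream.unfold(cont, &step/1)}

  defp take(cont, n, taken) do
    case step(cont) do
      {x, next} -> take(next, n - 1, [x | taken])
      nil -> {Enum.reverse(taken), []}
    end
  end

  # Resumes the enumeration for exactly one element.
  defp step(cont) do
    case cont.({:cont, nil}) do
      {:suspended, x, next} -> {x, next}
      {:done, _acc} -> nil
      {:halted, _acc} -> nil
    end
  end
end

{taken, rest} = StreamSplitSketch.split(1..5, 2)
IO.inspect(taken)
IO.inspect(Enum.to_list(rest))
```

The sketch assumes the rest is consumed to the end: if `rest` is abandoned halfway, nothing sends `{:halt, acc}` to the continuation, so a resource such as `File.stream!` would stay open. Enumerating `rest` twice replays the same continuation, which is harmless for a range or list but not for a file whose read position is shared state, which is the concern raised in the first reply.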

In particular, this would allow me to consume a file stream in piecemeal fashion, rather than having to swoop through the entire file in one shot (which is what the Stream module API currently supports):

```
$ iex
Erlang/OTP 17 [erts-6.1] [source] [64-bit] [smp:2:2] [async-threads:10] [kernel-poll:false]

Interactive Elixir (1.0.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> stream = File.stream!("orders.csv") |> Stream.map(&String.rstrip/1)
#Stream<[enum: %File.Stream{line_or_bytes: :line,
  modes: [:raw, :read_ahead, :binary], path: "orders.csv", raw: true},
 funs: [#Function<45.29647706/1 in Stream.map/2>]]>
iex(2)> stream |> Enum.each(&IO.inspect/1)
"id,ship_to,net_amount"
"123,:NC,100.00"
"124,:OK,35.50"
"125,:TX,24.00"
"126,:TX,44.80"
"127,:NC,25.00"
"128,:MA,10.00"
"129,:CA,102.00"
"120,:NC,50.00"
:ok
iex(3)> header = stream |> Enum.take(1)
["id,ship_to,net_amount"]
iex(4)> body = stream |> Stream.map(&( String.split(&1, ",") ))
#Stream<[enum: %File.Stream{line_or_bytes: :line,
  modes: [:raw, :read_ahead, :binary], path: "orders.csv", raw: true},
 funs: [#Function<45.29647706/1 in Stream.map/2>,
  #Function<45.29647706/1 in Stream.map/2>]]>
iex(5)> body |> Enum.each(&IO.inspect/1)
["id", "ship_to", "net_amount"]   # <== here is the problem! the stream got reset
["123", ":NC", "100.00"]
["124", ":OK", "35.50"]
["125", ":TX", "24.00"]
["126", ":TX", "44.80"]
["127", ":NC", "25.00"]
["128", ":MA", "10.00"]
["129", ":CA", "102.00"]
["120", ":NC", "50.00"]
:ok
```
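The reset happens because every enumeration of a stream starts again from its first element; `Enum.take/2` does not advance any shared position. A minimal sketch of the usual workaround skips the header explicitly with `Stream.drop/2`. It uses an in-memory list as a hypothetical stand-in for the file.

```elixir
# Hypothetical in-memory stand-in for the file-backed stream in the session above.
lines = ["id,ship_to,net_amount", "123,:NC,100.00", "124,:OK,35.50"]
stream = Stream.map(lines, &String.split(&1, ","))

# Each enumeration starts from the first element, so taking the header
# does not move any cursor forward...
header = stream |> Enum.take(1) |> hd()

# ...and the body has to skip the header explicitly on a second pass.
body = stream |> Stream.drop(1) |> Enum.to_list()

IO.inspect(header)
IO.inspect(body)
```

With `File.stream!`, this opens the file twice: the first pass stops after the header line and the second pass reads the whole file.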

See also issue #2515 for a related discussion about a more general form of this feature.

Thanks for your consideration.

@josevalim
Member

That cannot work as you expect. A stream has not traversed the collection yet, so what happens if someone starts traversing the second element of the returned tuple (the rest of the stream) before the first one? For this reason, there is no way to implement Stream.split/2 without using a process to hold state, and none of the stream functions require a separate process today. As mentioned on IRC, your issue will be tackled by the other issue where you commented. For now, Stream.transform/3 is definitely the best and most performant way to go.
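As a sketch of the `Stream.transform` approach, the header can be captured in the accumulator during a single pass, so the source is only enumerated once. `CsvSketch.rows/1` is a hypothetical helper, and the naive `String.split/2` on commas assumes the CSV has no quoted fields.

```elixir
defmodule CsvSketch do
  # Hypothetical helper: turns a stream of CSV lines into rows in one pass.
  # The accumulator is `nil` until the first line (the header) is seen;
  # after that it holds the header's column names.
  def rows(lines) do
    Stream.transform(lines, nil, fn
      # First line: remember the column names and emit nothing.
      line, nil ->
        {[], String.split(line, ",")}

      # Later lines: emit one row of {column, value} pairs.
      line, header ->
        {[Enum.zip(header, String.split(line, ","))], header}
    end)
  end
end

["id,ship_to,net_amount", "123,:NC,100.00", "124,:OK,35.50"]
|> CsvSketch.rows()
|> Enum.each(&IO.inspect/1)
```

Applied to the session in the issue, this would be `File.stream!("orders.csv") |> Stream.map(&String.rstrip/1) |> CsvSketch.rows()`.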

@sunaku
Contributor Author

sunaku commented Dec 5, 2014

I don't know how complex Elixir's stream implementation is, but based on what I learned about streams, there isn't any need for state in order to implement this feature: all you need are thunks (see slides 11-14 in this lecture).

As long as the source of a stream is not mutable and not a resource, I think that sharing a thunk among multiple processes should be OK because they would independently be accessing the same stream from (possibly) different positions using their own independent "cursors" (call stacks, essentially). 😅
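A minimal sketch of the thunk idea, assuming a pure (non-resource) source: a hypothetical `LazyListSketch` represents a lazy list as a zero-arity function that returns either `nil` or `{head, tail_thunk}`. Splitting needs no process, and the rest can be traversed again from the same position.

```elixir
defmodule LazyListSketch do
  # Hypothetical representation: a lazy list is a zero-arity function (thunk)
  # that returns nil when empty or {head, tail_thunk} otherwise.

  # Builds a lazy list of the integers first..last.
  def from_range(first, last) do
    fn ->
      if first > last, do: nil, else: {first, from_range(first + 1, last)}
    end
  end

  # Forces at most `n` thunks and returns {taken, rest_thunk}.
  def split(thunk, 0), do: {[], thunk}

  def split(thunk, n) when n > 0 do
    case thunk.() do
      nil ->
        {[], thunk}

      {x, tail} ->
        {xs, rest} = split(tail, n - 1)
        {[x | xs], rest}
    end
  end

  # Forces every remaining thunk.
  def to_list(thunk) do
    case thunk.() do
      nil -> []
      {x, tail} -> [x | to_list(tail)]
    end
  end
end

{taken, rest} = LazyListSketch.split(LazyListSketch.from_range(1, 5), 2)
IO.inspect(taken)
IO.inspect(LazyListSketch.to_list(rest))
# Re-traversing `rest` starts from the same position again.
IO.inspect(LazyListSketch.to_list(rest))
```

Without memoization, every call to a thunk recomputes its element. That costs nothing extra for a range but would re-read a file, and caching the result requires mutable state.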

@josevalim
Member

And the only way to implement thunks in Elixir without requiring native extensions is with a process (or ETS). Plus, all of the Stream API is meant to work with resources too.

José Valim
www.plataformatec.com.br
Founder and Lead Developer
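To illustrate the kind of state José refers to, here is a minimal sketch of a memoized thunk whose result is cached in an `Agent` (a process). `MemoThunkSketch` is a hypothetical name; an ETS table could hold the cache instead.

```elixir
defmodule MemoThunkSketch do
  # Hypothetical helper: wraps `fun` in a thunk whose result is computed at
  # most once and then cached in an Agent process.
  def new(fun) when is_function(fun, 0) do
    {:ok, pid} = Agent.start_link(fn -> {:unevaluated, fun} end)

    fn ->
      Agent.get_and_update(pid, fn
        # Already computed: return the cached value unchanged.
        {:evaluated, value} = state ->
          {value, state}

        # First call: compute inside the Agent process and cache the result.
        {:unevaluated, f} ->
          value = f.()
          {value, {:evaluated, value}}
      end)
    end
  end
end

thunk = MemoThunkSketch.new(fn -> make_ref() end)
first = thunk.()
# The second call returns the cached reference instead of a new one.
IO.inspect(first == thunk.())
```

The Agent is linked to the caller and never stopped, so a real implementation would also have to manage its lifetime.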
