Skip to content

Commit

Permalink
Allow consuming multiple items from suspended enumerable in Stream.tr…
Browse files Browse the repository at this point in the history
…ansform/3

Closes #5763.
Closes #5772.

Signed-off-by: José Valim <jose.valim@plataformatec.com.br>
  • Loading branch information
José Valim committed Feb 16, 2017
1 parent e002ac5 commit 2d4722a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
36 changes: 19 additions & 17 deletions lib/elixir/lib/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -789,16 +789,18 @@ defmodule Stream do
do_after(after_fun, user_acc)
:erlang.raise(kind, reason, stacktrace)
else
{:suspended, [val], next} ->
do_transform_user(val, user_acc, user, fun, :cont, next, inner_acc, inner, after_fun)
{_, [val]} ->
do_transform_user(val, user_acc, user, fun, :halt, next, inner_acc, inner, after_fun)
{_, []} ->
do_transform(user_acc, user, fun, :halt, next, inner_acc, inner, after_fun)
{:suspended, vals, next} ->
do_transform_user(:lists.reverse(vals), user_acc, user, fun, :cont, next, inner_acc, inner, after_fun)
{_, vals} ->
do_transform_user(:lists.reverse(vals), user_acc, user, fun, :halt, next, inner_acc, inner, after_fun)
end
end

defp do_transform_user(val, user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do
defp do_transform_user([], user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do
do_transform(user_acc, user, fun, next_op, next, inner_acc, inner, after_fun)
end

defp do_transform_user([val | vals], user_acc, user, fun, next_op, next, inner_acc, inner, after_fun) do
user.(val, user_acc)
catch
kind, reason ->
Expand All @@ -808,20 +810,20 @@ defmodule Stream do
:erlang.raise(kind, reason, stacktrace)
else
{[], user_acc} ->
do_transform(user_acc, user, fun, next_op, next, inner_acc, inner, after_fun)
do_transform_user(vals, user_acc, user, fun, next_op, next, inner_acc, inner, after_fun)
{list, user_acc} when is_list(list) ->
do_list_transform(user_acc, user, fun, next_op, next, inner_acc, inner,
do_list_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner,
&Enumerable.List.reduce(list, &1, fun), after_fun)
{:halt, user_acc} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, elem(inner_acc, 1)}
{other, user_acc} ->
do_enum_transform(user_acc, user, fun, next_op, next, inner_acc, inner,
do_enum_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner,
&Enumerable.reduce(other, &1, inner), after_fun)
end

defp do_list_transform(user_acc, user, fun, next_op, next, inner_acc, inner, reduce, after_fun) do
defp do_list_transform(vals, user_acc, user, fun, next_op, next, inner_acc, inner, reduce, after_fun) do
try do
reduce.(inner_acc)
catch
Expand All @@ -832,17 +834,17 @@ defmodule Stream do
:erlang.raise(kind, reason, stacktrace)
else
{:done, acc} ->
do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:halted, acc} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, acc}
{:suspended, acc, c} ->
{:suspended, acc, &do_list_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
{:suspended, acc, &do_list_transform(vals, user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
end
end

defp do_enum_transform(user_acc, user, fun, next_op, next, {op, inner_acc}, inner, reduce, after_fun) do
defp do_enum_transform(vals, user_acc, user, fun, next_op, next, {op, inner_acc}, inner, reduce, after_fun) do
try do
reduce.({op, [:outer | inner_acc]})
catch
Expand All @@ -855,15 +857,15 @@ defmodule Stream do
# Only take into account outer halts when the op is not halt itself.
# Otherwise, we were the ones wishing to halt, so we should just stop.
{:halted, [:outer | acc]} when op != :halt ->
do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:halted, [_ | acc]} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, acc}
{:done, [_ | acc]} ->
do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
do_transform_user(vals, user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:suspended, [_ | acc], c} ->
{:suspended, acc, &do_enum_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
{:suspended, acc, &do_enum_transform(vals, user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
end
end

Expand Down
8 changes: 8 additions & 0 deletions lib/elixir/test/elixir/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,14 @@ defmodule StreamTest do
assert Process.get(:stream_transform)
end

test "transform/3 (via flat_map) handles multiple returns from suspension" do
assert [false]
|> Stream.take(1)
|> Stream.concat([true])
|> Stream.flat_map(&[&1])
|> Enum.to_list() == [false, true]
end

test "iterate/2" do
stream = Stream.iterate(0, &(&1+2))
assert Enum.take(stream, 5) == [0, 2, 4, 6, 8]
Expand Down

0 comments on commit 2d4722a

Please sign in to comment.