diff --git a/lib/elixir/lib/stream.ex b/lib/elixir/lib/stream.ex index 61105ab644b..72561738d4b 100644 --- a/lib/elixir/lib/stream.ex +++ b/lib/elixir/lib/stream.ex @@ -160,18 +160,13 @@ defmodule Stream do def chunk(enum, n, step, leftover \\ nil) when is_integer(n) and n > 0 and is_integer(step) and step > 0 do limit = :erlang.max(n, step) - if is_nil(leftover) do - lazy enum, {[], 0}, fn(f1) -> R.chunk(n, step, limit, f1) end - else - lazy enum, {[], 0}, - fn(f1) -> R.chunk(n, step, limit, f1) end, - &do_chunk(&1, n, leftover, &2) - end + lazy enum, {[], 0}, + fn(f1) -> R.chunk(n, step, limit, f1) end, + &do_chunk(&1, n, leftover, &2) end - defp do_chunk(acc(_, {_, 0}, _) = acc, _, _, _) do - {:cont, acc} - end + defp do_chunk(acc, _, nil, _), do: skip(acc) + defp do_chunk(acc(_, {_, 0}, _) = acc, _, _, _), do: skip(acc) defp do_chunk(acc(h, {buffer, count} = old, t), n, leftover, f1) do buffer = :lists.reverse(buffer, Enum.take(leftover, n - count)) @@ -197,9 +192,7 @@ defmodule Stream do &do_chunk_by(&1, &2) end - defp do_chunk_by(acc(_, nil, _) = acc, _f1) do - {:cont, acc} - end + defp do_chunk_by(acc(_, nil, _) = acc, _f1), do: skip(acc) defp do_chunk_by(acc(h, {buffer, _}, t), f1) do next_with_acc(f1, :lists.reverse(buffer), h, nil, t) @@ -749,6 +742,30 @@ defmodule Stream do allows an after function to be given which is invoked when the stream halts or completes. + After function has an option to return a function of arity 1, in which + case this function will be called back with a current state of an + accumulator given. The value returned by this function will be assigned + to the whole result of `transform/4`. Any other returned value + will be safely ignored, and the accumulated value will be returned as is. + + ## Examples + + iex> [1, 2, 3, 4] + ...> |> Stream.transform( + ...> fn -> [] end, + ...> fn i, acc -> + ...> case i do + ...> i when rem(i, 2) == 0 -> {[acc], [i]} + ...> _ -> {[], acc ++ [i]} + ...> end + ...> end, + ...> fn rest -> # call me back + ...> fn acc -> [rest | acc] end # append the tail + ...> end) + ...> |> Enum.to_list + [[1], [2, 3], [4]] + + This function can be seen as a combination of `Stream.resource/3` with `Stream.transform/3`. """ @@ -769,8 +786,7 @@ defmodule Stream do defp do_transform(user_acc, _user, _fun, _next_op, next, {:halt, inner_acc}, _inner, after_fun) do next.({:halt, []}) - do_after(after_fun, user_acc) - {:halted, inner_acc} + {:halted, do_after(after_fun, {inner_acc, user_acc})} end defp do_transform(user_acc, user, fun, next_op, next, {:suspend, inner_acc}, inner, after_fun) do @@ -778,8 +794,7 @@ defmodule Stream do end defp do_transform(user_acc, _user, _fun, :halt, _next, {_, inner_acc}, _inner, after_fun) do - do_after(after_fun, user_acc) - {:halted, inner_acc} + {:halted, do_after(after_fun, {inner_acc, user_acc})} end defp do_transform(user_acc, user, fun, :cont, next, inner_acc, inner, after_fun) do @@ -788,7 +803,7 @@ defmodule Stream do catch kind, reason -> stacktrace = System.stacktrace - do_after(after_fun, user_acc) + do_after(after_fun, {nil, user_acc}) :erlang.raise(kind, reason, stacktrace) else {:suspended, [val], next} -> @@ -806,7 +821,7 @@ defmodule Stream do kind, reason -> stacktrace = System.stacktrace next.({:halt, []}) - do_after(after_fun, user_acc) + do_after(after_fun, {nil, user_acc}) :erlang.raise(kind, reason, stacktrace) else {[], user_acc} -> @@ -816,8 +831,7 @@ defmodule Stream do &Enumerable.List.reduce(list, &1, fun), after_fun) {:halt, user_acc} -> next.({:halt, []}) - do_after(after_fun, user_acc) - {:halted, elem(inner_acc, 1)} + {:halted, do_after(after_fun, {elem(inner_acc, 1), user_acc})} {other, user_acc} -> do_enum_transform(user_acc, user, fun, next_op, next, inner_acc, inner, &Enumerable.reduce(other, &1, inner), after_fun) @@ -830,15 +844,14 @@ defmodule Stream do kind, reason -> stacktrace = System.stacktrace next.({:halt, []}) - do_after(after_fun, user_acc) + do_after(after_fun, {nil, user_acc}) :erlang.raise(kind, reason, stacktrace) else {:done, acc} -> do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) {:halted, acc} -> next.({:halt, []}) - do_after(after_fun, user_acc) - {:halted, acc} + {:halted, do_after(after_fun, {acc, user_acc})} {:suspended, acc, c} -> {:suspended, acc, &do_list_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)} end @@ -851,7 +864,7 @@ defmodule Stream do kind, reason -> stacktrace = System.stacktrace next.({:halt, []}) - do_after(after_fun, user_acc) + do_after(after_fun, {nil, user_acc}) :erlang.raise(kind, reason, stacktrace) else # Only take into account outer halts when the op is not halt itself. @@ -860,8 +873,7 @@ defmodule Stream do do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) {:halted, [_ | acc]} -> next.({:halt, []}) - do_after(after_fun, user_acc) - {:halted, acc} + {:halted, do_after(after_fun, {acc, user_acc})} {:done, [_ | acc]} -> do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun) {:suspended, [_ | acc], c} -> @@ -869,8 +881,13 @@ defmodule Stream do end end - defp do_after(nil, _user_acc), do: :ok - defp do_after(fun, user_acc), do: fun.(user_acc) + defp do_after(nil, {acc, _user_acc}), do: acc + defp do_after(fun, {acc, user_acc}) do + case fun.(user_acc) do + finalize when is_function(finalize, 1) -> finalize.(acc) + _ -> acc + end + end defp do_transform_each(x, [:outer | acc], f) do case f.(x, acc) do diff --git a/lib/elixir/test/elixir/stream_test.exs b/lib/elixir/test/elixir/stream_test.exs index 619efe24fb7..8da73e28201 100644 --- a/lib/elixir/test/elixir/stream_test.exs +++ b/lib/elixir/test/elixir/stream_test.exs @@ -654,6 +654,28 @@ defmodule StreamTest do assert Process.get(:stream_transform) end + test "transform/4 with a tail" do + assert [1, 2, 3, 4] |> Stream.transform(fn -> [] end, + fn i, acc -> + case i do + i when rem(i, 2) == 0 -> {[acc], [i]} + _ -> {[], acc ++ [i]} + end + end, + fn rest -> fn acc -> [rest | acc] end end) |> Enum.to_list == [[1], [2, 3], [4]] + end + + test "transform/4 with a completely changed accumulator" do + assert [1, 2, 3, 4] |> Stream.transform(fn -> [] end, + fn i, acc -> + case i do + i when rem(i, 2) == 0 -> {[acc], [i]} + _ -> {[], acc ++ [i]} + end + end, + fn rest -> fn acc -> rest end end) |> Enum.to_list == [4] + end + test "transform/4 with early halt" do stream = Stream.repeatedly(fn -> throw(:error) end) |> Stream.transform(fn -> nil end, &{[&1, &2], &1},