Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 46 additions & 29 deletions lib/elixir/lib/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,13 @@ defmodule Stream do
def chunk(enum, n, step, leftover \\ nil)
when is_integer(n) and n > 0 and is_integer(step) and step > 0 do
limit = :erlang.max(n, step)
if is_nil(leftover) do
lazy enum, {[], 0}, fn(f1) -> R.chunk(n, step, limit, f1) end
else
lazy enum, {[], 0},
fn(f1) -> R.chunk(n, step, limit, f1) end,
&do_chunk(&1, n, leftover, &2)
end
lazy enum, {[], 0},
fn(f1) -> R.chunk(n, step, limit, f1) end,
&do_chunk(&1, n, leftover, &2)
end

defp do_chunk(acc(_, {_, 0}, _) = acc, _, _, _) do
{:cont, acc}
end
defp do_chunk(acc, _, nil, _), do: skip(acc)
defp do_chunk(acc(_, {_, 0}, _) = acc, _, _, _), do: skip(acc)

defp do_chunk(acc(h, {buffer, count} = old, t), n, leftover, f1) do
buffer = :lists.reverse(buffer, Enum.take(leftover, n - count))
Expand All @@ -197,9 +192,7 @@ defmodule Stream do
&do_chunk_by(&1, &2)
end

defp do_chunk_by(acc(_, nil, _) = acc, _f1) do
{:cont, acc}
end
defp do_chunk_by(acc(_, nil, _) = acc, _f1), do: skip(acc)

defp do_chunk_by(acc(h, {buffer, _}, t), f1) do
next_with_acc(f1, :lists.reverse(buffer), h, nil, t)
Expand Down Expand Up @@ -749,6 +742,30 @@ defmodule Stream do
allows an after function to be given which is invoked when the stream
halts or completes.

After function has an option to return a function of arity 1, in which
case this function will be called back with a current state of an
accumulator given. The value returned by this function will be assigned
to the whole result of `transform/4`. Any other returned value
will be safely ignored, and the accumulated value will be returned as is.

## Examples

iex> [1, 2, 3, 4]
...> |> Stream.transform(
...> fn -> [] end,
...> fn i, acc ->
...> case i do
...> i when rem(i, 2) == 0 -> {[acc], [i]}
...> _ -> {[], acc ++ [i]}
...> end
...> end,
...> fn rest -> # call me back
...> fn acc -> [rest | acc] end # append the tail
...> end)
...> |> Enum.to_list
[[1], [2, 3], [4]]


This function can be seen as a combination of `Stream.resource/3` with
`Stream.transform/3`.
"""
Expand All @@ -769,17 +786,15 @@ defmodule Stream do

defp do_transform(user_acc, _user, _fun, _next_op, next, {:halt, inner_acc}, _inner, after_fun) do
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, inner_acc}
{:halted, do_after(after_fun, {inner_acc, user_acc})}
end

defp do_transform(user_acc, user, fun, next_op, next, {:suspend, inner_acc}, inner, after_fun) do
{:suspended, inner_acc, &do_transform(user_acc, user, fun, next_op, next, &1, inner, after_fun)}
end

defp do_transform(user_acc, _user, _fun, :halt, _next, {_, inner_acc}, _inner, after_fun) do
do_after(after_fun, user_acc)
{:halted, inner_acc}
{:halted, do_after(after_fun, {inner_acc, user_acc})}
end

defp do_transform(user_acc, user, fun, :cont, next, inner_acc, inner, after_fun) do
Expand All @@ -788,7 +803,7 @@ defmodule Stream do
catch
kind, reason ->
stacktrace = System.stacktrace
do_after(after_fun, user_acc)
do_after(after_fun, {nil, user_acc})
:erlang.raise(kind, reason, stacktrace)
else
{:suspended, [val], next} ->
Expand All @@ -806,7 +821,7 @@ defmodule Stream do
kind, reason ->
stacktrace = System.stacktrace
next.({:halt, []})
do_after(after_fun, user_acc)
do_after(after_fun, {nil, user_acc})
:erlang.raise(kind, reason, stacktrace)
else
{[], user_acc} ->
Expand All @@ -816,8 +831,7 @@ defmodule Stream do
&Enumerable.List.reduce(list, &1, fun), after_fun)
{:halt, user_acc} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, elem(inner_acc, 1)}
{:halted, do_after(after_fun, {elem(inner_acc, 1), user_acc})}
{other, user_acc} ->
do_enum_transform(user_acc, user, fun, next_op, next, inner_acc, inner,
&Enumerable.reduce(other, &1, inner), after_fun)
Expand All @@ -830,15 +844,14 @@ defmodule Stream do
kind, reason ->
stacktrace = System.stacktrace
next.({:halt, []})
do_after(after_fun, user_acc)
do_after(after_fun, {nil, user_acc})
:erlang.raise(kind, reason, stacktrace)
else
{:done, acc} ->
do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:halted, acc} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, acc}
{:halted, do_after(after_fun, {acc, user_acc})}
{:suspended, acc, c} ->
{:suspended, acc, &do_list_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
end
Expand All @@ -851,7 +864,7 @@ defmodule Stream do
kind, reason ->
stacktrace = System.stacktrace
next.({:halt, []})
do_after(after_fun, user_acc)
do_after(after_fun, {nil, user_acc})
:erlang.raise(kind, reason, stacktrace)
else
# Only take into account outer halts when the op is not halt itself.
Expand All @@ -860,17 +873,21 @@ defmodule Stream do
do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:halted, [_ | acc]} ->
next.({:halt, []})
do_after(after_fun, user_acc)
{:halted, acc}
{:halted, do_after(after_fun, {acc, user_acc})}
{:done, [_ | acc]} ->
do_transform(user_acc, user, fun, next_op, next, {:cont, acc}, inner, after_fun)
{:suspended, [_ | acc], c} ->
{:suspended, acc, &do_enum_transform(user_acc, user, fun, next_op, next, &1, inner, c, after_fun)}
end
end

defp do_after(nil, _user_acc), do: :ok
defp do_after(fun, user_acc), do: fun.(user_acc)
defp do_after(nil, {acc, _user_acc}), do: acc
defp do_after(fun, {acc, user_acc}) do
case fun.(user_acc) do
finalize when is_function(finalize, 1) -> finalize.(acc)
_ -> acc
end
end

defp do_transform_each(x, [:outer | acc], f) do
case f.(x, acc) do
Expand Down
22 changes: 22 additions & 0 deletions lib/elixir/test/elixir/stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,28 @@ defmodule StreamTest do
assert Process.get(:stream_transform)
end

test "transform/4 with a tail" do
assert [1, 2, 3, 4] |> Stream.transform(fn -> [] end,
fn i, acc ->
case i do
i when rem(i, 2) == 0 -> {[acc], [i]}
_ -> {[], acc ++ [i]}
end
end,
fn rest -> fn acc -> [rest | acc] end end) |> Enum.to_list == [[1], [2, 3], [4]]
end

test "transform/4 with a completely changed accumulator" do
assert [1, 2, 3, 4] |> Stream.transform(fn -> [] end,
fn i, acc ->
case i do
i when rem(i, 2) == 0 -> {[acc], [i]}
_ -> {[], acc ++ [i]}
end
end,
fn rest -> fn acc -> rest end end) |> Enum.to_list == [4]
end

test "transform/4 with early halt" do
stream = Stream.repeatedly(fn -> throw(:error) end)
|> Stream.transform(fn -> nil end, &{[&1, &2], &1},
Expand Down