diff --git a/lib/elixir/lib/task/supervised.ex b/lib/elixir/lib/task/supervised.ex index 3fc183279d8..1697d989ced 100644 --- a/lib/elixir/lib/task/supervised.ex +++ b/lib/elixir/lib/task/supervised.ex @@ -377,16 +377,25 @@ defmodule Task.Supervised do stream_close(config) :erlang.raise(kind, reason, __STACKTRACE__) else - {:suspended, [value], next} -> - waiting = stream_spawn(value, spawned, waiting, config) - stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, next, config) - - {_, [value]} -> - waiting = stream_spawn(value, spawned, waiting, config) - stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, :done, config) - {_, []} -> stream_reduce({:cont, acc}, max, spawned, delivered, waiting, :done, config) + + result -> + {values, next} = + case result do + {:suspended, values = [_ | _], next} -> {values, next} + {_, values = [_ | _]} -> {values, :done} + end + + # right fold because values are in reverse order + {waiting, spawned} = + List.foldr(values, {waiting, spawned}, fn value, {waiting, spawned} -> + waiting = stream_spawn(value, spawned, waiting, config) + {waiting, spawned + 1} + end) + + max = max - length(values) + stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config) end end diff --git a/lib/elixir/test/elixir/task_test.exs b/lib/elixir/test/elixir/task_test.exs index 6335b7b15df..766f93e4ee6 100644 --- a/lib/elixir/test/elixir/task_test.exs +++ b/lib/elixir/test/elixir/task_test.exs @@ -904,6 +904,16 @@ defmodule TaskTest do |> Enum.to_list() == [ok: :ok] end + test "stream concatenation edge case" do + result = + Stream.take([:foo, :bar], 1) + |> Stream.concat([1, 2]) + |> Task.async_stream(& &1) + |> Enum.to_list() + + assert result == [ok: :foo, ok: 1, ok: 2] + end + test "with $callers" do grandparent = self()