From 69e7e4c8d682ad1e81568c72ff0f7945ed4c304f Mon Sep 17 00:00:00 2001 From: sabiwara Date: Tue, 18 Feb 2025 11:46:11 +0900 Subject: [PATCH] Fix Stream.concat/2 bug when used in async_stream Close https://github.com/elixir-lang/elixir/issues/14277 --- lib/elixir/lib/task/supervised.ex | 25 +++++++++++++++++-------- lib/elixir/test/elixir/task_test.exs | 10 ++++++++++ 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/lib/elixir/lib/task/supervised.ex b/lib/elixir/lib/task/supervised.ex index 3fc183279d8..1697d989ced 100644 --- a/lib/elixir/lib/task/supervised.ex +++ b/lib/elixir/lib/task/supervised.ex @@ -377,16 +377,25 @@ defmodule Task.Supervised do stream_close(config) :erlang.raise(kind, reason, __STACKTRACE__) else - {:suspended, [value], next} -> - waiting = stream_spawn(value, spawned, waiting, config) - stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, next, config) - - {_, [value]} -> - waiting = stream_spawn(value, spawned, waiting, config) - stream_reduce({:cont, acc}, max - 1, spawned + 1, delivered, waiting, :done, config) - {_, []} -> stream_reduce({:cont, acc}, max, spawned, delivered, waiting, :done, config) + + result -> + {values, next} = + case result do + {:suspended, values = [_ | _], next} -> {values, next} + {_, values = [_ | _]} -> {values, :done} + end + + # right fold because values are in reverse order + {waiting, spawned} = + List.foldr(values, {waiting, spawned}, fn value, {waiting, spawned} -> + waiting = stream_spawn(value, spawned, waiting, config) + {waiting, spawned + 1} + end) + + max = max - length(values) + stream_reduce({:cont, acc}, max, spawned, delivered, waiting, next, config) end end diff --git a/lib/elixir/test/elixir/task_test.exs b/lib/elixir/test/elixir/task_test.exs index 6335b7b15df..766f93e4ee6 100644 --- a/lib/elixir/test/elixir/task_test.exs +++ b/lib/elixir/test/elixir/task_test.exs @@ -904,6 +904,16 @@ defmodule TaskTest do |> Enum.to_list() == [ok: :ok] end + test "stream concatenation edge case" do + result = + Stream.take([:foo, :bar], 1) + |> Stream.concat([1, 2]) + |> Task.async_stream(& &1) + |> Enum.to_list() + + assert result == [ok: :foo, ok: 1, ok: 2] + end + test "with $callers" do grandparent = self()