Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

from_stages - noproc errors when GenStage producer finishes early #88

Closed
sunaku opened this issue Jul 24, 2019 · 4 comments
Closed

from_stages - noproc errors when GenStage producer finishes early #88

sunaku opened this issue Jul 24, 2019 · 4 comments

Comments

@sunaku
Copy link
Contributor

sunaku commented Jul 24, 2019

Hello,

I'm getting :noproc errors when I connect a short-lived GenStage producer to a Flow using Flow.from_stages(). Below is a minimal example that reproduces the problem I'm seeing:

  1. Passing through Flow using multiple stages works correctly. ✔️
  2. Mapping through Flow using a single stage works correctly. ✔️
  3. Mapping through Flow using multiple stages triggers the error. 💥
Erlang/OTP 22 [erts-10.4] [source] [64-bit] [smp:32:32] [ds:32:32:10] [async-threads:1] [hipe]

Interactive Elixir (1.9.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 2) |> Enum.to_list()
[1, 2, 3]
iex(2)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 1) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms);
ms end) |> Enum.to_list()
"#PID<0.194.0>: sleep 1"
"#PID<0.194.0>: sleep 2"
"#PID<0.194.0>: sleep 3"
[1000, 2000, 3000]
iex(3)> {:ok, producer} = GenStage.from_enumerable(1..3, link: false); Flow.from_stages([producer], stages: 2) |> Flow.map(fn i -> ms = :timer.seconds(i); IO.inspect("#{inspect(self)}: sleep #{i}"); Process.sleep(ms);
ms end) |> Enum.to_list()
"#PID<0.200.0>: sleep 1"

12:14:48.166 [info]  GenStage consumer #PID<0.201.0> is stopping after receiving cancel from producer #PID<0.197.0> with reason: :noproc


12:14:48.180 [error] GenServer #PID<0.201.0> terminating
** (stop) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
Last message: {:DOWN, #Reference<0.3205571650.1190133762.109920>, :process, #PID<0.197.0>, :noproc}
State: {%{}, %{done?: true, producers: %{}, trigger: #Function<2.127884580/3 in Flow.Window.Global.materialize/5>}, {1, 2}, [], #Function<33.87744541/4 in Flow.Materialize.mapper_ops/1>}
"#PID<0.200.0>: sleep 2"
"#PID<0.200.0>: sleep 3"
** (exit) exited in: GenStage.close_stream(%{#Reference<0.3205571650.1190133762.109930> => {:subscribed, #PID<0.200.0>, :transient, 500, 1000, 1000}})
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (gen_stage) lib/gen_stage/stream.ex:160: GenStage.Stream.close_stream/1
    (elixir) lib/stream.ex:1400: Stream.do_resource/5
    (elixir) lib/enum.ex:3023: Enum.reverse/1
    (elixir) lib/enum.ex:2668: Enum.to_list/1
iex(3)>

How can I safely connect a short-lived GenStage producer to a Flow using multiple stages? 😕 See also GenStage: How to cancel a Flow from the producer? whose answer relies on an outdated GenStage API.

Thanks for your consideration.

@josevalim
Copy link
Member

What is your flow version? Have you tried flow master?

@josevalim
Copy link
Member

Also please report your gen_stage version and make sure you are running on latest gen_stage too.

@sunaku
Copy link
Contributor Author

sunaku commented Jul 24, 2019

Sorry for omitting such basic information 😅 Here are the versions I'm using, according to mix.lock: 💁‍♂️

  "flow": {:hex, :flow, "0.14.3", "0d92991fe53035894d24aa8dec10dcfccf0ae00c4ed436ace3efa9813a646902", [:mix], [{:gen_stage, "~> 0.14.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm"},
  "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},

I will try to reproduce the issue using the git master versions of these packages and report back shortly.

@sunaku
Copy link
Contributor Author

sunaku commented Jul 24, 2019

Thanks! 🎉 Upgrading to the git master version of Flow (although GenStage hasn't changed) has solved it:

-  "flow": {:hex, :flow, "0.14.3", "0d92991fe53035894d24aa8dec10dcfccf0ae00c4ed436ace3efa9813a646902", [:mix], [{:gen_stage, "~> 0.14.0", [hex: :gen_stage, repo: "hexpm", optional: false]}], "hexpm"},
+  "flow": {:git, "https://github.com/plataformatec/flow.git", "1ffac6a801602bf8b02192488e58ce5728b581aa", []},
   "gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants