From 781a7cc442b864ba8442eb5a2fdf2a1110e45cf4 Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Fri, 9 May 2025 23:29:00 -0300 Subject: [PATCH 01/22] chore: release new version --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 7e029c20..8390484f 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule GRPC.Mixfile do use Mix.Project - @version "0.10.0" + @version "0.10.1" def project do [ From db695ec1d7ebbde87d94ddbe7f67a0345ffbc6a5 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Mon, 13 Oct 2025 21:36:28 -0300 Subject: [PATCH 02/22] bump 0.10.2 -> 0.11.0 --- README.md | 4 ++-- mix.exs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index e4501c32..01d6b98d 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ The package can be installed as: ```elixir def deps do [ - {:grpc, "~> 0.10"}, + {:grpc, "~> 0.11"}, {:protobuf, "~> 0.14"}, # optional for import wellknown google types {:grpc_reflection, "~> 0.2"} # optional enable grpc reflection ] @@ -316,7 +316,7 @@ In mix.exs: ```elixir def deps do [ - {:grpc, "~> 0.10"}, + {:grpc, "~> 0.11"}, {:protobuf_generate, "~> 0.1.3"} ] end diff --git a/mix.exs b/mix.exs index a9c96aa7..0b401b32 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule GRPC.Mixfile do use Mix.Project - @version "0.10.2" + @version "0.11.0" def project do [ From 93517c8726e4828a0945efdc1340839af9dd160e Mon Sep 17 00:00:00 2001 From: sleipnir Date: Mon, 20 Oct 2025 13:55:51 -0300 Subject: [PATCH 03/22] feat: added new function to handle side-effects --- lib/grpc/stream.ex | 46 ++++++++++++++++++++++++++++++++++-- lib/grpc/stream/operators.ex | 12 +++++++--- test/grpc/stream_test.exs | 22 ++++++++++++++++- 3 files changed, 74 insertions(+), 6 deletions(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index 9bc8f96b..b2e6a839 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -203,6 +203,48 @@ defmodule GRPC.Stream do |> Flow.run() end + @doc """ + Applies a **side-effecting function** to each element of the stream **without altering** its values. + + The `effect/2` function is useful for performing **imperative or external actions** + (such as logging, sending messages, collecting metrics, or debugging) + while preserving the original stream data. + + It behaves like `Enum.each/2`, but returns the stream itself so it can continue in the pipeline. + + ## Examples + + ```elixir + iex> parent = self() + iex> stream = + ...> GRPC.Stream.from([1, 2, 3]) + ...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x}) end) + ...> |> GRPC.Stream.to_flow() + ...> |> Enum.to_list() + iex> assert_receive {:seen, 1} + iex> assert_receive {:seen, 2} + iex> assert_receive {:seen, 3} + iex> stream + [1, 2, 3] + ``` + In this example, the effect/2 function sends a message to the current process + for each element in the stream, but the resulting stream values remain unchanged. + + ## Parameters + + - `stream` — The input `GRPC.Stream`. + - `effect_fun` — A function that receives each item and performs a side effect + (e.g. IO.inspect/1, Logger.info/1, send/2, etc.). + + ### Notes + + - This function is **lazy** — the `effect_fun` will only run once the stream is materialized + (e.g. via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`). + - The use of `effect/2` ensures that the original item is returned unchanged, + enabling seamless continuation of the pipeline. + """ + defdelegate effect(stream, effect_fun), to: Operators + @doc """ Sends a request to an external process and awaits a response. @@ -220,9 +262,9 @@ defmodule GRPC.Stream do ## Returns - Updated stream if successful. - - `{:error, item, reason}` if the request fails or times out. + - `{:error, reason}` if the request fails or times out. """ - @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, item(), reason()} + @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, reason()} defdelegate ask(stream, target, timeout \\ 5000), to: Operators @doc """ diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex index 1f3e15bb..a8735128 100644 --- a/lib/grpc/stream/operators.ex +++ b/lib/grpc/stream/operators.ex @@ -9,7 +9,7 @@ defmodule GRPC.Stream.Operators do @type reason :: any() @spec ask(GRPCStream.t(), pid | atom, non_neg_integer) :: - GRPCStream.t() | {:error, any(), :timeout | :not_alive} + GRPCStream.t() | {:error, :timeout | :process_not_alive} def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end %GRPCStream{stream | flow: Flow.map(flow, mapper)} @@ -33,7 +33,7 @@ defmodule GRPC.Stream.Operators do raise "Target #{inspect(target)} is not alive. Cannot send request to it." is_nil(resolved_target) -> - {:error, item, :not_alive} + {:error, :process_not_alive} true -> send(resolved_target, {:request, item, self()}) @@ -45,12 +45,18 @@ defmodule GRPC.Stream.Operators do if raise? do raise "Timeout waiting for response from #{inspect(target)}" else - {:error, item, :timeout} + {:error, :timeout} end end end end + @spec effect(GRPCStream.t(), (term -> term())) :: GRPCStream.t() + def effect(%GRPCStream{flow: flow} = stream, effect_fun) do + wrap = Flow.map(flow, fn item -> tap(item, effect_fun) end) + %GRPCStream{stream | flow: wrap} + end + @spec filter(GRPCStream.t(), (term -> term)) :: GRPCStream.t() def filter(%GRPCStream{flow: flow} = stream, filter) do %GRPCStream{stream | flow: Flow.filter(flow, filter)} diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs index eebfa6a9..aeb50c58 100644 --- a/test/grpc/stream_test.exs +++ b/test/grpc/stream_test.exs @@ -107,7 +107,7 @@ defmodule GRPC.StreamTest do |> GRPC.Stream.to_flow() |> Enum.to_list() - assert result == [{:error, "msg", :not_alive}] + assert result == [{:error, :process_not_alive}] end end @@ -177,6 +177,26 @@ defmodule GRPC.StreamTest do end end + describe "effect/2" do + test "applies side effects without altering values" do + parent = self() + + result = + GRPC.Stream.from([1, 2, 3]) + |> GRPC.Stream.effect(fn x -> send(parent, {:effect_called, x}) end) + |> GRPC.Stream.to_flow() + |> Enum.to_list() + + # Verifica se os valores do stream permanecem os mesmos + assert Enum.sort(result) == [1, 2, 3] + + # Verifica se o efeito colateral foi realmente chamado para cada item + assert_receive {:effect_called, 1} + assert_receive {:effect_called, 2} + assert_receive {:effect_called, 3} + end + end + describe "test complex operations" do test "pipeline with all GRPC.Stream operators" do target = From b03b83788251cfbebc9c16e7e7850051324df8c4 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Mon, 20 Oct 2025 14:07:38 -0300 Subject: [PATCH 04/22] chore: added doc, remove comments --- lib/grpc/stream/operators.ex | 2 +- test/grpc/stream_test.exs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex index a8735128..8303f73f 100644 --- a/lib/grpc/stream/operators.ex +++ b/lib/grpc/stream/operators.ex @@ -52,7 +52,7 @@ defmodule GRPC.Stream.Operators do end @spec effect(GRPCStream.t(), (term -> term())) :: GRPCStream.t() - def effect(%GRPCStream{flow: flow} = stream, effect_fun) do + def effect(%GRPCStream{flow: flow} = stream, effect_fun) when is_function(effect_fun, 1) do wrap = Flow.map(flow, fn item -> tap(item, effect_fun) end) %GRPCStream{stream | flow: wrap} end diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs index aeb50c58..45007bb6 100644 --- a/test/grpc/stream_test.exs +++ b/test/grpc/stream_test.exs @@ -187,10 +187,8 @@ defmodule GRPC.StreamTest do |> GRPC.Stream.to_flow() |> Enum.to_list() - # Verifica se os valores do stream permanecem os mesmos assert Enum.sort(result) == [1, 2, 3] - # Verifica se o efeito colateral foi realmente chamado para cada item assert_receive {:effect_called, 1} assert_receive {:effect_called, 2} assert_receive {:effect_called, 3} From b7af0d3f8a71868f4af539b1f4146606bfe6d199 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Tue, 21 Oct 2025 00:30:59 -0300 Subject: [PATCH 05/22] feat: added error handler unary and stream pipelines --- .../lib/helloworld_streams/server.ex | 8 +- lib/grpc/stream.ex | 105 ++++++++++++++++-- lib/grpc/stream/operators.ex | 89 +++++++++++++-- 3 files changed, 180 insertions(+), 22 deletions(-) diff --git a/examples/helloworld_streams/lib/helloworld_streams/server.ex b/examples/helloworld_streams/lib/helloworld_streams/server.ex index 76771939..a9976dba 100644 --- a/examples/helloworld_streams/lib/helloworld_streams/server.ex +++ b/examples/helloworld_streams/lib/helloworld_streams/server.ex @@ -14,8 +14,12 @@ defmodule HelloworldStreams.Server do def say_unary_hello(request, _materializer) do GRPCStream.unary(request) |> GRPCStream.ask(Transformer) - |> GRPCStream.map(fn %HelloReply{} = reply -> - %HelloReply{message: "[Reply] #{reply.message}"} + |> GRPCStream.map(fn + %HelloReply{} = reply -> + %HelloReply{message: "[Reply] #{reply.message}"} + + {:error, reason} -> + GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}") end) |> GRPCStream.run() end diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index b2e6a839..40af74e1 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -155,6 +155,19 @@ defmodule GRPC.Stream do # We have to call `Enum.to_list` because we want to actually run and materialize the full stream. # List.flatten and List.first are used so that we can obtain the first result of the materialized list. flow + |> Flow.map(fn + {:ok, msg} -> + msg + + {:error, %GRPC.RPCError{} = reason} -> + reason + + {:error, reason} -> + GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}") + + msg -> + msg + end) |> Enum.to_list() |> List.flatten() |> List.first() @@ -190,19 +203,83 @@ defmodule GRPC.Stream do raise ArgumentError, "run_with/3 is not supported for unary streams" end - dry_run? = Keyword.get(opts, :dry_run, false) - flow - |> Flow.map(fn msg -> - if not dry_run? do - send_response(from, msg) - end + |> Flow.map(fn + {:ok, msg} -> + send_response(from, msg, opts) + + flow + + {:error, %GRPC.RPCError{} = reason} -> + send_response(from, reason, opts) + flow - flow + {:error, reason} -> + msg = GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}") + send_response(from, msg, opts) + flow + + msg -> + send_response(from, msg, opts) + flow end) |> Flow.run() end + @doc """ + Intercepts and transforms **error tuples** or **unexpected exceptions** that occur + within a gRPC stream pipeline. + + `map_error/3` allows graceful handling or recovery from errors produced by previous + operators (e.g. `map/2`, `flat_map/2`) or from validation logic applied to incoming data. + + The provided `handler/1` function receives the error reason (or the exception struct) + and can either: + + * Return a new error tuple — e.g. `{:error, new_reason}` — to re-emit a modified error. + * Return any other value to recover from the failure and continue the pipeline. + + This makes it suitable for both **input validation** and **capturing unexpected runtime errors** + in stream transformations. + + ## Parameters + + - `stream` — The input stream or `Flow` pipeline. + - `func` — A function that takes an error reason or exception and returns either a new value or an error tuple. + + ## Returns + + - A new stream where all error tuples and raised exceptions are processed by `func/1`. + + ## Examples + + iex> GRPC.Stream.from([1, 2]) + ...> |> GRPC.Stream.map(fn + ...> 2 -> raise "boom" + ...> x -> x + ...> end) + ...> |> GRPC.Stream.map_error(fn + ...> {:error, _reason} -> + ...> GRPC.RPCError.exception(message: "Validation or runtime error") + ...> msg -> msg + ...> end) + + In this example: + + * The call to `GRPC.Stream.map/2` raises an exception for value `2`. + * `map_error/3` catches the error and wraps it in a `GRPC.RPCError` struct with a custom message. + * The pipeline continues execution, transforming errors into structured responses. + + ## Notes + + - `map_error/3` is **lazy** and only executes when the stream is materialized + (via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`). + + - Use this operator to implement **robust error recovery**, **input validation**, or + to normalize exceptions from downstream Flow stages into well-defined gRPC errors. + """ + defdelegate map_error(stream, func), to: Operators + @doc """ Applies a **side-effecting function** to each element of the stream **without altering** its values. @@ -218,12 +295,12 @@ defmodule GRPC.Stream do iex> parent = self() iex> stream = ...> GRPC.Stream.from([1, 2, 3]) - ...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x}) end) + ...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end) ...> |> GRPC.Stream.to_flow() ...> |> Enum.to_list() - iex> assert_receive {:seen, 1} iex> assert_receive {:seen, 2} - iex> assert_receive {:seen, 3} + iex> assert_receive {:seen, 4} + iex> assert_receive {:seen, 6} iex> stream [1, 2, 3] ``` @@ -411,7 +488,11 @@ defmodule GRPC.Stream do %__MODULE__{flow: flow, options: opts} end - defp send_response(from, msg) do - GRPC.Server.send_reply(from, msg) + defp send_response(from, msg, opts) do + dry_run? = Keyword.get(opts, :dry_run, false) + + if not dry_run? do + GRPC.Server.send_reply(from, msg) + end end end diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex index 8303f73f..afa02ee5 100644 --- a/lib/grpc/stream/operators.ex +++ b/lib/grpc/stream/operators.ex @@ -11,7 +11,8 @@ defmodule GRPC.Stream.Operators do @spec ask(GRPCStream.t(), pid | atom, non_neg_integer) :: GRPCStream.t() | {:error, :timeout | :process_not_alive} def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do - mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end + # mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end + mapper = fn item -> safe_invoke(&do_ask(&1, target, timeout, raise_on_error: false), item) end %GRPCStream{stream | flow: Flow.map(flow, mapper)} end @@ -53,23 +54,38 @@ defmodule GRPC.Stream.Operators do @spec effect(GRPCStream.t(), (term -> term())) :: GRPCStream.t() def effect(%GRPCStream{flow: flow} = stream, effect_fun) when is_function(effect_fun, 1) do - wrap = Flow.map(flow, fn item -> tap(item, effect_fun) end) - %GRPCStream{stream | flow: wrap} + %GRPCStream{ + stream + | flow: + Flow.map(flow, fn flow_item -> + tap(flow_item, fn item -> safe_invoke(effect_fun, item) end) + end) + } end @spec filter(GRPCStream.t(), (term -> term)) :: GRPCStream.t() def filter(%GRPCStream{flow: flow} = stream, filter) do - %GRPCStream{stream | flow: Flow.filter(flow, filter)} + flow_wrapper = Flow.filter(flow, fn item -> safe_invoke(filter, item) end) + %GRPCStream{stream | flow: flow_wrapper} end @spec flat_map(GRPCStream.t(), (term -> Enumerable.GRPCStream.t())) :: GRPCStream.t() def flat_map(%GRPCStream{flow: flow} = stream, flat_mapper) do - %GRPCStream{stream | flow: Flow.flat_map(flow, flat_mapper)} + flow_wrapper = + Flow.flat_map(flow, fn item -> + case safe_invoke(flat_mapper, item) do + {:error, reason} -> [{:error, reason}] + res -> res + end + end) + + %GRPCStream{stream | flow: flow_wrapper} end @spec map(GRPCStream.t(), (term -> term)) :: GRPCStream.t() def map(%GRPCStream{flow: flow} = stream, mapper) do - %GRPCStream{stream | flow: Flow.map(flow, mapper)} + flow_wrapper = Flow.map(flow, fn item -> safe_invoke(mapper, item) end) + %GRPCStream{stream | flow: flow_wrapper} end @spec map_with_context(GRPCStream.t(), (map(), term -> term)) :: GRPCStream.t() @@ -79,7 +95,41 @@ defmodule GRPC.Stream.Operators do mapper.(meta, item) end - %GRPCStream{stream | flow: Flow.map(flow, wrapper)} + flow_wrapper = Flow.map(flow, fn item -> safe_invoke(wrapper, item) end) + + %GRPCStream{stream | flow: flow_wrapper} + end + + @spec map_error(GRPCStream.t(), (reason -> term)) :: GRPCStream.t() + def map_error(%GRPCStream{flow: flow} = stream, func) when is_function(func, 1) do + mapper = + Flow.map(flow, fn + {:error, _reason} = item -> + res = safe_invoke(func, item) + + case res do + {:error, %GRPC.RPCError{} = new_reason} -> + {:error, new_reason} + + {:error, new_reason} -> + msg = "[Error] #{inspect(new_reason)}" + {:error, GRPC.RPCError.exception(message: msg)} + + {:ok, other} -> + other + + other -> + other + end + + {:ok, other} -> + other + + other -> + other + end) + + %GRPCStream{stream | flow: mapper} end @spec partition(GRPCStream.t(), keyword()) :: GRPCStream.t() @@ -94,7 +144,8 @@ defmodule GRPC.Stream.Operators do @spec reject(GRPCStream.t(), (term -> term)) :: GRPCStream.t() def reject(%GRPCStream{flow: flow} = stream, filter) do - %GRPCStream{stream | flow: Flow.reject(flow, filter)} + flow_wrapper = Flow.reject(flow, fn item -> safe_invoke(filter, item) end) + %GRPCStream{stream | flow: flow_wrapper} end @spec uniq(GRPCStream.t()) :: GRPCStream.t() @@ -106,4 +157,26 @@ defmodule GRPC.Stream.Operators do def uniq_by(%GRPCStream{flow: flow} = stream, fun) do %GRPCStream{stream | flow: Flow.uniq_by(flow, fun)} end + + # Normalizes and catches exceptions/throws. + # Returns: + # value -> successful value + # {:error, reason} -> failure + defp safe_invoke(fun, arg) do + try do + res = fun.(arg) + + case res do + {:ok, v} -> v + {:error, reason} -> {:error, reason} + other -> other + end + rescue + e -> + {:error, e} + catch + kind, reason -> + {:error, {kind, reason}} + end + end end From 5ed5a8f08ccf13e4aacbc8e51b473549326dafcd Mon Sep 17 00:00:00 2001 From: sleipnir Date: Tue, 21 Oct 2025 00:31:23 -0300 Subject: [PATCH 06/22] test: added many more tests --- test/grpc/stream_test.exs | 155 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs index 45007bb6..30ef46e9 100644 --- a/test/grpc/stream_test.exs +++ b/test/grpc/stream_test.exs @@ -193,6 +193,26 @@ defmodule GRPC.StreamTest do assert_receive {:effect_called, 2} assert_receive {:effect_called, 3} end + + test "continues pipeline even if effect function raises an error" do + parent = self() + + result = + GRPC.Stream.from([1, 2, 3]) + |> GRPC.Stream.effect(fn + 2 -> raise "boom" + x -> send(parent, {:effect_called, x}) + end) + |> GRPC.Stream.to_flow() + |> Enum.to_list() + + # Even with error 2, the pipeline should continue and return all elements + assert Enum.sort(result) == [1, 2, 3] + + # The effect should have been called for 1 and 3 + assert_receive {:effect_called, 1} + assert_receive {:effect_called, 3} + end end describe "test complex operations" do @@ -275,6 +295,141 @@ defmodule GRPC.StreamTest do end end + describe "ask/3 error handling" do + test "returns timeout error if response not received in time" do + pid = + spawn(fn -> + # do not send any response + receive do + _ -> :ok + end + end) + + result = + GRPC.Stream.from([:hello]) + # very short timeout + |> GRPC.Stream.ask(pid, 10) + |> GRPC.Stream.to_flow() + |> Enum.to_list() + + assert result == [{:error, :timeout}] + end + end + + describe "safe_invoke/2 handling {:ok, value} and direct value" do + test "maps {:ok, value} to value" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn x -> {:ok, x * 10} end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert result == [10, 20] + end + + test "keeps direct values as is" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn x -> x * 5 end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert result == [5, 10] + end + end + + describe "safe_invoke/2 catches errors" do + test "map/2 handles function returning {:error, reason}" do + stream = + GRPC.Stream.from([1, 2, 3]) + |> GRPC.Stream.map(fn + 2 -> {:error, :fail} + x -> x + end) + + results = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert results == [1, {:error, :fail}, 3] + end + + test "map/2 catches exceptions" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn + 2 -> raise "boom" + x -> x + end) + + results = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert match?([1, {:error, %RuntimeError{message: "boom", __exception__: true}}], results) + end + + test "flat_map/2 catches thrown values" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.flat_map(fn + 2 -> throw(:fail) + x -> [x] + end) + + results = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert results == [1, {:error, {:throw, :fail}}] + end + end + + describe "map_error/2" do + test "transforms {:error, reason} tuples" do + stream = + GRPC.Stream.from([{:error, :invalid_input}, {:ok, 42}, 100]) + |> GRPC.Stream.map_error(fn + {:error, :invalid_input} -> {:error, :mapped_error} + msg -> msg + end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + + assert Enum.sort(result) == [ + 42, + 100, + {:error, + %GRPC.RPCError{ + __exception__: true, + details: nil, + message: "[Error] :mapped_error", + status: nil + }} + ] + end + + test "transforms exceptions raised inside previous map" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn + 2 -> raise "boom" + x -> x + end) + |> GRPC.Stream.map_error(fn + {:error, %RuntimeError{message: "boom"}} -> + GRPC.RPCError.exception(message: "Validation or runtime error") + + msg -> + msg + end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + + assert match?( + [ + 1, + %GRPC.RPCError{ + message: "Validation or runtime error", + status: nil, + __exception__: true, + details: nil + } + ], + result + ) + end + end + defp receive_loop do receive do {:request, item, from} -> From eb6fdd44c6b8840d13224071f6631523b7995cce Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Wed, 22 Oct 2025 00:20:45 -0300 Subject: [PATCH 07/22] Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- lib/grpc/stream.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index 40af74e1..dac05697 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -207,7 +207,6 @@ defmodule GRPC.Stream do |> Flow.map(fn {:ok, msg} -> send_response(from, msg, opts) - flow {:error, %GRPC.RPCError{} = reason} -> From 093361778993c3e7f2eeb7b2f72e7a686cc9581c Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Wed, 22 Oct 2025 00:20:52 -0300 Subject: [PATCH 08/22] Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- lib/grpc/stream.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index dac05697..aa5acbb5 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -226,7 +226,7 @@ defmodule GRPC.Stream do end @doc """ - Intercepts and transforms **error tuples** or **unexpected exceptions** that occur + Intercepts and transforms error tuples or unexpected exceptions that occur within a gRPC stream pipeline. `map_error/3` allows graceful handling or recovery from errors produced by previous From c73eba3f0e87b67d09a784b704eda577be994129 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Wed, 22 Oct 2025 00:25:00 -0300 Subject: [PATCH 09/22] fix: correct return type in doc --- lib/grpc/stream.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index aa5acbb5..d4c9b078 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -238,7 +238,7 @@ defmodule GRPC.Stream do * Return a new error tuple — e.g. `{:error, new_reason}` — to re-emit a modified error. * Return any other value to recover from the failure and continue the pipeline. - This makes it suitable for both **input validation** and **capturing unexpected runtime errors** + This makes it suitable for both input validation and capturing unexpected runtime errors in stream transformations. ## Parameters @@ -259,7 +259,7 @@ defmodule GRPC.Stream do ...> end) ...> |> GRPC.Stream.map_error(fn ...> {:error, _reason} -> - ...> GRPC.RPCError.exception(message: "Validation or runtime error") + ...> {:error, GRPC.RPCError.exception(message: "Validation or runtime error")} ...> msg -> msg ...> end) From 9bf8fd26c203f8e5abe483fda699f180423e249c Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Wed, 22 Oct 2025 00:29:23 -0300 Subject: [PATCH 10/22] Update lib/grpc/stream/operators.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- lib/grpc/stream/operators.ex | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex index afa02ee5..7a399112 100644 --- a/lib/grpc/stream/operators.ex +++ b/lib/grpc/stream/operators.ex @@ -163,20 +163,19 @@ defmodule GRPC.Stream.Operators do # value -> successful value # {:error, reason} -> failure defp safe_invoke(fun, arg) do - try do - res = fun.(arg) + res = fun.(arg) - case res do - {:ok, v} -> v - {:error, reason} -> {:error, reason} - other -> other - end - rescue - e -> - {:error, e} - catch - kind, reason -> - {:error, {kind, reason}} + case res do + {:ok, v} -> v + {:error, reason} -> {:error, reason} + other -> other end + + rescue + e -> + {:error, e} + catch + kind, reason -> + {:error, {kind, reason}} end end From acaf147a89a6b94e47ef6b9efca1fcaf6e2ff067 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Wed, 22 Oct 2025 00:51:44 -0300 Subject: [PATCH 11/22] chore: updated after review --- lib/grpc/stream.ex | 1 - lib/grpc/stream/operators.ex | 5 +++-- test/grpc/stream_test.exs | 25 +++++++++++-------------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index d4c9b078..afbe7775 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -260,7 +260,6 @@ defmodule GRPC.Stream do ...> |> GRPC.Stream.map_error(fn ...> {:error, _reason} -> ...> {:error, GRPC.RPCError.exception(message: "Validation or runtime error")} - ...> msg -> msg ...> end) In this example: diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex index 7a399112..e6fd944b 100644 --- a/lib/grpc/stream/operators.ex +++ b/lib/grpc/stream/operators.ex @@ -162,6 +162,8 @@ defmodule GRPC.Stream.Operators do # Returns: # value -> successful value # {:error, reason} -> failure + # {:error, {:exception, exception}} -> failure due to exception + # {:error, {kind, reason}} -> failure due to throw or exit defp safe_invoke(fun, arg) do res = fun.(arg) @@ -170,10 +172,9 @@ defmodule GRPC.Stream.Operators do {:error, reason} -> {:error, reason} other -> other end - rescue e -> - {:error, e} + {:error, {:exception, e}} catch kind, reason -> {:error, {kind, reason}} diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs index 30ef46e9..b84f7aa3 100644 --- a/test/grpc/stream_test.exs +++ b/test/grpc/stream_test.exs @@ -358,7 +358,7 @@ defmodule GRPC.StreamTest do end) results = stream |> GRPC.Stream.to_flow() |> Enum.to_list() - assert match?([1, {:error, %RuntimeError{message: "boom", __exception__: true}}], results) + assert match?([1, {:error, {:exception, %RuntimeError{message: "boom"}}}], results) end test "flat_map/2 catches thrown values" do @@ -379,8 +379,8 @@ defmodule GRPC.StreamTest do stream = GRPC.Stream.from([{:error, :invalid_input}, {:ok, 42}, 100]) |> GRPC.Stream.map_error(fn - {:error, :invalid_input} -> {:error, :mapped_error} - msg -> msg + {:error, :invalid_input} -> + {:error, :mapped_error} end) result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() @@ -406,11 +406,8 @@ defmodule GRPC.StreamTest do x -> x end) |> GRPC.Stream.map_error(fn - {:error, %RuntimeError{message: "boom"}} -> - GRPC.RPCError.exception(message: "Validation or runtime error") - - msg -> - msg + {:error, {:exception, %RuntimeError{message: "boom"}}} -> + {:error, GRPC.RPCError.exception(message: "Validation or runtime error")} end) result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() @@ -418,12 +415,12 @@ defmodule GRPC.StreamTest do assert match?( [ 1, - %GRPC.RPCError{ - message: "Validation or runtime error", - status: nil, - __exception__: true, - details: nil - } + {:error, + %GRPC.RPCError{ + status: nil, + message: "Validation or runtime error", + details: nil + }} ], result ) From f6bffaebb332fe31877a0f2b724e0b3ce56e099a Mon Sep 17 00:00:00 2001 From: sleipnir Date: Wed, 22 Oct 2025 00:56:07 -0300 Subject: [PATCH 12/22] docs: adds a better explanation of the different types of input --- lib/grpc/stream.ex | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index afbe7775..c5f0f4cb 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -232,8 +232,13 @@ defmodule GRPC.Stream do `map_error/3` allows graceful handling or recovery from errors produced by previous operators (e.g. `map/2`, `flat_map/2`) or from validation logic applied to incoming data. - The provided `handler/1` function receives the error reason (or the exception struct) - and can either: + The provided `handler/1` function receives the error reason (or the exception struct) like: + + {:error, reason} -> failure + {:error, {:exception, exception}} -> failure due to exception + {:error, {kind, reason}} -> failure due to throw or exit + + And can either: * Return a new error tuple — e.g. `{:error, new_reason}` — to re-emit a modified error. * Return any other value to recover from the failure and continue the pipeline. @@ -258,7 +263,7 @@ defmodule GRPC.Stream do ...> x -> x ...> end) ...> |> GRPC.Stream.map_error(fn - ...> {:error, _reason} -> + ...> {:error, {:exception, _reason}} -> ...> {:error, GRPC.RPCError.exception(message: "Validation or runtime error")} ...> end) From 140e1a98f9c7b7c0739129f013b1630f00b99f3a Mon Sep 17 00:00:00 2001 From: sleipnir Date: Wed, 22 Oct 2025 01:06:57 -0300 Subject: [PATCH 13/22] mix format --- lib/grpc/stream.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index c5f0f4cb..bec3ee1f 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -237,7 +237,7 @@ defmodule GRPC.Stream do {:error, reason} -> failure {:error, {:exception, exception}} -> failure due to exception {:error, {kind, reason}} -> failure due to throw or exit - + And can either: * Return a new error tuple — e.g. `{:error, new_reason}` — to re-emit a modified error. From 34c8dd4ec6741988376a5be056757506b7f62792 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Wed, 22 Oct 2025 15:53:11 -0300 Subject: [PATCH 14/22] test: reintroduces tests that were removed by mistake --- test/grpc/stream_test.exs | 135 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs index f9b4e882..c20ed21a 100644 --- a/test/grpc/stream_test.exs +++ b/test/grpc/stream_test.exs @@ -165,6 +165,141 @@ defmodule GRPC.StreamTest do end end + describe "ask/3 error handling" do + test "returns timeout error if response not received in time" do + pid = + spawn(fn -> + # do not send any response + receive do + _ -> :ok + end + end) + + result = + GRPC.Stream.from([:hello]) + # very short timeout + |> GRPC.Stream.ask(pid, 10) + |> GRPC.Stream.to_flow() + |> Enum.to_list() + + assert result == [{:error, :timeout}] + end + end + + describe "safe_invoke/2 handling {:ok, value} and direct value" do + test "maps {:ok, value} to value" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn x -> {:ok, x * 10} end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert result == [10, 20] + end + + test "keeps direct values as is" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn x -> x * 5 end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert result == [5, 10] + end + end + + describe "safe_invoke/2 catches errors" do + test "map/2 handles function returning {:error, reason}" do + stream = + GRPC.Stream.from([1, 2, 3]) + |> GRPC.Stream.map(fn + 2 -> {:error, :fail} + x -> x + end) + + results = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert results == [1, {:error, :fail}, 3] + end + + test "map/2 catches exceptions" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn + 2 -> raise "boom" + x -> x + end) + + results = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert match?([1, {:error, {:exception, %RuntimeError{message: "boom"}}}], results) + end + + test "flat_map/2 catches thrown values" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.flat_map(fn + 2 -> throw(:fail) + x -> [x] + end) + + results = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + assert results == [1, {:error, {:throw, :fail}}] + end + end + + describe "map_error/2" do + test "transforms {:error, reason} tuples" do + stream = + GRPC.Stream.from([{:error, :invalid_input}, {:ok, 42}, 100]) + |> GRPC.Stream.map_error(fn + {:error, :invalid_input} -> {:error, :mapped_error} + msg -> msg + end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + + assert Enum.sort(result) == [ + 42, + 100, + {:error, + %GRPC.RPCError{ + __exception__: true, + details: nil, + message: ":mapped_error", + status: nil + }} + ] + end + + test "transforms exceptions raised inside previous map" do + stream = + GRPC.Stream.from([1, 2]) + |> GRPC.Stream.map(fn + 2 -> raise "boom" + x -> x + end) + |> GRPC.Stream.map_error(fn + {:error, %RuntimeError{message: "boom"}} -> + GRPC.RPCError.exception(message: "Validation or runtime error") + + msg -> + msg + end) + + result = stream |> GRPC.Stream.to_flow() |> Enum.to_list() + + assert match?( + [ + 1, + {:error, + %GRPC.RPCError{ + status: nil, + message: "{:exception, %RuntimeError{message: \"boom\"}}", + details: nil + }} + ], + result + ) + end + end + describe "map/2, flat_map/2, filter/2" do test "maps values correctly" do result = From 28ed00c8726bbb3cb0e25a4080ea93618389a1b9 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Wed, 22 Oct 2025 15:56:30 -0300 Subject: [PATCH 15/22] docs: introduces documentation for error handling and side effects --- README.md | 126 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 103 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 425ce6ec..e6688758 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,10 @@ - [Unary RPC using Stream API](#unary-rpc-using-stream-api) - [Server-Side Streaming](#server-side-streaming) - [Bidirectional Streaming](#bidirectional-streaming) + - [Effects and Error Handling](#effects-and-error-handling) + - [Side Effects](#side-effects-with-effect2) + - [Recovery from errors](#recovery-from-errors) + - [Unified Error Matching and Propagation](#unified-error-matching-and-propagation) - [Application Startup](#application-startup) - [Client Usage](#client-usage) - [Basic Connection and RPC](#basic-connection-and-rpc) @@ -144,28 +148,104 @@ def say_bid_stream_hello(request, materializer) do |> GRPC.Stream.run_with(materializer) end ``` -The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below: - -| Function | Description | Parameters / Options | -|:---------------------------------|:-------------|:----------------------| -| **`from(input, opts \\\\ [])`** | Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. | **Parameters:**
• `input` — stream, list, or gRPC struct.
**Options:**
• `:join_with` — PID or name of an external `GenStage` producer.
• `:dispatcher` — dispatcher module (default: `GenStage.DemandDispatcher`).
• `:propagate_context` — if `true`, propagates the materializer context.
• `:materializer` — the current `%GRPC.Server.Stream{}`.
• Other options supported by `Flow`. | -| **`unary(input, opts \\\\ [])`** | Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. | **Parameters:**
• `input` — single gRPC message.
**Options:** same as `from/2`. | -| **`to_flow(stream)`** | Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. | **Parameters:**
• `stream` — `%GRPC.Stream{}` struct. | -| **`run(stream)`** | Executes the `Flow` for a unary stream and returns the first materialized result. | **Parameters:**
• `stream` — `%GRPC.Stream{}` with `unary: true` option. | -| **`run_with(stream, materializer, opts \\\\ [])`** | Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `materializer` — `%GRPC.Server.Stream{}`.
**Options:**
• `:dry_run` — if `true`, responses are not sent. | -| **`ask(stream, target, timeout \\\\ 5000)`** | Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `target` — PID or atom.
• `timeout` — in milliseconds. | -| **`ask!(stream, target, timeout \\\\ 5000)`** | Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. | -| **`filter(stream, fun)`** | Filters items in the stream by applying a concurrent predicate function. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — function `(item -> boolean)`. | -| **`flat_map(stream, fun)`** | Applies a function returning a list or enumerable, flattening the results. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> Enumerable.t())`. | -| **`map(stream, fun)`** | Applies a transformation function to each item in the stream. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> term)`. | -| **`map_with_context(stream, fun)`** | Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(context, item -> term)`. | -| **`partition(stream, opts \\\\ [])`** | Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `opts` — partitioning options (`Flow.partition/2`). | -| **`reduce(stream, acc_fun, reducer_fun)`** | Reduces the stream using an accumulator, useful for aggregations. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `acc_fun` — initializer function `() -> acc`.
• `reducer_fun` — `(item, acc -> acc)`. | -| **`uniq(stream)`** | Emits only distinct items from the stream (no custom uniqueness criteria). | **Parameters:**
• `stream` — `%GRPC.Stream{}`. | -| **`uniq_by(stream, fun)`** | Emits only unique items based on the return value of the provided function. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> term)` for uniqueness determination. | -| **`get_headers(stream)`** | Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. | **Parameters:**
• `stream` — `%GRPC.Server.Stream{}`.
**Returns:** `map` containing decoded headers. | - -For a complete list of available operators see [here](lib/grpc/stream.ex). +The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex). + +--- + +### Effects and Error Handling + +#### Side Effects + +The `effect/2` operator executes user-defined functions for each element in the stream, allowing the integration of non-transformative actions such as logging, metrics, or external notifications. + +Unlike transformation operators (e.g., `map/2`), `effect/2` does not modify or filter values — it preserves the original stream while executing the provided callback safely for each emitted element. + +```elixir +iex> parent = self() +iex> stream = +...> GRPC.Stream.from([1, 2, 3]) +...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x * 2}) end) +...> |> GRPC.Stream.to_flow() +...> |> Enum.to_list() +iex> assert_receive {:seen, 2} +iex> assert_receive {:seen, 4} +iex> assert_receive {:seen, 6} +iex> stream +[1, 2, 3] +``` + +Key characteristics: + +* The callback function (`effect_fun`) is invoked for each item emitted downstream. +* The result of the callback is ignored, ensuring that the stream’s structure and values remain unchanged. +* Execution is lazy and occurs only when the stream is materialized using run/1, run_with/3, or to_flow/1. +* Exceptions raised inside the callback are captured internally, preventing interruption of the dataflow. + +This operator is designed for observability, telemetry, auditing, and integration with external systems that must react to events flowing through the gRPC stream. + +--- + +#### Recovery from errors + +The `map_error/2` operator intercepts and transforms errors or exceptions emitted by previous stages in a stream pipeline. + +It provides a unified mechanism for handling: + +* Expected errors, such as validation or domain failures (`{:error, reason}`) +* Unexpected runtime errors, including raised or thrown exceptions inside other operators. + +```elixir +iex> GRPC.Stream.from([1, 2]) +...> |> GRPC.Stream.map(fn +...> 2 -> raise "boom" +...> x -> x +...> end) +...> |> GRPC.Stream.map_error(fn +...> {:error, {:exception, _reason}} -> +...> {:error, GRPC.RPCError.exception(message: "Booomm")} +...> end) +``` + +In this example: + +* The function inside `map/2` raises an exception for the value `2`. +* map_error/2 captures and transforms that error into a structured `GRPC.RPCError` response. +* The stream continues processing without being interrupted. + +This makes map_error/2 suitable for input validation, runtime fault recovery, and user-facing error translation within gRPC pipelines. + +--- + +#### Unified Error Matching and Propagation + +All stream operators share a unified error propagation model that guarantees consistent handling of exceptions and failures across the pipeline. + +This ensures that user-defined functions within the stream — whether pure transformations, side effects, or external calls — always produce a predictable and recoverable result, maintaining the integrity of the dataflow even in the presence of unexpected errors. + +```elixir +def say_unary_hello(request, _materializer) do + GRPCStream.unary(request) + |> GRPCStream.ask(Transformer) + |> GRPCStream.map(fn + %HelloReply{} = reply -> + %HelloReply{message: "[Reply] #{reply.message}"} + + {:error, reason} -> + {:error, GRPC.RPCError.exception(message: "error calling external process: #{inspect(reason)}")} + + error -> + Logger.error("Unknown error") + error + end) + |> GRPCStream.run() +end +``` + +By normalizing all possible outcomes, `GRPC.Stream` ensures fault-tolerant, exception-safe pipelines where operators can freely raise, throw, or return tuples without breaking the flow execution. + +This unified model allows developers to build composable and reliable streaming pipelines that gracefully recover from both domain and runtime errors. + +>_NOTE_: In the example above, we could use `map_error/2` instead of `map/2` to handle error cases explicitly. However, since the function also performs a transformation on successful values, `map/2` remains appropriate and useful in this context. --- @@ -175,7 +255,7 @@ Add the server supervisor to your application's supervision tree: ```elixir defmodule Helloworld.Application do - @ false + @moduledoc false use Application @impl true From 73a89eb18a032dcb05377987b8a4670f4a793a9b Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Mon, 27 Oct 2025 13:18:23 -0300 Subject: [PATCH 16/22] Update README.md Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e6688758..6eb2f6d5 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,8 @@ defmodule HelloworldStreams.Server do @spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any() def say_unary_hello(request, materializer) do - GRPC.Stream.unary(request, materializer: materializer) + request + |> GRPC.Stream.unary(materializer: materializer) |> GRPC.Stream.map(fn %HelloReply{} = reply -> %HelloReply{message: "[Reply] #{reply.message}"} end) From aa2d81c5df5b2015225c89dcaa5e264c4f8ebef1 Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Mon, 27 Oct 2025 13:18:40 -0300 Subject: [PATCH 17/22] Update README.md Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6eb2f6d5..67ce8648 100644 --- a/README.md +++ b/README.md @@ -210,7 +210,7 @@ iex> GRPC.Stream.from([1, 2]) In this example: * The function inside `map/2` raises an exception for the value `2`. -* map_error/2 captures and transforms that error into a structured `GRPC.RPCError` response. +* `map_error/2` captures and transforms that error into a structured `GRPC.RPCError` response. * The stream continues processing without being interrupted. This makes map_error/2 suitable for input validation, runtime fault recovery, and user-facing error translation within gRPC pipelines. From aee6678aede8f00fb9a8daa8270b8a03e1eb6059 Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Mon, 27 Oct 2025 13:18:52 -0300 Subject: [PATCH 18/22] Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- lib/grpc/stream.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index 83f9f91e..f1bcf9e9 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -216,7 +216,7 @@ defmodule GRPC.Stream do end @doc """ - Applies a **side-effecting function** to each element of the stream **without altering** its values. + Applies a side-effect function to each element of the stream without altering its values. The `effect/2` function is useful for performing **imperative or external actions** (such as logging, sending messages, collecting metrics, or debugging) From f3f1df742c12c92c457c83375f4f55a4f4f47601 Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Mon, 27 Oct 2025 13:20:39 -0300 Subject: [PATCH 19/22] Update test/grpc/integration/server_test.exs Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- test/grpc/integration/server_test.exs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/grpc/integration/server_test.exs b/test/grpc/integration/server_test.exs index 0e2fe787..62b1f168 100644 --- a/test/grpc/integration/server_test.exs +++ b/test/grpc/integration/server_test.exs @@ -45,8 +45,7 @@ defmodule GRPC.Integration.ServerTest do end def stream_messages(msg_request, materializer) do - Stream.cycle(1..5) - |> Stream.take(5) + 1..5 |> GRPC.Stream.from() |> GRPC.Stream.map(fn i -> %Transcode.Message{ From bcb38fa2430194b2bd9346d6cddfa020ba6dea4a Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Mon, 27 Oct 2025 13:20:59 -0300 Subject: [PATCH 20/22] Update test/grpc/stream_test.exs Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- test/grpc/stream_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs index c20ed21a..83ff1b01 100644 --- a/test/grpc/stream_test.exs +++ b/test/grpc/stream_test.exs @@ -168,7 +168,7 @@ defmodule GRPC.StreamTest do describe "ask/3 error handling" do test "returns timeout error if response not received in time" do pid = - spawn(fn -> + spawn_link(fn -> # do not send any response receive do _ -> :ok From a6bb159f6223909a2c273deb6d7d4fb207eed53f Mon Sep 17 00:00:00 2001 From: Adriano Santos Date: Mon, 27 Oct 2025 13:21:33 -0300 Subject: [PATCH 21/22] Update test/grpc/stream_test.exs Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> --- test/grpc/stream_test.exs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs index 83ff1b01..e2166233 100644 --- a/test/grpc/stream_test.exs +++ b/test/grpc/stream_test.exs @@ -169,10 +169,7 @@ defmodule GRPC.StreamTest do test "returns timeout error if response not received in time" do pid = spawn_link(fn -> - # do not send any response - receive do - _ -> :ok - end + Process.sleep(:infinity) end) result = From 8e8eb4df4b57361a2630f5e42f970d8a22b8dc21 Mon Sep 17 00:00:00 2001 From: sleipnir Date: Mon, 27 Oct 2025 13:45:14 -0300 Subject: [PATCH 22/22] chore: resolve review comments --- lib/grpc/stream.ex | 6 +--- lib/grpc/stream/operators.ex | 52 ++++++++++++--------------- test/grpc/integration/server_test.exs | 1 + 3 files changed, 25 insertions(+), 34 deletions(-) diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex index f1bcf9e9..ea113d41 100644 --- a/lib/grpc/stream.ex +++ b/lib/grpc/stream.ex @@ -272,12 +272,8 @@ defmodule GRPC.Stream do - `target`: Target process PID or atom name. - `timeout`: Timeout in milliseconds (defaults to `5000`). - ## Returns - - - Updated stream if successful. - - `{:error, reason}` if the request fails or times out. """ - @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, reason()} + @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, :timeout | :process_not_alive} defdelegate ask(stream, target, timeout \\ 5000), to: Operators @doc """ diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex index 9e5a5f35..3404abbf 100644 --- a/lib/grpc/stream/operators.ex +++ b/lib/grpc/stream/operators.ex @@ -11,7 +11,6 @@ defmodule GRPC.Stream.Operators do @spec ask(GRPCStream.t(), pid | atom, non_neg_integer) :: GRPCStream.t() | {:error, :timeout | :process_not_alive} def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do - # mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end mapper = fn item -> safe_invoke(&do_ask(&1, target, timeout, raise_on_error: false), item) end %GRPCStream{stream | flow: Flow.map(flow, mapper)} end @@ -54,13 +53,12 @@ defmodule GRPC.Stream.Operators do @spec effect(GRPCStream.t(), (term -> term())) :: GRPCStream.t() def effect(%GRPCStream{flow: flow} = stream, effect_fun) when is_function(effect_fun, 1) do - %GRPCStream{ - stream - | flow: - Flow.map(flow, fn flow_item -> - tap(flow_item, fn item -> safe_invoke(effect_fun, item) end) - end) - } + flow = + Flow.map(flow, fn flow_item -> + tap(flow_item, fn item -> safe_invoke(effect_fun, item) end) + end) + + %GRPCStream{stream | flow: flow} end @spec filter(GRPCStream.t(), (term -> term)) :: GRPCStream.t() @@ -104,32 +102,28 @@ defmodule GRPC.Stream.Operators do def map_error(%GRPCStream{flow: flow} = stream, func) when is_function(func, 1) do mapper = Flow.map(flow, fn - {:error, _reason} = item -> - res = safe_invoke(func, item) - - case res do - {:error, %GRPC.RPCError{} = new_reason} -> - {:error, new_reason} - - {:error, new_reason} -> - msg = "#{inspect(new_reason)}" - {:error, GRPC.RPCError.exception(message: msg)} + {:error, reason} -> handle_error(func, reason) + {:ok, value} -> value + other -> other + end) - {:ok, other} -> - other + %GRPCStream{stream | flow: mapper} + end - other -> - other - end + defp handle_error(func, reason) do + case safe_invoke(func, {:error, reason}) do + {:error, %GRPC.RPCError{} = rpc_error} -> + {:error, rpc_error} - {:ok, other} -> - other + {:error, other_reason} -> + {:error, GRPC.RPCError.exception(message: inspect(other_reason))} - other -> - other - end) + {:ok, value} -> + value - %GRPCStream{stream | flow: mapper} + other -> + other + end end @spec partition(GRPCStream.t(), keyword()) :: GRPCStream.t() diff --git a/test/grpc/integration/server_test.exs b/test/grpc/integration/server_test.exs index 62b1f168..36ba357f 100644 --- a/test/grpc/integration/server_test.exs +++ b/test/grpc/integration/server_test.exs @@ -46,6 +46,7 @@ defmodule GRPC.Integration.ServerTest do def stream_messages(msg_request, materializer) do 1..5 + |> Stream.take(5) |> GRPC.Stream.from() |> GRPC.Stream.map(fn i -> %Transcode.Message{