diff --git a/README.md b/README.md
index 6f266dc7..67ce8648 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,10 @@
- [Unary RPC using Stream API](#unary-rpc-using-stream-api)
- [Server-Side Streaming](#server-side-streaming)
- [Bidirectional Streaming](#bidirectional-streaming)
+ - [Effects and Error Handling](#effects-and-error-handling)
+ - [Side Effects](#side-effects-with-effect2)
+ - [Recovery from errors](#recovery-from-errors)
+ - [Unified Error Matching and Propagation](#unified-error-matching-and-propagation)
- [Application Startup](#application-startup)
- [Client Usage](#client-usage)
- [Basic Connection and RPC](#basic-connection-and-rpc)
@@ -101,8 +105,9 @@ defmodule HelloworldStreams.Server do
alias Helloworld.HelloReply
@spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
- def say_unary_hello(request, _materializer) do
- GRPC.Stream.unary(request)
+ def say_unary_hello(request, materializer) do
+ request
+ |> GRPC.Stream.unary(materializer: materializer)
|> GRPC.Stream.map(fn %HelloReply{} = reply ->
%HelloReply{message: "[Reply] #{reply.message}"}
end)
@@ -144,28 +149,104 @@ def say_bid_stream_hello(request, materializer) do
|> GRPC.Stream.run_with(materializer)
end
```
-The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below:
-
-| Function | Description | Parameters / Options |
-|:---------------------------------|:-------------|:----------------------|
-| **`from(input, opts \\\\ [])`** | Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. | **Parameters:**
• `input` — stream, list, or gRPC struct.
**Options:**
• `:join_with` — PID or name of an external `GenStage` producer.
• `:dispatcher` — dispatcher module (default: `GenStage.DemandDispatcher`).
• `:propagate_context` — if `true`, propagates the materializer context.
• `:materializer` — the current `%GRPC.Server.Stream{}`.
• Other options supported by `Flow`. |
-| **`unary(input, opts \\\\ [])`** | Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. | **Parameters:**
• `input` — single gRPC message.
**Options:** same as `from/2`. |
-| **`to_flow(stream)`** | Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. | **Parameters:**
• `stream` — `%GRPC.Stream{}` struct. |
-| **`run(stream)`** | Executes the `Flow` for a unary stream and returns the first materialized result. | **Parameters:**
• `stream` — `%GRPC.Stream{}` with `unary: true` option. |
-| **`run_with(stream, materializer, opts \\\\ [])`** | Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `materializer` — `%GRPC.Server.Stream{}`.
**Options:**
• `:dry_run` — if `true`, responses are not sent. |
-| **`ask(stream, target, timeout \\\\ 5000)`** | Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `target` — PID or atom.
• `timeout` — in milliseconds. |
-| **`ask!(stream, target, timeout \\\\ 5000)`** | Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. |
-| **`filter(stream, fun)`** | Filters items in the stream by applying a concurrent predicate function. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — function `(item -> boolean)`. |
-| **`flat_map(stream, fun)`** | Applies a function returning a list or enumerable, flattening the results. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> Enumerable.t())`. |
-| **`map(stream, fun)`** | Applies a transformation function to each item in the stream. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> term)`. |
-| **`map_with_context(stream, fun)`** | Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(context, item -> term)`. |
-| **`partition(stream, opts \\\\ [])`** | Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `opts` — partitioning options (`Flow.partition/2`). |
-| **`reduce(stream, acc_fun, reducer_fun)`** | Reduces the stream using an accumulator, useful for aggregations. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `acc_fun` — initializer function `() -> acc`.
• `reducer_fun` — `(item, acc -> acc)`. |
-| **`uniq(stream)`** | Emits only distinct items from the stream (no custom uniqueness criteria). | **Parameters:**
• `stream` — `%GRPC.Stream{}`. |
-| **`uniq_by(stream, fun)`** | Emits only unique items based on the return value of the provided function. | **Parameters:**
• `stream` — `%GRPC.Stream{}`.
• `fun` — `(item -> term)` for uniqueness determination. |
-| **`get_headers(stream)`** | Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. | **Parameters:**
• `stream` — `%GRPC.Server.Stream{}`.
**Returns:** `map` containing decoded headers. |
-
-For a complete list of available operators see [here](lib/grpc/stream.ex).
+The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex).
+
+---
+
+### Effects and Error Handling
+
+#### Side Effects
+
+The `effect/2` operator executes user-defined functions for each element in the stream, allowing the integration of non-transformative actions such as logging, metrics, or external notifications.
+
+Unlike transformation operators (e.g., `map/2`), `effect/2` does not modify or filter values — it preserves the original stream while executing the provided callback safely for each emitted element.
+
+```elixir
+iex> parent = self()
+iex> stream =
+...> GRPC.Stream.from([1, 2, 3])
+...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x * 2}) end)
+...> |> GRPC.Stream.to_flow()
+...> |> Enum.to_list()
+iex> assert_receive {:seen, 2}
+iex> assert_receive {:seen, 4}
+iex> assert_receive {:seen, 6}
+iex> stream
+[1, 2, 3]
+```
+
+Key characteristics:
+
+* The callback function (`effect_fun`) is invoked for each item emitted downstream.
+* The result of the callback is ignored, ensuring that the stream’s structure and values remain unchanged.
+* Execution is lazy and occurs only when the stream is materialized using run/1, run_with/3, or to_flow/1.
+* Exceptions raised inside the callback are captured internally, preventing interruption of the dataflow.
+
+This operator is designed for observability, telemetry, auditing, and integration with external systems that must react to events flowing through the gRPC stream.
+
+---
+
+#### Recovery from errors
+
+The `map_error/2` operator intercepts and transforms errors or exceptions emitted by previous stages in a stream pipeline.
+
+It provides a unified mechanism for handling:
+
+* Expected errors, such as validation or domain failures (`{:error, reason}`)
+* Unexpected runtime errors, including raised or thrown exceptions inside other operators.
+
+```elixir
+iex> GRPC.Stream.from([1, 2])
+...> |> GRPC.Stream.map(fn
+...> 2 -> raise "boom"
+...> x -> x
+...> end)
+...> |> GRPC.Stream.map_error(fn
+...> {:error, {:exception, _reason}} ->
+...> {:error, GRPC.RPCError.exception(message: "Booomm")}
+...> end)
+```
+
+In this example:
+
+* The function inside `map/2` raises an exception for the value `2`.
+* `map_error/2` captures and transforms that error into a structured `GRPC.RPCError` response.
+* The stream continues processing without being interrupted.
+
+This makes map_error/2 suitable for input validation, runtime fault recovery, and user-facing error translation within gRPC pipelines.
+
+---
+
+#### Unified Error Matching and Propagation
+
+All stream operators share a unified error propagation model that guarantees consistent handling of exceptions and failures across the pipeline.
+
+This ensures that user-defined functions within the stream — whether pure transformations, side effects, or external calls — always produce a predictable and recoverable result, maintaining the integrity of the dataflow even in the presence of unexpected errors.
+
+```elixir
+def say_unary_hello(request, _materializer) do
+ GRPCStream.unary(request)
+ |> GRPCStream.ask(Transformer)
+ |> GRPCStream.map(fn
+ %HelloReply{} = reply ->
+ %HelloReply{message: "[Reply] #{reply.message}"}
+
+ {:error, reason} ->
+ {:error, GRPC.RPCError.exception(message: "error calling external process: #{inspect(reason)}")}
+
+ error ->
+ Logger.error("Unknown error")
+ error
+ end)
+ |> GRPCStream.run()
+end
+```
+
+By normalizing all possible outcomes, `GRPC.Stream` ensures fault-tolerant, exception-safe pipelines where operators can freely raise, throw, or return tuples without breaking the flow execution.
+
+This unified model allows developers to build composable and reliable streaming pipelines that gracefully recover from both domain and runtime errors.
+
+>_NOTE_: In the example above, we could use `map_error/2` instead of `map/2` to handle error cases explicitly. However, since the function also performs a transformation on successful values, `map/2` remains appropriate and useful in this context.
---
@@ -175,7 +256,7 @@ Add the server supervisor to your application's supervision tree:
```elixir
defmodule Helloworld.Application do
- @ false
+ @moduledoc false
use Application
@impl true
diff --git a/examples/helloworld_streams/lib/helloworld_streams/server.ex b/examples/helloworld_streams/lib/helloworld_streams/server.ex
index 76771939..a9976dba 100644
--- a/examples/helloworld_streams/lib/helloworld_streams/server.ex
+++ b/examples/helloworld_streams/lib/helloworld_streams/server.ex
@@ -14,8 +14,12 @@ defmodule HelloworldStreams.Server do
def say_unary_hello(request, _materializer) do
GRPCStream.unary(request)
|> GRPCStream.ask(Transformer)
- |> GRPCStream.map(fn %HelloReply{} = reply ->
- %HelloReply{message: "[Reply] #{reply.message}"}
+ |> GRPCStream.map(fn
+ %HelloReply{} = reply ->
+ %HelloReply{message: "[Reply] #{reply.message}"}
+
+ {:error, reason} ->
+ GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")
end)
|> GRPCStream.run()
end
diff --git a/lib/grpc/stream.ex b/lib/grpc/stream.ex
index 9099dd54..ea113d41 100644
--- a/lib/grpc/stream.ex
+++ b/lib/grpc/stream.ex
@@ -158,7 +158,7 @@ defmodule GRPC.Stream do
"GRPC.Stream.run/1 requires a materializer to be set in the GRPC.Stream"
end
- send_response(materializer, Enum.at(flow, 0))
+ send_response(materializer, Enum.at(flow, 0), opts)
:noreply
end
@@ -193,19 +193,71 @@ defmodule GRPC.Stream do
raise ArgumentError, "run_with/3 is not supported for unary streams"
end
- dry_run? = Keyword.get(opts, :dry_run, false)
-
flow
- |> Flow.map(fn msg ->
- if not dry_run? do
- send_response(from, msg)
- end
-
- flow
+ |> Flow.map(fn
+ {:ok, msg} ->
+ send_response(from, msg, opts)
+ flow
+
+ {:error, %GRPC.RPCError{} = reason} ->
+ send_response(from, reason, opts)
+ flow
+
+ {:error, reason} ->
+ msg = GRPC.RPCError.exception(message: "#{inspect(reason)}")
+ send_response(from, msg, opts)
+ flow
+
+ msg ->
+ send_response(from, msg, opts)
+ flow
end)
|> Flow.run()
end
+ @doc """
+ Applies a side-effect function to each element of the stream without altering its values.
+
+ The `effect/2` function is useful for performing **imperative or external actions**
+ (such as logging, sending messages, collecting metrics, or debugging)
+ while preserving the original stream data.
+
+ It behaves like `Enum.each/2`, but returns the stream itself so it can continue in the pipeline.
+
+ ## Examples
+
+ ```elixir
+ iex> parent = self()
+ iex> stream =
+ ...> GRPC.Stream.from([1, 2, 3])
+ ...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end)
+ ...> |> GRPC.Stream.to_flow()
+ ...> |> Enum.to_list()
+ iex> assert_receive {:seen, 2}
+ iex> assert_receive {:seen, 4}
+ iex> assert_receive {:seen, 6}
+ iex> stream
+ [1, 2, 3]
+ ```
+ In this example, the effect/2 function sends a message to the current process
+ for each element in the stream, but the resulting stream values remain unchanged.
+
+ ## Parameters
+
+ - `stream` — The input `GRPC.Stream`.
+ - `effect_fun` — A function that receives each item and performs a side effect
+ (e.g. IO.inspect/1, Logger.info/1, send/2, etc.).
+
+ ### Notes
+
+ - This function is **lazy** — the `effect_fun` will only run once the stream is materialized
+ (e.g. via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`).
+ - The use of `effect/2` ensures that the original item is returned unchanged,
+ enabling seamless continuation of the pipeline.
+ """
+ @spec effect(t(), (term -> any)) :: t()
+ defdelegate effect(stream, effect_fun), to: Operators
+
@doc """
Sends a request to an external process and awaits a response.
@@ -220,12 +272,8 @@ defmodule GRPC.Stream do
- `target`: Target process PID or atom name.
- `timeout`: Timeout in milliseconds (defaults to `5000`).
- ## Returns
-
- - Updated stream if successful.
- - `{:error, item, reason}` if the request fails or times out.
"""
- @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, item(), reason()}
+ @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, :timeout | :process_not_alive}
defdelegate ask(stream, target, timeout \\ 5000), to: Operators
@doc """
@@ -261,6 +309,64 @@ defmodule GRPC.Stream do
@spec map(t(), (term -> term)) :: t()
defdelegate map(stream, mapper), to: Operators
+ @doc """
+ Intercepts and transforms error tuples or unexpected exceptions that occur
+ within a gRPC stream pipeline.
+
+ `map_error/3` allows graceful handling or recovery from errors produced by previous
+ operators (e.g. `map/2`, `flat_map/2`) or from validation logic applied to incoming data.
+
+ The provided `handler/1` function receives the error reason (or the exception struct) like:
+
+ {:error, reason} -> failure
+ {:error, {:exception, exception}} -> failure due to exception
+ {:error, {kind, reason}} -> failure due to throw or exit
+
+ And can either:
+
+ * Return a new error tuple — e.g. `{:error, new_reason}` — to re-emit a modified error.
+ * Return any other value to recover from the failure and continue the pipeline.
+
+ This makes it suitable for both input validation and capturing unexpected runtime errors
+ in stream transformations.
+
+ ## Parameters
+
+ - `stream` — The input stream or `Flow` pipeline.
+ - `func` — A function that takes an error reason or exception and returns either a new value or an error tuple.
+
+ ## Returns
+
+ - A new stream where all error tuples and raised exceptions are processed by `func/1`.
+
+ ## Examples
+
+ iex> GRPC.Stream.from([1, 2])
+ ...> |> GRPC.Stream.map(fn
+ ...> 2 -> raise "boom"
+ ...> x -> x
+ ...> end)
+ ...> |> GRPC.Stream.map_error(fn
+ ...> {:error, {:exception, _reason}} ->
+ ...> {:error, GRPC.RPCError.exception(message: "Validation or runtime error")}
+ ...> end)
+
+ In this example:
+
+ * The call to `GRPC.Stream.map/2` raises an exception for value `2`.
+ * `map_error/3` catches the error and wraps it in a `GRPC.RPCError` struct with a custom message.
+ * The pipeline continues execution, transforming errors into structured responses.
+
+ ## Notes
+
+ - `map_error/3` is **lazy** and only executes when the stream is materialized
+ (via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`).
+
+ - Use this operator to implement **robust error recovery**, **input validation**, or
+ to normalize exceptions from downstream Flow stages into well-defined gRPC errors.
+ """
+ defdelegate map_error(stream, func), to: Operators
+
@doc """
Applies a transformation function to each stream item, passing the context as an additional argument.
This is useful for operations that require access to the stream's headers.
@@ -386,7 +492,11 @@ defmodule GRPC.Stream do
%__MODULE__{flow: flow, options: opts}
end
- defp send_response(from, msg) do
- GRPC.Server.send_reply(from, msg)
+ defp send_response(from, msg, opts) do
+ dry_run? = Keyword.get(opts, :dry_run, false)
+
+ if not dry_run? do
+ GRPC.Server.send_reply(from, msg)
+ end
end
end
diff --git a/lib/grpc/stream/operators.ex b/lib/grpc/stream/operators.ex
index 1f3e15bb..3404abbf 100644
--- a/lib/grpc/stream/operators.ex
+++ b/lib/grpc/stream/operators.ex
@@ -9,9 +9,9 @@ defmodule GRPC.Stream.Operators do
@type reason :: any()
@spec ask(GRPCStream.t(), pid | atom, non_neg_integer) ::
- GRPCStream.t() | {:error, any(), :timeout | :not_alive}
+ GRPCStream.t() | {:error, :timeout | :process_not_alive}
def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do
- mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end
+ mapper = fn item -> safe_invoke(&do_ask(&1, target, timeout, raise_on_error: false), item) end
%GRPCStream{stream | flow: Flow.map(flow, mapper)}
end
@@ -33,7 +33,7 @@ defmodule GRPC.Stream.Operators do
raise "Target #{inspect(target)} is not alive. Cannot send request to it."
is_nil(resolved_target) ->
- {:error, item, :not_alive}
+ {:error, :process_not_alive}
true ->
send(resolved_target, {:request, item, self()})
@@ -45,25 +45,45 @@ defmodule GRPC.Stream.Operators do
if raise? do
raise "Timeout waiting for response from #{inspect(target)}"
else
- {:error, item, :timeout}
+ {:error, :timeout}
end
end
end
end
+ @spec effect(GRPCStream.t(), (term -> term())) :: GRPCStream.t()
+ def effect(%GRPCStream{flow: flow} = stream, effect_fun) when is_function(effect_fun, 1) do
+ flow =
+ Flow.map(flow, fn flow_item ->
+ tap(flow_item, fn item -> safe_invoke(effect_fun, item) end)
+ end)
+
+ %GRPCStream{stream | flow: flow}
+ end
+
@spec filter(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
def filter(%GRPCStream{flow: flow} = stream, filter) do
- %GRPCStream{stream | flow: Flow.filter(flow, filter)}
+ flow_wrapper = Flow.filter(flow, fn item -> safe_invoke(filter, item) end)
+ %GRPCStream{stream | flow: flow_wrapper}
end
@spec flat_map(GRPCStream.t(), (term -> Enumerable.GRPCStream.t())) :: GRPCStream.t()
def flat_map(%GRPCStream{flow: flow} = stream, flat_mapper) do
- %GRPCStream{stream | flow: Flow.flat_map(flow, flat_mapper)}
+ flow_wrapper =
+ Flow.flat_map(flow, fn item ->
+ case safe_invoke(flat_mapper, item) do
+ {:error, reason} -> [{:error, reason}]
+ res -> res
+ end
+ end)
+
+ %GRPCStream{stream | flow: flow_wrapper}
end
@spec map(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
def map(%GRPCStream{flow: flow} = stream, mapper) do
- %GRPCStream{stream | flow: Flow.map(flow, mapper)}
+ flow_wrapper = Flow.map(flow, fn item -> safe_invoke(mapper, item) end)
+ %GRPCStream{stream | flow: flow_wrapper}
end
@spec map_with_context(GRPCStream.t(), (map(), term -> term)) :: GRPCStream.t()
@@ -73,7 +93,37 @@ defmodule GRPC.Stream.Operators do
mapper.(meta, item)
end
- %GRPCStream{stream | flow: Flow.map(flow, wrapper)}
+ flow_wrapper = Flow.map(flow, fn item -> safe_invoke(wrapper, item) end)
+
+ %GRPCStream{stream | flow: flow_wrapper}
+ end
+
+ @spec map_error(GRPCStream.t(), (reason -> term)) :: GRPCStream.t()
+ def map_error(%GRPCStream{flow: flow} = stream, func) when is_function(func, 1) do
+ mapper =
+ Flow.map(flow, fn
+ {:error, reason} -> handle_error(func, reason)
+ {:ok, value} -> value
+ other -> other
+ end)
+
+ %GRPCStream{stream | flow: mapper}
+ end
+
+ defp handle_error(func, reason) do
+ case safe_invoke(func, {:error, reason}) do
+ {:error, %GRPC.RPCError{} = rpc_error} ->
+ {:error, rpc_error}
+
+ {:error, other_reason} ->
+ {:error, GRPC.RPCError.exception(message: inspect(other_reason))}
+
+ {:ok, value} ->
+ value
+
+ other ->
+ other
+ end
end
@spec partition(GRPCStream.t(), keyword()) :: GRPCStream.t()
@@ -88,7 +138,8 @@ defmodule GRPC.Stream.Operators do
@spec reject(GRPCStream.t(), (term -> term)) :: GRPCStream.t()
def reject(%GRPCStream{flow: flow} = stream, filter) do
- %GRPCStream{stream | flow: Flow.reject(flow, filter)}
+ flow_wrapper = Flow.reject(flow, fn item -> safe_invoke(filter, item) end)
+ %GRPCStream{stream | flow: flow_wrapper}
end
@spec uniq(GRPCStream.t()) :: GRPCStream.t()
@@ -100,4 +151,26 @@ defmodule GRPC.Stream.Operators do
def uniq_by(%GRPCStream{flow: flow} = stream, fun) do
%GRPCStream{stream | flow: Flow.uniq_by(flow, fun)}
end
+
+ # Normalizes and catches exceptions/throws.
+ # Returns:
+ # value -> successful value
+ # {:error, reason} -> failure
+ # {:error, {:exception, exception}} -> failure due to exception
+ # {:error, {kind, reason}} -> failure due to throw or exit
+ defp safe_invoke(fun, arg) do
+ res = fun.(arg)
+
+ case res do
+ {:ok, v} -> v
+ {:error, reason} -> {:error, reason}
+ other -> other
+ end
+ rescue
+ e ->
+ {:error, {:exception, e}}
+ catch
+ kind, reason ->
+ {:error, {kind, reason}}
+ end
end
diff --git a/test/grpc/integration/server_test.exs b/test/grpc/integration/server_test.exs
index c72d7d5b..36ba357f 100644
--- a/test/grpc/integration/server_test.exs
+++ b/test/grpc/integration/server_test.exs
@@ -4,8 +4,12 @@ defmodule GRPC.Integration.ServerTest do
defmodule FeatureServer do
use GRPC.Server, service: Routeguide.RouteGuide.Service
- def get_feature(point, _stream) do
- %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
+ def get_feature(point, materializer) do
+ GRPC.Stream.unary(point, materializer: materializer)
+ |> GRPC.Stream.map(fn point ->
+ %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
+ end)
+ |> GRPC.Stream.run()
end
def route_chat(_ex_stream, stream) do
@@ -32,19 +36,25 @@ defmodule GRPC.Integration.ServerTest do
service: Transcode.Messaging.Service,
http_transcode: true
- def get_message(msg_request, _stream) do
- %Transcode.Message{name: msg_request.name, text: "get_message"}
+ def get_message(msg_request, stream) do
+ GRPC.Stream.unary(msg_request, materializer: stream)
+ |> GRPC.Stream.map(fn req ->
+ %Transcode.Message{name: req.name, text: "get_message"}
+ end)
+ |> GRPC.Stream.run()
end
- def stream_messages(msg_request, stream) do
- Enum.each(1..5, fn i ->
- msg = %Transcode.Message{
+ def stream_messages(msg_request, materializer) do
+ 1..5
+ |> Stream.take(5)
+ |> GRPC.Stream.from()
+ |> GRPC.Stream.map(fn i ->
+ %Transcode.Message{
name: msg_request.name,
text: "#{i}"
}
-
- GRPC.Server.send_reply(stream, msg)
end)
+ |> GRPC.Stream.run_with(materializer)
end
def create_message(msg, _stream) do
@@ -62,24 +72,33 @@ defmodule GRPC.Integration.ServerTest do
msg_request.message
end
- def get_message_with_response_body(msg_request, _) do
- %Transcode.MessageOut{
- response: %Transcode.Message{
- name: msg_request.name,
- text: "get_message_with_response_body"
+ def get_message_with_response_body(msg_request, materializer) do
+ GRPC.Stream.unary(msg_request, materializer: materializer)
+ |> GRPC.Stream.map(fn req ->
+ %Transcode.MessageOut{
+ response: %Transcode.Message{
+ name: req.name,
+ text: "get_message_with_response_body"
+ }
}
- }
+ end)
+ |> GRPC.Stream.run()
end
- def get_message_with_query(msg_request, _stream) do
- %Transcode.Message{name: msg_request.name, text: "get_message_with_query"}
+ def get_message_with_query(msg_request, materializer) do
+ GRPC.Stream.unary(msg_request, materializer: materializer)
+ |> GRPC.Stream.map(fn req ->
+ %Transcode.Message{name: req.name, text: "get_message_with_query"}
+ end)
+ |> GRPC.Stream.run()
end
- def get_message_with_subpath_query(msg_request, _stream) do
- %Transcode.Message{
- name: msg_request.message.name,
- text: "get_message_with_subpath_query"
- }
+ def get_message_with_subpath_query(msg_request, materializer) do
+ GRPC.Stream.unary(msg_request, materializer: materializer)
+ |> GRPC.Stream.map(fn req ->
+ %Transcode.Message{name: req.message.name, text: "get_message_with_subpath_query"}
+ end)
+ |> GRPC.Stream.run()
end
end
@@ -168,13 +187,14 @@ defmodule GRPC.Integration.ServerTest do
defmodule SlowServer do
use GRPC.Server, service: Routeguide.RouteGuide.Service
- def list_features(rectangle, stream) do
+ def list_features(rectangle, materializer) do
Process.sleep(400)
+ server_stream = Stream.each([rectangle.lo, rectangle.hi], fn point -> point end)
- Enum.each([rectangle.lo, rectangle.hi], fn point ->
- feature = simple_feature(point)
- GRPC.Server.send_reply(stream, feature)
- end)
+ server_stream
+ |> GRPC.Stream.from()
+ |> GRPC.Stream.map(&simple_feature/1)
+ |> GRPC.Stream.run_with(materializer)
end
defp simple_feature(point) do
diff --git a/test/grpc/stream_test.exs b/test/grpc/stream_test.exs
index 279ee758..e2166233 100644
--- a/test/grpc/stream_test.exs
+++ b/test/grpc/stream_test.exs
@@ -127,7 +127,7 @@ defmodule GRPC.StreamTest do
|> GRPC.Stream.to_flow()
|> Enum.to_list()
- assert result == [{:error, "msg", :not_alive}]
+ assert result == [{:error, :process_not_alive}]
end
end
@@ -165,6 +165,138 @@ defmodule GRPC.StreamTest do
end
end
+ describe "ask/3 error handling" do
+ test "returns timeout error if response not received in time" do
+ pid =
+ spawn_link(fn ->
+ Process.sleep(:infinity)
+ end)
+
+ result =
+ GRPC.Stream.from([:hello])
+ # very short timeout
+ |> GRPC.Stream.ask(pid, 10)
+ |> GRPC.Stream.to_flow()
+ |> Enum.to_list()
+
+ assert result == [{:error, :timeout}]
+ end
+ end
+
+ describe "safe_invoke/2 handling {:ok, value} and direct value" do
+ test "maps {:ok, value} to value" do
+ stream =
+ GRPC.Stream.from([1, 2])
+ |> GRPC.Stream.map(fn x -> {:ok, x * 10} end)
+
+ result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+ assert result == [10, 20]
+ end
+
+ test "keeps direct values as is" do
+ stream =
+ GRPC.Stream.from([1, 2])
+ |> GRPC.Stream.map(fn x -> x * 5 end)
+
+ result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+ assert result == [5, 10]
+ end
+ end
+
+ describe "safe_invoke/2 catches errors" do
+ test "map/2 handles function returning {:error, reason}" do
+ stream =
+ GRPC.Stream.from([1, 2, 3])
+ |> GRPC.Stream.map(fn
+ 2 -> {:error, :fail}
+ x -> x
+ end)
+
+ results = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+ assert results == [1, {:error, :fail}, 3]
+ end
+
+ test "map/2 catches exceptions" do
+ stream =
+ GRPC.Stream.from([1, 2])
+ |> GRPC.Stream.map(fn
+ 2 -> raise "boom"
+ x -> x
+ end)
+
+ results = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+ assert match?([1, {:error, {:exception, %RuntimeError{message: "boom"}}}], results)
+ end
+
+ test "flat_map/2 catches thrown values" do
+ stream =
+ GRPC.Stream.from([1, 2])
+ |> GRPC.Stream.flat_map(fn
+ 2 -> throw(:fail)
+ x -> [x]
+ end)
+
+ results = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+ assert results == [1, {:error, {:throw, :fail}}]
+ end
+ end
+
+ describe "map_error/2" do
+ test "transforms {:error, reason} tuples" do
+ stream =
+ GRPC.Stream.from([{:error, :invalid_input}, {:ok, 42}, 100])
+ |> GRPC.Stream.map_error(fn
+ {:error, :invalid_input} -> {:error, :mapped_error}
+ msg -> msg
+ end)
+
+ result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+
+ assert Enum.sort(result) == [
+ 42,
+ 100,
+ {:error,
+ %GRPC.RPCError{
+ __exception__: true,
+ details: nil,
+ message: ":mapped_error",
+ status: nil
+ }}
+ ]
+ end
+
+ test "transforms exceptions raised inside previous map" do
+ stream =
+ GRPC.Stream.from([1, 2])
+ |> GRPC.Stream.map(fn
+ 2 -> raise "boom"
+ x -> x
+ end)
+ |> GRPC.Stream.map_error(fn
+ {:error, %RuntimeError{message: "boom"}} ->
+ GRPC.RPCError.exception(message: "Validation or runtime error")
+
+ msg ->
+ msg
+ end)
+
+ result = stream |> GRPC.Stream.to_flow() |> Enum.to_list()
+
+ assert match?(
+ [
+ 1,
+ {:error,
+ %GRPC.RPCError{
+ status: nil,
+ message: "{:exception, %RuntimeError{message: \"boom\"}}",
+ details: nil
+ }}
+ ],
+ result
+ )
+ end
+ end
+
describe "map/2, flat_map/2, filter/2" do
test "maps values correctly" do
result =
@@ -197,6 +329,44 @@ defmodule GRPC.StreamTest do
end
end
+ describe "effect/2" do
+ test "applies side effects without altering values" do
+ parent = self()
+
+ result =
+ GRPC.Stream.from([1, 2, 3])
+ |> GRPC.Stream.effect(fn x -> send(parent, {:effect_called, x}) end)
+ |> GRPC.Stream.to_flow()
+ |> Enum.to_list()
+
+ assert Enum.sort(result) == [1, 2, 3]
+
+ assert_receive {:effect_called, 1}
+ assert_receive {:effect_called, 2}
+ assert_receive {:effect_called, 3}
+ end
+
+ test "continues pipeline even if effect function raises an error" do
+ parent = self()
+
+ result =
+ GRPC.Stream.from([1, 2, 3])
+ |> GRPC.Stream.effect(fn
+ 2 -> raise "boom"
+ x -> send(parent, {:effect_called, x})
+ end)
+ |> GRPC.Stream.to_flow()
+ |> Enum.to_list()
+
+ # Even with error 2, the pipeline should continue and return all elements
+ assert Enum.sort(result) == [1, 2, 3]
+
+ # The effect should have been called for 1 and 3
+ assert_receive {:effect_called, 1}
+ assert_receive {:effect_called, 3}
+ end
+ end
+
describe "test complex operations" do
test "pipeline with all GRPC.Stream operators" do
target =
@@ -277,19 +447,19 @@ defmodule GRPC.StreamTest do
end
end
- defmodule MyGRPCService do
- use GRPC.Server, service: Routeguide.RouteGuide.Service
+ describe "run/1" do
+ defmodule MyGRPCService do
+ use GRPC.Server, service: Routeguide.RouteGuide.Service
- def get_feature(input, materializer) do
- GRPC.Stream.unary(input, materializer: materializer)
- |> GRPC.Stream.map(fn point ->
- %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
- end)
- |> GRPC.Stream.run()
+ def get_feature(input, materializer) do
+ GRPC.Stream.unary(input, materializer: materializer)
+ |> GRPC.Stream.map(fn point ->
+ %Routeguide.Feature{location: point, name: "#{point.latitude},#{point.longitude}"}
+ end)
+ |> GRPC.Stream.run()
+ end
end
- end
- describe "run/1" do
test "runs a unary stream" do
run_server([MyGRPCService], fn port ->
point = %Routeguide.Point{latitude: 409_146_138, longitude: -746_188_906}