-
Couldn't load subscription status.
- Fork 232
[Feat] Error handler in streams #451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
781a7cc
ce286dc
dae7deb
9fe3c55
2a76e93
3af0153
4feee64
bae8b92
0cfb96d
db695ec
4465815
93517c8
b03b837
b7af0d3
5ed5a8f
eb6fdd4
0933617
c73eba3
9bf8fd2
c1daaea
acaf147
f6bffae
140e1a9
4ba7367
e9d27fd
34c8dd4
28ed00c
015fadf
a5c72f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -17,6 +17,10 @@ | |||||
| - [Unary RPC using Stream API](#unary-rpc-using-stream-api) | ||||||
| - [Server-Side Streaming](#server-side-streaming) | ||||||
| - [Bidirectional Streaming](#bidirectional-streaming) | ||||||
| - [Effects and Error Handling](#effects-and-error-handling) | ||||||
| - [Side Effects](#side-effects-with-effect2) | ||||||
| - [Recovery from errors](#recovery-from-errors) | ||||||
| - [Unified Error Matching and Propagation](#unified-error-matching-and-propagation) | ||||||
| - [Application Startup](#application-startup) | ||||||
| - [Client Usage](#client-usage) | ||||||
| - [Basic Connection and RPC](#basic-connection-and-rpc) | ||||||
|
|
@@ -101,8 +105,8 @@ defmodule HelloworldStreams.Server do | |||||
| alias Helloworld.HelloReply | ||||||
|
|
||||||
| @spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any() | ||||||
| def say_unary_hello(request, _materializer) do | ||||||
| GRPC.Stream.unary(request) | ||||||
| def say_unary_hello(request, materializer) do | ||||||
| GRPC.Stream.unary(request, materializer: materializer) | ||||||
| |> GRPC.Stream.map(fn %HelloReply{} = reply -> | ||||||
| %HelloReply{message: "[Reply] #{reply.message}"} | ||||||
| end) | ||||||
|
|
@@ -144,28 +148,104 @@ def say_bid_stream_hello(request, materializer) do | |||||
| |> GRPC.Stream.run_with(materializer) | ||||||
| end | ||||||
| ``` | ||||||
| The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below: | ||||||
|
|
||||||
| | Function | Description | Parameters / Options | | ||||||
| |:---------------------------------|:-------------|:----------------------| | ||||||
| | **`from(input, opts \\\\ [])`** | Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. | **Parameters:**<br>โข `input` โ stream, list, or gRPC struct.<br>**Options:**<br>โข `:join_with` โ PID or name of an external `GenStage` producer.<br>โข `:dispatcher` โ dispatcher module (default: `GenStage.DemandDispatcher`).<br>โข `:propagate_context` โ if `true`, propagates the materializer context.<br>โข `:materializer` โ the current `%GRPC.Server.Stream{}`.<br>โข Other options supported by `Flow`. | | ||||||
| | **`unary(input, opts \\\\ [])`** | Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. | **Parameters:**<br>โข `input` โ single gRPC message.<br>**Options:** same as `from/2`. | | ||||||
| | **`to_flow(stream)`** | Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}` struct. | | ||||||
| | **`run(stream)`** | Executes the `Flow` for a unary stream and returns the first materialized result. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}` with `unary: true` option. | | ||||||
| | **`run_with(stream, materializer, opts \\\\ [])`** | Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `materializer` โ `%GRPC.Server.Stream{}`.<br>**Options:**<br>โข `:dry_run` โ if `true`, responses are not sent. | | ||||||
| | **`ask(stream, target, timeout \\\\ 5000)`** | Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `target` โ PID or atom.<br>โข `timeout` โ in milliseconds. | | ||||||
| | **`ask!(stream, target, timeout \\\\ 5000)`** | Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. | | ||||||
| | **`filter(stream, fun)`** | Filters items in the stream by applying a concurrent predicate function. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `fun` โ function `(item -> boolean)`. | | ||||||
| | **`flat_map(stream, fun)`** | Applies a function returning a list or enumerable, flattening the results. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `fun` โ `(item -> Enumerable.t())`. | | ||||||
| | **`map(stream, fun)`** | Applies a transformation function to each item in the stream. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `fun` โ `(item -> term)`. | | ||||||
| | **`map_with_context(stream, fun)`** | Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `fun` โ `(context, item -> term)`. | | ||||||
| | **`partition(stream, opts \\\\ [])`** | Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `opts` โ partitioning options (`Flow.partition/2`). | | ||||||
| | **`reduce(stream, acc_fun, reducer_fun)`** | Reduces the stream using an accumulator, useful for aggregations. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `acc_fun` โ initializer function `() -> acc`.<br>โข `reducer_fun` โ `(item, acc -> acc)`. | | ||||||
| | **`uniq(stream)`** | Emits only distinct items from the stream (no custom uniqueness criteria). | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`. | | ||||||
| | **`uniq_by(stream, fun)`** | Emits only unique items based on the return value of the provided function. | **Parameters:**<br>โข `stream` โ `%GRPC.Stream{}`.<br>โข `fun` โ `(item -> term)` for uniqueness determination. | | ||||||
| | **`get_headers(stream)`** | Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. | **Parameters:**<br>โข `stream` โ `%GRPC.Server.Stream{}`.<br>**Returns:** `map` containing decoded headers. | | ||||||
|
|
||||||
| For a complete list of available operators see [here](lib/grpc/stream.ex). | ||||||
| The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex). | ||||||
|
|
||||||
| --- | ||||||
|
|
||||||
| ### Effects and Error Handling | ||||||
|
|
||||||
| #### Side Effects | ||||||
|
|
||||||
| The `effect/2` operator executes user-defined functions for each element in the stream, allowing the integration of non-transformative actions such as logging, metrics, or external notifications. | ||||||
|
|
||||||
| Unlike transformation operators (e.g., `map/2`), `effect/2` does not modify or filter values โ it preserves the original stream while executing the provided callback safely for each emitted element. | ||||||
|
|
||||||
| ```elixir | ||||||
| iex> parent = self() | ||||||
| iex> stream = | ||||||
| ...> GRPC.Stream.from([1, 2, 3]) | ||||||
| ...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x * 2}) end) | ||||||
| ...> |> GRPC.Stream.to_flow() | ||||||
| ...> |> Enum.to_list() | ||||||
| iex> assert_receive {:seen, 2} | ||||||
| iex> assert_receive {:seen, 4} | ||||||
| iex> assert_receive {:seen, 6} | ||||||
| iex> stream | ||||||
| [1, 2, 3] | ||||||
| ``` | ||||||
|
|
||||||
| Key characteristics: | ||||||
|
|
||||||
| * The callback function (`effect_fun`) is invoked for each item emitted downstream. | ||||||
| * The result of the callback is ignored, ensuring that the streamโs structure and values remain unchanged. | ||||||
| * Execution is lazy and occurs only when the stream is materialized using run/1, run_with/3, or to_flow/1. | ||||||
| * Exceptions raised inside the callback are captured internally, preventing interruption of the dataflow. | ||||||
|
|
||||||
| This operator is designed for observability, telemetry, auditing, and integration with external systems that must react to events flowing through the gRPC stream. | ||||||
|
|
||||||
| --- | ||||||
|
|
||||||
| #### Recovery from errors | ||||||
|
|
||||||
| The `map_error/2` operator intercepts and transforms errors or exceptions emitted by previous stages in a stream pipeline. | ||||||
|
|
||||||
| It provides a unified mechanism for handling: | ||||||
|
|
||||||
| * Expected errors, such as validation or domain failures (`{:error, reason}`) | ||||||
| * Unexpected runtime errors, including raised or thrown exceptions inside other operators. | ||||||
|
|
||||||
| ```elixir | ||||||
| iex> GRPC.Stream.from([1, 2]) | ||||||
| ...> |> GRPC.Stream.map(fn | ||||||
| ...> 2 -> raise "boom" | ||||||
| ...> x -> x | ||||||
| ...> end) | ||||||
| ...> |> GRPC.Stream.map_error(fn | ||||||
| ...> {:error, {:exception, _reason}} -> | ||||||
| ...> {:error, GRPC.RPCError.exception(message: "Booomm")} | ||||||
| ...> end) | ||||||
| ``` | ||||||
|
|
||||||
| In this example: | ||||||
|
|
||||||
| * The function inside `map/2` raises an exception for the value `2`. | ||||||
| * map_error/2 captures and transforms that error into a structured `GRPC.RPCError` response. | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| * The stream continues processing without being interrupted. | ||||||
|
|
||||||
| This makes map_error/2 suitable for input validation, runtime fault recovery, and user-facing error translation within gRPC pipelines. | ||||||
|
|
||||||
| --- | ||||||
|
|
||||||
| #### Unified Error Matching and Propagation | ||||||
|
|
||||||
| All stream operators share a unified error propagation model that guarantees consistent handling of exceptions and failures across the pipeline. | ||||||
|
|
||||||
| This ensures that user-defined functions within the stream โ whether pure transformations, side effects, or external calls โ always produce a predictable and recoverable result, maintaining the integrity of the dataflow even in the presence of unexpected errors. | ||||||
|
|
||||||
| ```elixir | ||||||
| def say_unary_hello(request, _materializer) do | ||||||
| GRPCStream.unary(request) | ||||||
| |> GRPCStream.ask(Transformer) | ||||||
| |> GRPCStream.map(fn | ||||||
| %HelloReply{} = reply -> | ||||||
| %HelloReply{message: "[Reply] #{reply.message}"} | ||||||
|
|
||||||
| {:error, reason} -> | ||||||
| {:error, GRPC.RPCError.exception(message: "error calling external process: #{inspect(reason)}")} | ||||||
|
|
||||||
| error -> | ||||||
| Logger.error("Unknown error") | ||||||
| error | ||||||
| end) | ||||||
| |> GRPCStream.run() | ||||||
| end | ||||||
| ``` | ||||||
|
|
||||||
| By normalizing all possible outcomes, `GRPC.Stream` ensures fault-tolerant, exception-safe pipelines where operators can freely raise, throw, or return tuples without breaking the flow execution. | ||||||
|
|
||||||
| This unified model allows developers to build composable and reliable streaming pipelines that gracefully recover from both domain and runtime errors. | ||||||
|
|
||||||
| >_NOTE_: In the example above, we could use `map_error/2` instead of `map/2` to handle error cases explicitly. However, since the function also performs a transformation on successful values, `map/2` remains appropriate and useful in this context. | ||||||
|
|
||||||
| --- | ||||||
|
|
||||||
|
|
@@ -175,7 +255,7 @@ Add the server supervisor to your application's supervision tree: | |||||
|
|
||||||
| ```elixir | ||||||
| defmodule Helloworld.Application do | ||||||
| @ false | ||||||
| @moduledoc false | ||||||
| use Application | ||||||
|
|
||||||
| @impl true | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -158,7 +158,7 @@ defmodule GRPC.Stream do | |||||
| "GRPC.Stream.run/1 requires a materializer to be set in the GRPC.Stream" | ||||||
| end | ||||||
|
|
||||||
| send_response(materializer, Enum.at(flow, 0)) | ||||||
| send_response(materializer, Enum.at(flow, 0), opts) | ||||||
|
|
||||||
| :noreply | ||||||
| end | ||||||
|
|
@@ -193,19 +193,71 @@ defmodule GRPC.Stream do | |||||
| raise ArgumentError, "run_with/3 is not supported for unary streams" | ||||||
| end | ||||||
|
|
||||||
| dry_run? = Keyword.get(opts, :dry_run, false) | ||||||
|
|
||||||
| flow | ||||||
| |> Flow.map(fn msg -> | ||||||
| if not dry_run? do | ||||||
| send_response(from, msg) | ||||||
| end | ||||||
|
|
||||||
| flow | ||||||
| |> Flow.map(fn | ||||||
| {:ok, msg} -> | ||||||
| send_response(from, msg, opts) | ||||||
| flow | ||||||
|
|
||||||
| {:error, %GRPC.RPCError{} = reason} -> | ||||||
| send_response(from, reason, opts) | ||||||
| flow | ||||||
|
|
||||||
| {:error, reason} -> | ||||||
| msg = GRPC.RPCError.exception(message: "#{inspect(reason)}") | ||||||
| send_response(from, msg, opts) | ||||||
| flow | ||||||
|
|
||||||
| msg -> | ||||||
| send_response(from, msg, opts) | ||||||
| flow | ||||||
| end) | ||||||
| |> Flow.run() | ||||||
| end | ||||||
|
|
||||||
| @doc """ | ||||||
| Applies a **side-effecting function** to each element of the stream **without altering** its values. | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Let's remove all of these bold highlights |
||||||
|
|
||||||
| The `effect/2` function is useful for performing **imperative or external actions** | ||||||
sleipnir marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| (such as logging, sending messages, collecting metrics, or debugging) | ||||||
| while preserving the original stream data. | ||||||
|
|
||||||
| It behaves like `Enum.each/2`, but returns the stream itself so it can continue in the pipeline. | ||||||
|
|
||||||
| ## Examples | ||||||
|
|
||||||
| ```elixir | ||||||
| iex> parent = self() | ||||||
| iex> stream = | ||||||
| ...> GRPC.Stream.from([1, 2, 3]) | ||||||
| ...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x*2}) end) | ||||||
| ...> |> GRPC.Stream.to_flow() | ||||||
| ...> |> Enum.to_list() | ||||||
| iex> assert_receive {:seen, 2} | ||||||
| iex> assert_receive {:seen, 4} | ||||||
| iex> assert_receive {:seen, 6} | ||||||
| iex> stream | ||||||
| [1, 2, 3] | ||||||
| ``` | ||||||
| In this example, the effect/2 function sends a message to the current process | ||||||
| for each element in the stream, but the resulting stream values remain unchanged. | ||||||
|
|
||||||
| ## Parameters | ||||||
|
|
||||||
| - `stream` โ The input `GRPC.Stream`. | ||||||
| - `effect_fun` โ A function that receives each item and performs a side effect | ||||||
| (e.g. IO.inspect/1, Logger.info/1, send/2, etc.). | ||||||
|
|
||||||
| ### Notes | ||||||
|
|
||||||
| - This function is **lazy** โ the `effect_fun` will only run once the stream is materialized | ||||||
| (e.g. via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`). | ||||||
| - The use of `effect/2` ensures that the original item is returned unchanged, | ||||||
| enabling seamless continuation of the pipeline. | ||||||
| """ | ||||||
| @spec effect(t(), (term -> any)) :: t() | ||||||
| defdelegate effect(stream, effect_fun), to: Operators | ||||||
|
|
||||||
| @doc """ | ||||||
| Sends a request to an external process and awaits a response. | ||||||
|
|
||||||
|
|
@@ -223,9 +275,9 @@ defmodule GRPC.Stream do | |||||
| ## Returns | ||||||
|
|
||||||
| - Updated stream if successful. | ||||||
| - `{:error, item, reason}` if the request fails or times out. | ||||||
| - `{:error, reason}` if the request fails or times out. | ||||||
|
Comment on lines
275
to
+278
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit redundant with the typespec |
||||||
| """ | ||||||
| @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, item(), reason()} | ||||||
| @spec ask(t(), pid | atom, non_neg_integer) :: t() | {:error, reason()} | ||||||
| defdelegate ask(stream, target, timeout \\ 5000), to: Operators | ||||||
|
|
||||||
| @doc """ | ||||||
|
|
@@ -261,6 +313,64 @@ defmodule GRPC.Stream do | |||||
| @spec map(t(), (term -> term)) :: t() | ||||||
| defdelegate map(stream, mapper), to: Operators | ||||||
|
|
||||||
| @doc """ | ||||||
| Intercepts and transforms error tuples or unexpected exceptions that occur | ||||||
| within a gRPC stream pipeline. | ||||||
|
|
||||||
| `map_error/3` allows graceful handling or recovery from errors produced by previous | ||||||
| operators (e.g. `map/2`, `flat_map/2`) or from validation logic applied to incoming data. | ||||||
|
|
||||||
| The provided `handler/1` function receives the error reason (or the exception struct) like: | ||||||
|
|
||||||
| {:error, reason} -> failure | ||||||
| {:error, {:exception, exception}} -> failure due to exception | ||||||
| {:error, {kind, reason}} -> failure due to throw or exit | ||||||
|
|
||||||
| And can either: | ||||||
|
|
||||||
| * Return a new error tuple โ e.g. `{:error, new_reason}` โ to re-emit a modified error. | ||||||
| * Return any other value to recover from the failure and continue the pipeline. | ||||||
|
|
||||||
| This makes it suitable for both input validation and capturing unexpected runtime errors | ||||||
| in stream transformations. | ||||||
|
|
||||||
| ## Parameters | ||||||
|
|
||||||
| - `stream` โ The input stream or `Flow` pipeline. | ||||||
| - `func` โ A function that takes an error reason or exception and returns either a new value or an error tuple. | ||||||
|
|
||||||
| ## Returns | ||||||
|
|
||||||
| - A new stream where all error tuples and raised exceptions are processed by `func/1`. | ||||||
|
|
||||||
| ## Examples | ||||||
|
|
||||||
| iex> GRPC.Stream.from([1, 2]) | ||||||
| ...> |> GRPC.Stream.map(fn | ||||||
| ...> 2 -> raise "boom" | ||||||
| ...> x -> x | ||||||
| ...> end) | ||||||
| ...> |> GRPC.Stream.map_error(fn | ||||||
| ...> {:error, {:exception, _reason}} -> | ||||||
| ...> {:error, GRPC.RPCError.exception(message: "Validation or runtime error")} | ||||||
| ...> end) | ||||||
|
|
||||||
| In this example: | ||||||
|
|
||||||
| * The call to `GRPC.Stream.map/2` raises an exception for value `2`. | ||||||
| * `map_error/3` catches the error and wraps it in a `GRPC.RPCError` struct with a custom message. | ||||||
| * The pipeline continues execution, transforming errors into structured responses. | ||||||
|
|
||||||
| ## Notes | ||||||
|
|
||||||
| - `map_error/3` is **lazy** and only executes when the stream is materialized | ||||||
| (via `GRPC.Stream.run/1` or `GRPC.Stream.run_with/3`). | ||||||
|
|
||||||
| - Use this operator to implement **robust error recovery**, **input validation**, or | ||||||
| to normalize exceptions from downstream Flow stages into well-defined gRPC errors. | ||||||
| """ | ||||||
| defdelegate map_error(stream, func), to: Operators | ||||||
|
|
||||||
| @doc """ | ||||||
| Applies a transformation function to each stream item, passing the context as an additional argument. | ||||||
| This is useful for operations that require access to the stream's headers. | ||||||
|
|
@@ -386,7 +496,11 @@ defmodule GRPC.Stream do | |||||
| %__MODULE__{flow: flow, options: opts} | ||||||
| end | ||||||
|
|
||||||
| defp send_response(from, msg) do | ||||||
| GRPC.Server.send_reply(from, msg) | ||||||
| defp send_response(from, msg, opts) do | ||||||
| dry_run? = Keyword.get(opts, :dry_run, false) | ||||||
|
|
||||||
| if not dry_run? do | ||||||
| GRPC.Server.send_reply(from, msg) | ||||||
| end | ||||||
| end | ||||||
| end | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.