Skip to content

Conversation

@sleipnir
Copy link
Collaborator

@sleipnir sleipnir commented Oct 21, 2025

🎯 Context

This PR introduces a new error handling and side-effect model for the GRPC.Stream module, covering both unary and streaming flows.
The implementation improves safety, consistency, and expressiveness across Flow-based pipelines used in gRPC message processing, ensuring that exceptions, errors, and inconsistent return values are handled uniformly and predictably.


🚀 New Features

🧩 1. effect/2

A new utility function to safely perform side effects within a stream pipeline without modifying the values being processed.

GRPC.Stream.from(....)
|> GRPC.Stream.effect(fn x -> Logger.debug("Processing #{x}") end)
|> GRPC.Stream.run()
  • Ensures exceptions raised inside the effect function do not break the stream.
  • Uses the internal safe_invoke/2 mechanism to capture exceptions and thrown values.
  • Behaves like a safe tap/2, preserving all original stream data.

⚙️ 2. map_error/2

Adds a declarative mechanism for mapping or transforming errors ({:error, reason}) within a stream into structured GRPC.RPCError values.

GRPC.Stream.from([1, 2])
|> GRPC.Stream.map(fn
  2 -> raise "boom"
  x -> x
end)
|> GRPC.Stream.map_error(fn
  {:error, reason} ->
    GRPC.RPCError.exception(message: "Validation or runtime error: #{inspect(reason)}")
  other ->
    other
end)

Behavior:

  • {:error, reason} → transformed into %GRPC.RPCError{}
  • {:ok, value} → unwrapped to value
  • exceptions from previous operators (e.g. map/2) → can be transformed into domain-specific GRPC.RPCError
  • normal values are propagated unchanged

This enables localized error recovery and translation, useful for both input validation and unexpected runtime exceptions.


🔄 3. Unified Error Matching and Propagation

All stream operators now use the internal safe_invoke/2 wrapper, which standardizes how functions inside the pipeline behave.

Function result Propagated value Description
{:ok, value} value Normalized success
{:error, reason} {:error, reason} Propagated as-is
raise (exception) {:error, exception} Caught and wrapped
throw value {:error, {:throw, value}} Uniformly captured
raw value same Passed through unchanged

This makes all pipelines exception-safe — any operator can raise or throw, but the stream will continue gracefully with well-defined {:error, reason} items.


💡 4. Integration with run/1 and run_with/3

Both stream finalizers (run/1, run_with/3) have been updated to:

  • Handle {:error, reason} tuples and convert them to %GRPC.RPCError{}.
  • Support both unary and streaming flows.
  • Ensure that errors in one element do not interrupt subsequent elements in the stream.
  • Allow dry_run mode to test flow execution without emitting gRPC responses.

✅ 5. Test Coverage

New and extended tests added in GRPC.StreamTest include:

  • effect/2: ensures side effects are applied safely and do not affect data flow, even when exceptions occur inside the callback.
  • map_error/2: verifies correct error mapping and recovery.
  • Validation of error propagation across all operators (map, flat_map, filter, ask, etc.).
  • Resilience against process failures (:process_not_alive, :timeout) in ask/3.
  • Full pipeline integration tests combining multiple operators and error recovery logic.

🧠 Impact

This refactor:

  • Unifies and simplifies error handling across the entire streaming subsystem.
  • Makes pipelines more robust — no unexpected crashes due to raised errors.
  • Improves observability: all errors now flow through %GRPC.RPCError{}.
  • Adds new extensibility points for metrics, tracing, and recovery.

🔮 Next Steps

  • Integrate tracing hooks to improve runtime observability.
  • Update the docs.

@sleipnir sleipnir requested a review from polvalente October 21, 2025 03:55
@yordis
Copy link
Contributor

yordis commented Oct 21, 2025

A new utility function to safely perform side effects within a stream pipeline without modifying the values being processed.

Other ecosystem call it https://doc.rust-lang.org/std/result/enum.Result.html#method.inspect_err

Eitherway, fine by me

@sleipnir
Copy link
Collaborator Author

A new utility function to safely perform side effects within a stream pipeline without modifying the values being processed.

Other ecosystem call it https://doc.rust-lang.org/std/result/enum.Result.html#method.inspect_err

Eitherway, fine by me

In this case, side effects aren't necessarily linked to an error; side effects in this context are any operation that shouldn't affect the flow. Think of a call to an external system (e.g., sending an email) that shouldn't affect the processing of the main flow, and is therefore a side effect. Then inspect_err would be a misnomer in this context. But I added map_error that can serve this purpose.

sleipnir and others added 3 commits October 22, 2025 00:20
Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
@sleipnir sleipnir requested a review from polvalente October 22, 2025 03:56
@sleipnir sleipnir marked this pull request as ready for review October 22, 2025 19:00
Copy link
Contributor

@yordis yordis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 💜

Comment on lines +109 to 110
GRPC.Stream.unary(request, materializer: materializer)
|> GRPC.Stream.map(fn %HelloReply{} = reply ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
GRPC.Stream.unary(request, materializer: materializer)
|> GRPC.Stream.map(fn %HelloReply{} = reply ->
request
|> GRPC.Stream.unary(materializer: materializer)
|> GRPC.Stream.map(fn %HelloReply{} = reply ->

In this example:

* The function inside `map/2` raises an exception for the value `2`.
* map_error/2 captures and transforms that error into a structured `GRPC.RPCError` response.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* map_error/2 captures and transforms that error into a structured `GRPC.RPCError` response.
* `map_error/2` captures and transforms that error into a structured `GRPC.RPCError` response.

end

@doc """
Applies a **side-effecting function** to each element of the stream **without altering** its values.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Applies a **side-effecting function** to each element of the stream **without altering** its values.
Applies a side-effect function to each element of the stream without altering its values.

Let's remove all of these bold highlights

Comment on lines 275 to +278
## Returns
- Updated stream if successful.
- `{:error, item, reason}` if the request fails or times out.
- `{:error, reason}` if the request fails or times out.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit redundant with the typespec

GRPCStream.t() | {:error, :timeout | :process_not_alive}
def ask(%GRPCStream{flow: flow} = stream, target, timeout \\ 5000) do
mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end
# mapper = fn item -> do_ask(item, target, timeout, raise_on_error: false) end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stray comment

Comment on lines +57 to +64
%GRPCStream{
stream
| flow:
Flow.map(flow, fn flow_item ->
tap(flow_item, fn item -> safe_invoke(effect_fun, item) end)
end)
}
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's break this up a bit for readability

Comment on lines +106 to +130
Flow.map(flow, fn
{:error, _reason} = item ->
res = safe_invoke(func, item)

case res do
{:error, %GRPC.RPCError{} = new_reason} ->
{:error, new_reason}

{:error, new_reason} ->
msg = "#{inspect(new_reason)}"
{:error, GRPC.RPCError.exception(message: msg)}

{:ok, other} ->
other

other ->
other
end

{:ok, other} ->
other

other ->
other
end)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we extract this to a defp?

Comment on lines +48 to +50
Stream.cycle(1..5)
|> Stream.take(5)
|> GRPC.Stream.from()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Stream.cycle(1..5)
|> Stream.take(5)
|> GRPC.Stream.from()
1..5
|> GRPC.Stream.from()

Doesn't this work? If not, we should make it work in a separate PR. Suggestion for if enumerables aren't allowed directly:

Suggested change
Stream.cycle(1..5)
|> Stream.take(5)
|> GRPC.Stream.from()
1..5
|> Stream.take(5)
|> GRPC.Stream.from()

describe "ask/3 error handling" do
test "returns timeout error if response not received in time" do
pid =
spawn(fn ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
spawn(fn ->
spawn_link(fn ->

Comment on lines +172 to +175
# do not send any response
receive do
_ -> :ok
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# do not send any response
receive do
_ -> :ok
end
Process.sleep(:infinity)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also instead of spawning we could have self() be the pid in ask and maybe even have an assert_receive

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants