
implement stream append backpressure with Enumerable.reduce/3 #3

Merged
merged 9 commits into from
Apr 7, 2021
70 changes: 70 additions & 0 deletions guides/writing_events.md
@@ -0,0 +1,70 @@
# Writing Events

This guide covers specifics about the `Spear.append/4` function and general
information about the event-writing functionality.

## Enumeration Characteristics

`event_stream` is an `t:Enumerable.t()` which will be lazily written to the
EventStore as elements of the stream are computed and serialized onto the
wire.

This means a few things:

First, you can efficiently emit events from a stream over a large source
such as a CSV file with many lines:

```elixir
File.stream!("large.csv", read_ahead: 100_000)
|> MyCsvParser.parse_stream()
|> Stream.map(&MyCsvParser.turn_csv_line_into_spear_event/1)
|> Spear.append(conn, "ChargesFromCsvs", batch_size: 25)
# => :ok
```

The stream is only fully run after the last bytes have been written to
the gRPC network request: the stream is never computed entirely in memory.

Second, you may (but are _not_ encouraged to) write events via an infinite
stream. A trivial counter mechanism could be implemented like so:

```elixir
iex> Stream.iterate(0, &(&1 + 1))
...> |> Stream.map(fn n -> Spear.Event.new("incremented", n) end)
...> |> Spear.append(conn, "InfiniteCounter", timeout: :infinity, expect: :empty)
{:error,
{:grpc_failure, [code: 3, message: "Maximum Append Size of 1048576 Exceeded."]}}
```

Note that while EventStore streams can in theory store infinitely long
streams, they are not practically able to do so. EventStoreDB limits the size
of a single write to `1_048_576` cumulative bytes. This budget can be spent
on one very large event or, as shown above, many tiny events in a single
call to `Spear.append/4`. Attempting to write more than the budget will fail
the request with the error shown above, and no events in the request will be
written to the EventStore. This limit is configurable in the EventStoreDB
configuration.
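
If a single source might exceed that budget, one workaround is to split it
into several smaller appends. The sketch below is illustrative only: the
`Stream.chunk_every/2` size of `1_000` is an arbitrary assumption and should
be chosen so that each chunk serializes to fewer bytes than the budget.

```elixir
File.stream!("large.csv", read_ahead: 100_000)
|> MyCsvParser.parse_stream()
|> Stream.map(&MyCsvParser.turn_csv_line_into_spear_event/1)
|> Stream.chunk_every(1_000)
|> Enum.each(fn chunk ->
  # each chunk is appended in its own request, so each stays under the limit
  :ok = Spear.append(chunk, conn, "ChargesFromCsvs", batch_size: 25)
end)
```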

## Blocking

While `Spear.append/4` blocks the caller for the duration of the request,
it does not fully block the connection. The connection will write chunks of
data over the wire as allowed by HTTP2 window sizes.

HTTP2 includes a back-pressure mechanism for clients sending large amounts
of data to the server faster than the server can handle. Servers negotiate
a maximum number of bytes which the client is allowed to send, called a
window. When the window has been exhausted by streaming data to the server,
the client must wait until the server replenishes the window. In the downtime
between exhausting a window and the server replenishing it, Spear suspends
the exhausted request stream and handles incoming messages from the server
as normal. Since HTTP2 window sizes are relatively small (usually in the
range of 10 to 100 KB), Spear takes these conceptual breaks fairly often
during large requests. This allows Spear to efficiently multiplex large
writes with large reads and subscriptions.
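
For intuition, here is how a Mint-based client can see how much window is
left before it must pause. This is a sketch against the raw `Mint.HTTP2`
API rather than anything Spear exposes; `conn` and `request_ref` are assumed
to come from an already-open request.

```elixir
# bytes we may still send on the whole connection and on this request
connection_window = Mint.HTTP2.get_window_size(conn, :connection)
request_window = Mint.HTTP2.get_window_size(conn, {:request, request_ref})

# the usable budget is the smaller of the two; once it hits 0 the client
# must wait for WINDOW_UPDATE frames from the server before sending more
usable_window = min(connection_window, request_window)
```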
Comment on lines +54 to +64
Collaborator Author

this here is the short-doc on what's going on

hopefully it's not too dense/HTTP2y to grok


## Batching

When appending multiple events, Spear will fit as many messages as possible
into the same HTTP2 DATA frame. This is valid according to the gRPC
specification and has the potential to improve performance.
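
Concretely, gRPC frames each encoded message with a one-byte compression
flag and a four-byte big-endian length, so a batch is simply those framed
messages concatenated into a single DATA frame payload. A rough sketch of
that framing (not Spear's actual implementation; `batch` is assumed to be a
list of already-encoded protobuf binaries):

```elixir
# length-prefix each message per the gRPC wire format and join the batch
# into one binary that can be sent as a single DATA frame
frame = fn encoded ->
  <<0, byte_size(encoded)::unsigned-big-integer-size(32), encoded::binary>>
end

data_frame_payload =
  batch
  |> Enum.map(frame)
  |> IO.iodata_to_binary()
```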
Comment on lines +68 to +70
Collaborator Author

notable that this PR makes Spear very greedy about sending as much as possible as soon as possible in as few DATA frames as possible

103 changes: 30 additions & 73 deletions lib/spear.ex
@@ -490,14 +490,11 @@ defmodule Spear do
they will be lazily mapped to `AppendReq` structs before being encoded to
wire data.

See the [Writing Events](TODO) guide for more information about writing
events.

## Options

* `:batch_size` - (default: `1`) the number of messages to write in each
HTTP2 DATA frame. Increasing this number may improve write performance
when writing a very large number of events, but it is generally recommended
to keep it at `1`. Note that this does not guarantee that each batch will be
written in the same DATA frame: the HTTP2 client may split messages however
it sees fit.
* `:expect` - (default: `:any`) the expectation to set on the
status of the stream. The write will fail if the expectation fails. See
`Spear.ExpectationViolation` for more information about expectations.
@@ -510,65 +507,14 @@
through the simplified return API, such as the stream's revision number
after writing the events.

## Enumeration Characteristics

`event_stream` is an `t:Enumerable.t()` which will be lazily computed as
events are emitted over-the-wire to the EventStore via gRPC. The procedure
for emitting events roughly follows this pseudo-code

```elixir
initiate_grpc_request()

event_stream
|> Stream.map(&encode_to_wire_format/1)
|> Enum.each(&emit_event/1)

conclude_grpc_request()
```

This means a few things:

First, you can efficiently emit events from a stream over a large source
such as a large CSV file

```elixir
File.stream!("large.csv", read_ahead: 100_000)
|> MyCsvParser.parse_stream()
|> Stream.map(&MyCsvParser.turn_csv_line_into_spear_event/1)
|> Spear.append(conn, "ChargesFromCsvs", batch_size: 25)
# => :ok
```

This pipeline will only run the stream as the events are being written to
the network. Realize, however, that this may not be ideal for all
use-cases: should the `MyCsvParser.turn_csv_line_into_spear_event/1` function
raise an error on the last line of the CSV, the final line of the CSV will
not be written as an event to the EventStore but all events produced prior
to the final line will be.

Second, you may (but are _not_ encouraged to) write events via an infinite
stream. A trivial counter mechanism could be implemented like so
## Examples

```elixir
iex> Stream.iterate(0, &(&1 + 1))
...> |> Stream.map(fn n -> Spear.Event.new("incremented", n) end)
...> |> Spear.append(conn, "InfiniteCounter", timeout: :infinity, expect: :empty)
{:error,
%Mint.HTTPError{
module: Mint.HTTP2,
reason: {:exceeds_window_size, :connection, 26}
}}
```

Note that while EventStore streams can in theory store infinitely long
streams, they are not practically able to do so. More immediately, HTTP2
allows server implementations to direct clients to a maximum window size of
bytes allowed to be sent in a single request. EventStore exerts a reasonably
large window size per connection and request on the client, disallowing
the writing of infinite streams. In cases where a client attempts to write
too many events, `append/4` may fail with the `Mint.HTTPError` depicted
above (though possible with different elements in the second and third
elements of the `:reason` tuple).
iex> [Spear.Event.new("es_supported_clients", %{})]
...> |> Spear.append(conn, expect: :exists)
:ok
iex> [Spear.Event.new("es_supported_clients", %{})]
...> |> Spear.append(conn, expect: :empty)
{:error, %Spear.ExpectationViolation{current: 1, expected: :empty}}
"""
@spec append(
event_stream :: Enumerable.t(),
@@ -594,15 +540,26 @@
opts
)

case GenServer.call(conn, {:request, request}, Keyword.fetch!(opts, :timeout)) do
{:ok, response} ->
Spear.Writing.decode_append_response(
response[:data] || <<>>,
opts[:raw?]
)

error ->
error
with {:ok, %{status: 200, headers: headers} = response} <-
GenServer.call(conn, {:request, request}, Keyword.fetch!(opts, :timeout)),
{{"grpc-status", "0"}, _} <- {List.keyfind(headers, "grpc-status", 0), response} do
Spear.Writing.decode_append_response(
response.data,
opts[:raw?]
)
else
{:error, reason} ->
{:error, reason}

# TODO this badly needs a refactor
{{"grpc-status", other_status}, response} ->
{:error,
{:grpc_failure,
code: other_status && String.to_integer(other_status),
message:
Enum.find_value(response.headers, fn {header, value} ->
header == "grpc-message" && value
end)}}
end
end
end
52 changes: 17 additions & 35 deletions lib/spear/connection.ex
@@ -8,7 +8,7 @@ defmodule Spear.Connection do
# https://github.com/elixir-mint/mint/blob/796b8db097d69ede7163acba223ab2045c2773a4/pages/Architecture.md

use GenServer
alias Spear.Request
alias Spear.Connection.Request

defstruct [:conn, requests: %{}]

@@ -44,25 +44,25 @@

@impl GenServer
def handle_call({:request, request}, from, state) do
case request_and_stream_body(state, request) do
{:ok, state, request_ref} ->
state = put_in(state.requests[request_ref], %{from: from, response: %{}})

case request_and_stream_body(state, request, from) do
{:ok, state} ->
{:noreply, state}

{:error, state, reason} ->
{:reply, {:error, reason}, state}
end
end

defp request_and_stream_body(state, request) do
defp request_and_stream_body(state, request, from) do
Collaborator Author

there's a slight refactor in this module (and specifically this func) which changes the shape of the state struct %__MODULE__{} from having a :responses field (map) to having a :requests field (struct) which has a sub-field of :responses (struct)

this is because requests may be completed asynchronously (w.r.t. the server, the client still blocks) if they are larger than a single HTTP2 window, so we need to store some information about the requests, not just response information
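
a rough sketch of what that per-request bookkeeping might hold (field names
here are assumptions for illustration, not the actual
`Spear.Connection.Request` struct):

```elixir
defmodule RequestSketch do
  # who is blocked awaiting the reply, the messages still to be emitted once
  # the window reopens, and the response accumulated so far
  defstruct [
    :from,
    :request_ref,
    :remaining_messages,
    response: %{status: nil, headers: [], data: <<>>}
  ]
end
```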

with {:ok, conn, request_ref} <-
Mint.HTTP.request(state.conn, @post, request.path, request.headers, :stream),
request = Request.new(request.messages, request_ref, from),
state = put_in(state.conn, conn),
{:ok, state} <- stream_body(state, request_ref, request.messages),
{:ok, conn} <- Mint.HTTP.stream_request_body(state.conn, request_ref, :eof) do
{:ok, put_in(state.conn, conn), request_ref}
state = put_in(state.requests[request_ref], request),
{:ok, state} <- Request.emit_messages(state, request) do
{:ok, state}
else
{:error, %__MODULE__{} = state, reason} -> {:error, state, reason}
{:error, conn, reason} -> {:error, put_in(state.conn, conn), reason}
end
end
@@ -83,24 +83,26 @@

@spec handle_responses(%__MODULE__{}, list()) :: %__MODULE__{}
defp handle_responses(state, responses) do
Enum.reduce(responses, state, &process_response/2)
responses
|> Enum.reduce(state, &process_response/2)
|> Request.continue_requests()
Comment on lines +86 to +88
Collaborator Author

this gets invoked whenever the connection receives network traffic from the server

it triggers new attempts at all request streams in state (because a new message from the server may have included WINDOW_UPDATE frames)
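
a minimal sketch of what that continuation could look like, assuming
`emit_messages/2` returns `{:ok, state}` when it finishes or suspends again
and `{:error, state, reason}` on failure (illustrative only, not the actual
`Spear.Connection.Request.continue_requests/1`):

```elixir
# retry every in-flight request stream: a WINDOW_UPDATE from the server may
# have made room to emit more of the buffered messages
def continue_requests(state) do
  Enum.reduce(state.requests, state, fn {_request_ref, request}, state ->
    case emit_messages(state, request) do
      {:ok, state} -> state
      {:error, state, _reason} -> state
    end
  end)
end
```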

end

defp process_response({:status, request_ref, status}, state) do
put_in(state.requests[request_ref].response[:status], status)
put_in(state.requests[request_ref].response.status, status)
end

defp process_response({:headers, request_ref, new_headers}, state) do
update_in(
state.requests[request_ref].response[:headers],
fn headers -> (headers || []) ++ new_headers end
state.requests[request_ref].response.headers,
fn headers -> headers ++ new_headers end
)
end

defp process_response({:data, request_ref, new_data}, state) do
update_in(
state.requests[request_ref].response[:data],
fn data -> (data || <<>>) <> new_data end
state.requests[request_ref].response.data,
fn data -> data <> new_data end
)
end

@@ -113,24 +115,4 @@
end

defp process_response(_unknown, state), do: state

defp stream_body(state, request_ref, messages) do
Enum.reduce_while(messages, {:ok, state}, &stream_body_message(&1, &2, request_ref))
end

defp stream_body_message(message, {:ok, state}, request_ref) do
{wire_data, _byte_size} = Request.to_wire_data(message)

stream_result =
Mint.HTTP.stream_request_body(
state.conn,
request_ref,
wire_data
)

case stream_result do
{:ok, conn} -> {:cont, {:ok, put_in(state.conn, conn)}}
{:error, conn, reason} -> {:halt, {:error, put_in(state.conn, conn), reason}}
end
end
Comment on lines -117 to -135
Collaborator Author

old buggy streaming implementation

suffered from

  • not being able to emit a single event larger than an HTTP2 window
  • not being able to emit lots of events together, cumulatively larger than an HTTP2 window

still nice to read though because it's so simple compared to what has replaced it
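
for contrast, a sketch of the window-aware approach that replaces it: send
at most the currently-available window now and keep the remainder to emit
after the next WINDOW_UPDATE (illustrative only; the real logic lives in
`Spear.Connection.Request.emit_messages/2`)

```elixir
# send as much of the binary as the current window allows and return any
# remainder so it can be emitted once the server replenishes the window
defp send_within_window(conn, request_ref, wire_data) do
  window =
    min(
      Mint.HTTP2.get_window_size(conn, :connection),
      Mint.HTTP2.get_window_size(conn, {:request, request_ref})
    )

  case wire_data do
    <<now::binary-size(window), later::binary>> when byte_size(later) > 0 ->
      with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, now) do
        {:suspended, conn, later}
      end

    whole ->
      with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, whole) do
        {:done, conn}
      end
  end
end
```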

end