implement stream append backpressure with Enumerable.reduce/3 #3
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# Writing Events | ||
|
||
This guide covers specifics about the `Spear.append/4` function and general | ||
information about the event-writing functionality. | ||
|
||
## Enumeration Characteristics | ||
|
||
`event_stream` is an `t:Enumerable.t()` which is written to the EventStore | ||
lazily: elements of the stream are computed and serialized onto the wire | ||
only as the request consumes them. | ||
|
||
This means a few things: | ||
|
||
First, you can efficiently emit events from a stream over a large source | ||
such as a CSV file with many lines: | ||
|
||
```elixir | ||
File.stream!("large.csv", read_ahead: 100_000) | ||
|> MyCsvParser.parse_stream() | ||
|> Stream.map(&MyCsvParser.turn_csv_line_into_spear_event/1) | ||
|> Spear.append(conn, "ChargesFromCsvs", timeout: :infinity) | ||
# => :ok | ||
``` | ||
|
||
The stream is fully consumed only once the last bytes have been written to | ||
the gRPC network request; it is never held entirely in memory. | ||
|
||
Second, you may (but are _not_ encouraged to) write events via an infinite | ||
stream. A trivial counter mechanism could be implemented like so: | ||
|
||
```elixir | ||
iex> Stream.iterate(0, &(&1 + 1)) | ||
...> |> Stream.map(fn n -> Spear.Event.new("incremented", n) end) | ||
...> |> Spear.append(conn, "InfiniteCounter", timeout: :infinity, expect: :empty) | ||
{:error, | ||
{:grpc_failure, [code: 3, message: "Maximum Append Size of 1048576 Exceeded."]}} | ||
``` | ||
|
||
Note that while EventStore streams can in theory be infinitely long, they | ||
cannot be in practice. EventStore limits the size of a single write to | ||
`1_048_576` cumulative bytes (1 MiB). This budget can be spent on one very | ||
large event or, as shown above, on many tiny events in a single call to | ||
`Spear.append/4`. Attempting to write more than the budget fails the request | ||
with the error shown above, and no events in the request are written to the | ||
EventStore. This limit can be changed in the EventStoreDB | ||
configuration. | ||
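
One way to stay under the append budget is to split a large stream into several smaller appends. The following is a minimal sketch only: `AppendBudget` is a hypothetical module that is not part of Spear, and it measures raw binaries with `byte_size/1`, which is an assumption, since the server may account for serialized event size differently. Splitting a write this way also gives up the all-or-nothing behavior of a single append.

```elixir
defmodule AppendBudget do
  # Hypothetical helper, not part of Spear.
  @default_budget 1_048_576

  # Lazily groups binaries into chunks whose cumulative byte_size stays
  # within `budget`. A single binary larger than the budget is still
  # emitted alone in its own chunk.
  def chunk(payloads, budget \\ @default_budget) do
    Stream.chunk_while(
      payloads,
      {[], 0},
      fn payload, {chunk, size} ->
        payload_size = byte_size(payload)

        if chunk != [] and size + payload_size > budget do
          # Adding this payload would overflow: emit the chunk, start a new one.
          {:cont, Enum.reverse(chunk), {[payload], payload_size}}
        else
          {:cont, {[payload | chunk], size + payload_size}}
        end
      end,
      fn
        {[], _size} -> {:cont, {[], 0}}
        {chunk, _size} -> {:cont, Enum.reverse(chunk), {[], 0}}
      end
    )
  end
end
```

Each emitted chunk could then be mapped to events and handed to its own `Spear.append/4` call.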
|
||
## Blocking | ||
|
||
While `Spear.append/4` blocks the caller for the duration of the request, | ||
it does not fully block the connection. The connection will write chunks of | ||
data over the wire as allowed by HTTP2 window sizes. | ||
|
||
HTTP2 includes a back-pressure mechanism for clients that send data faster | ||
than the server can handle it. The server advertises a maximum number of | ||
bytes the client is allowed to send, called a window. Once the client has | ||
exhausted the window by streaming data to the server, it must wait until the | ||
server replenishes the window. While waiting, Spear suspends the exhausted | ||
request stream and handles incoming messages from the server as normal. | ||
Since HTTP2 window sizes are relatively small (usually somewhere between | ||
10 and 100 KB), Spear takes these breaks fairly often during large | ||
requests. This allows Spear to efficiently multiplex large writes with | ||
large reads and subscriptions. | ||
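
The suspend-and-resume behavior described above can be sketched with `Enumerable.reduce/3`, which returns a continuation when the reducer asks to suspend. This is an illustration under simplifying assumptions, not Spear's implementation: `WindowedEmitter` is a hypothetical module, messages are plain binaries, the window is a plain integer passed in by the caller, and a message is never split across windows.

```elixir
defmodule WindowedEmitter do
  # Hypothetical sketch of window-based suspension; not Spear's code.

  # Emits messages until `window` bytes are spent. Returns {:done, sent} or
  # {:suspended, sent, state}, where `sent` holds the messages that fit.
  def start(messages, window) do
    continuation = fn command -> Enumerable.reduce(messages, command, &step/2) end
    resume({continuation, nil}, window)
  end

  # Called when the (simulated) server grants a fresh window. `pending` is a
  # message already pulled from the enumerable that did not fit last time.
  def resume({continuation, pending}, window) do
    case fit(pending, window) do
      :too_big ->
        {:suspended, [], {continuation, pending}}

      {sent, window} ->
        case continuation.({:cont, {window, sent, nil}}) do
          {:suspended, {_window, sent, pending}, continuation} ->
            {:suspended, Enum.reverse(sent), {continuation, pending}}

          {:done, {_window, sent, nil}} ->
            {:done, Enum.reverse(sent)}
        end
    end
  end

  defp fit(nil, window), do: {[], window}
  defp fit(msg, window) when byte_size(msg) <= window, do: {[msg], window - byte_size(msg)}
  defp fit(_msg, _window), do: :too_big

  # The message fits: spend window bytes and keep going.
  defp step(msg, {window, sent, nil}) when byte_size(msg) <= window do
    {:cont, {window - byte_size(msg), [msg | sent], nil}}
  end

  # The message does not fit: stash it and suspend the enumeration.
  defp step(msg, {window, sent, nil}) do
    {:suspend, {window, sent, msg}}
  end
end
```

Because the enumerable is only advanced inside the reducer, nothing beyond the one stashed message is computed while the request is suspended.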
|
||
## Batching | ||
|
||
When appending multiple events, Spear will fit as many messages as possible | ||
into the same HTTP2 DATA frame. This is valid according to the gRPC | ||
specification and has the potential to improve performance. | ||
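
Batching works because gRPC messages are length-prefixed: each message is a 1-byte compression flag, a 4-byte big-endian length, and then the payload, so several messages can sit back to back in one DATA frame. A minimal sketch of that framing follows. `GrpcFraming` is a hypothetical module, not Spear's API, and it assumes uncompressed, already-encoded payloads.

```elixir
defmodule GrpcFraming do
  # Hypothetical helper, not part of Spear.

  # Frames one already-encoded message: compressed flag (0 = uncompressed),
  # 32-bit big-endian payload length, then the payload itself.
  def frame(payload) when is_binary(payload) do
    <<0::unsigned-integer-8, byte_size(payload)::unsigned-big-integer-32, payload::binary>>
  end

  # Concatenates several framed messages into one binary, which could be
  # sent as the body of a single DATA frame if the window allows it.
  def batch(payloads) do
    payloads |> Enum.map(&frame/1) |> IO.iodata_to_binary()
  end
end
```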
Comment on lines +68 to +70 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ defmodule Spear.Connection do | |
# https://github.com/elixir-mint/mint/blob/796b8db097d69ede7163acba223ab2045c2773a4/pages/Architecture.md | ||
|
||
use GenServer | ||
- alias Spear.Request | ||
+ alias Spear.Connection.Request | ||
|
||
defstruct [:conn, requests: %{}] | ||
|
||
|
@@ -44,25 +44,25 @@ defmodule Spear.Connection do | |
|
||
@impl GenServer | ||
def handle_call({:request, request}, from, state) do | ||
- case request_and_stream_body(state, request) do | ||
- {:ok, state, request_ref} -> | ||
- state = put_in(state.requests[request_ref], %{from: from, response: %{}}) | ||
|
||
+ case request_and_stream_body(state, request, from) do | ||
+ {:ok, state} -> | ||
{:noreply, state} | ||
|
||
{:error, state, reason} -> | ||
{:reply, {:error, reason}, state} | ||
end | ||
end | ||
|
||
- defp request_and_stream_body(state, request) do | ||
+ defp request_and_stream_body(state, request, from) do | ||
Comment: there's a slight refactor in this module (and specifically this func) which changes the shape of the state struct; this is because requests may be completed asynchronously (w.r.t. the server, the client still blocks) if they are larger than a single HTTP2 window, so we need to store some information about the requests, not just response information |
||
with {:ok, conn, request_ref} <- | ||
Mint.HTTP.request(state.conn, @post, request.path, request.headers, :stream), | ||
+ request = Request.new(request.messages, request_ref, from), | ||
state = put_in(state.conn, conn), | ||
- {:ok, state} <- stream_body(state, request_ref, request.messages), | ||
- {:ok, conn} <- Mint.HTTP.stream_request_body(state.conn, request_ref, :eof) do | ||
- {:ok, put_in(state.conn, conn), request_ref} | ||
+ state = put_in(state.requests[request_ref], request), | ||
+ {:ok, state} <- Request.emit_messages(state, request) do | ||
+ {:ok, state} | ||
else | ||
{:error, %__MODULE__{} = state, reason} -> {:error, state, reason} | ||
{:error, conn, reason} -> {:error, put_in(state.conn, conn), reason} | ||
end | ||
end | ||
|
@@ -83,24 +83,26 @@ defmodule Spear.Connection do | |
|
||
@spec handle_responses(%__MODULE__{}, list()) :: %__MODULE__{} | ||
defp handle_responses(state, responses) do | ||
- Enum.reduce(responses, state, &process_response/2) | ||
+ responses | ||
+ |> Enum.reduce(state, &process_response/2) | ||
+ |> Request.continue_requests() | ||
Comment on lines +86 to +88: this gets invoked whenever the connection receives network traffic from the server; it triggers new attempts at all request streams in state (because a new message from the server may have included WINDOW_UPDATE frames) |
||
end | ||
|
||
defp process_response({:status, request_ref, status}, state) do | ||
- put_in(state.requests[request_ref].response[:status], status) | ||
+ put_in(state.requests[request_ref].response.status, status) | ||
end | ||
|
||
defp process_response({:headers, request_ref, new_headers}, state) do | ||
update_in( | ||
- state.requests[request_ref].response[:headers], | ||
- fn headers -> (headers || []) ++ new_headers end | ||
+ state.requests[request_ref].response.headers, | ||
+ fn headers -> headers ++ new_headers end | ||
) | ||
end | ||
|
||
defp process_response({:data, request_ref, new_data}, state) do | ||
update_in( | ||
- state.requests[request_ref].response[:data], | ||
- fn data -> (data || <<>>) <> new_data end | ||
+ state.requests[request_ref].response.data, | ||
+ fn data -> data <> new_data end | ||
) | ||
end | ||
|
||
|
@@ -113,24 +115,4 @@ defmodule Spear.Connection do | |
end | ||
|
||
defp process_response(_unknown, state), do: state | ||
|
||
- defp stream_body(state, request_ref, messages) do | ||
- Enum.reduce_while(messages, {:ok, state}, &stream_body_message(&1, &2, request_ref)) | ||
- end | ||
|
||
- defp stream_body_message(message, {:ok, state}, request_ref) do | ||
- {wire_data, _byte_size} = Request.to_wire_data(message) | ||
|
||
- stream_result = | ||
- Mint.HTTP.stream_request_body( | ||
- state.conn, | ||
- request_ref, | ||
- wire_data | ||
- ) | ||
|
||
- case stream_result do | ||
- {:ok, conn} -> {:cont, {:ok, put_in(state.conn, conn)}} | ||
- {:error, conn, reason} -> {:halt, {:error, put_in(state.conn, conn), reason}} | ||
- end | ||
- end | ||
Comment on lines -117 to -135: old buggy streaming implementation suffered from
still nice to read though because it's so simple compared to what has replaced it |
||
end |
this here is the short-doc on what's going on
hopefully it's not too dense/HTTP2y to grok