implement stream append backpressure with Enumerable.reduce/3 #3

the-mikedavis merged 9 commits into main
Conversation
also drops the Elixir 1.10 requirement because we're not using the is_connection_message/2 macro
the-mikedavis
left a comment
ultimately this came out much prettier than #2 and I believe it avoids all of the race conditions that PR introduced (hopefully all of them!)
| HTTP2 includes a back-pressure mechanism for clients sending large amounts
| of data to the server faster than the server can handle. Servers negotiate
| a maximum number of bytes which the client is allowed to send, called a window.
| When the window has been exhausted by streaming data to the server, the client
| must wait until the server replenishes the window. During the downtime between
| exhausting a window and waiting for the server to replenish it, Spear suspends
| the exhausted request stream and handles incoming messages from the server
| as normal. Since HTTP2 window sizes are relatively small (usually somewhere
| in the range of 10 to 100 KB), Spear takes these conceptual breaks somewhat
| often during large requests. This allows Spear to efficiently multiplex large
| writes with large reads and subscriptions.
this here is the short-doc on what's going on; hopefully it's not too dense/HTTP2-y to grok
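to make the window mechanics concrete: here's how any Mint-based client can inspect the windows described above (a sketch; `conn` is an established Mint.HTTP2 connection and `request_ref` identifies an in-flight streaming request)

```elixir
# both windows shrink as the client streams data and grow again when the
# server sends WINDOW_UPDATE frames
connection_window = Mint.HTTP2.get_window_size(conn, :connection)
request_window = Mint.HTTP2.get_window_size(conn, {:request, request_ref})

# the client may only stream the smaller of the two before it must wait
# for the server to replenish
available_window = min(connection_window, request_window)
```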
|   end
|
| - defp request_and_stream_body(state, request) do
| + defp request_and_stream_body(state, request, from) do
there's a slight refactor in this module (and specifically in this func) which changes the shape of the state struct %__MODULE__{} from having a :responses field (map) to having a :requests field (struct) with a sub-field of :responses (struct)
this is because requests may be completed asynchronously (w.r.t. the server; the client still blocks) if they are larger than a single HTTP2 window, so we need to store some information about the requests, not just response information
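for illustration, the shape change might look something like this (a sketch with assumed field and module names, not the actual structs):

```elixir
defmodule Spear.Connection do
  # before: responses were tracked directly on the connection state
  #   defstruct [:conn, responses: %{}]
  # after: whole requests are tracked, each carrying its own response
  defstruct [:conn, requests: %{}]
end

defmodule Spear.Connection.Request do
  # hypothetical sub-struct: the suspended stream continuation, any bytes
  # that didn't fit in the last window, the caller, and the response so far
  defstruct [:continuation, :from, :response, buffer: <<>>]
end
```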
|   responses
|   |> Enum.reduce(state, &process_response/2)
|   |> Request.continue_requests()
this gets invoked whenever the connection receives network traffic from the server
it triggers new attempts at all request streams in state (because a new message from the server may have included WINDOW_UPDATE frames)
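something like this, roughly (assumed shape; the real function lives in the Request module):

```elixir
# walk every in-flight request and give its suspended stream another
# attempt: any frame we just processed may have been a WINDOW_UPDATE
# (error handling elided in this sketch)
def continue_requests(state) do
  Enum.reduce(state.requests, state, fn {_request_ref, request}, state ->
    case emit_messages(state, request) do
      {:ok, state} -> state
      {:error, state, _reason} -> state
    end
  end)
end
```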
| defp stream_body(state, request_ref, messages) do
|   Enum.reduce_while(messages, {:ok, state}, &stream_body_message(&1, &2, request_ref))
| end
|
| defp stream_body_message(message, {:ok, state}, request_ref) do
|   {wire_data, _byte_size} = Request.to_wire_data(message)
|
|   stream_result =
|     Mint.HTTP.stream_request_body(
|       state.conn,
|       request_ref,
|       wire_data
|     )
|
|   case stream_result do
|     {:ok, conn} -> {:cont, {:ok, put_in(state.conn, conn)}}
|     {:error, conn, reason} -> {:halt, {:error, put_in(state.conn, conn), reason}}
|   end
| end
old buggy streaming implementation

it suffered from

- not being able to emit a single event larger than an HTTP2 window
- not being able to emit lots of events together which are cumulatively larger than an HTTP2 window

it's still nice to read, though, because it's so simple compared to what has replaced it
| reducer = &reduce_with_suspend/2
| stream = Stream.map(event_stream, &Spear.Request.to_wire_data/1)
| continuation = &Enumerable.reduce(stream, &1, reducer)
this is where our enumerable magic begins
it's worth checking out this implementation in GenStage, which reads demand (an integer) elements from a stream in a similar fashion: https://github.com/elixir-lang/gen_stage/blob/6cb08ee2e892b8aff0fcc57f45e05c8d02dddcb2/lib/gen_stage/streamer.ex#L11-L27
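for a feel of the mechanics, here's a standalone, iex-able sketch of suspending a reduction once a byte budget is exceeded (toy numbers, not Spear's code):

```elixir
window = 16

reducer = fn chunk, {iodata, size} ->
  size = size + byte_size(chunk)
  # keep reducing until we've met or exceeded the window, then suspend
  if size >= window,
    do: {:suspend, {[iodata | chunk], size}},
    else: {:cont, {[iodata | chunk], size}}
end

stream = Stream.map(1..10, &String.duplicate("a", &1))

# the reduction stops mid-stream and hands back a continuation
{:suspended, {iodata, 21}, continuation} =
  Enumerable.reduce(stream, {:cont, {[], 0}}, reducer)

# later (e.g. after a WINDOW_UPDATE) the continuation picks up exactly
# where the stream left off
{:suspended, _acc, _next_continuation} = continuation.({:cont, {[], 0}})
```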
| @spec emit_messages(%Spear.Connection{}, %__MODULE__{}) ::
|         {:ok, %Spear.Connection{}} | {:error, %Spear.Connection{}, reason :: any()}
| def emit_messages(state, %__MODULE__{buffer: <<>>, continuation: continuation} = request) do
it's not super obvious (especially because it doesn't work like this for %{}) but <<>> is a match on only the empty binary/string

iex> match?(<<>>, "hello")
false
iex> match?(<<>>, "")
true
iex> match?(%{}, %{"fizz" => "buzz"})
true
iex> match?(%{}, %{})
true

Why do we need to buffer stuff at all?

We have the request.continuation function which will unfold more messages/iodatas out of the stream, but the way that reduce_with_suspend/2 is currently written (and must be written, unless I'm mistaken) is that we read until we exceed our smallest window.

Why wait until we exceed?

- a single message may itself exceed one or more window(s)
- there's no way to read ahead in the stream or to put an element we've already reduced out of the stream back into the stream
  - it's not intuitively obvious why this is the case; I can expound if you're interested

so we (sketched below):

- read the stream until we exceed the window
- suspend the stream upon exceeding the window
- write all available bytes up to the window size
  - we can and do write partial messages
  - according to the gRPC spec, "DATA frame boundaries have no relation to Length-Prefixed-Message boundaries and implementations should make no assumptions about their alignment." so this is allowed, possible, and correct
- buffer the remaining bytes and the next continuation function in the %__MODULE__{} request struct
- go back to happily waiting as GenServers do for more info messages
  - if we receive an info message off the wire, the connection will give each stream request in state another attempt
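roughly, in code (a sketch; smallest_window/2 and put_request/4 are assumed helpers, not Spear's real API):

```elixir
defp stream_through_window(state, request_ref, continuation) do
  window = smallest_window(state.conn, request_ref)

  case continuation.({:cont, {[], 0, window}}) do
    # the stream had more bytes than the window: write a window-sized
    # prefix and stash the rest (plus the continuation) on the request
    {:suspended, {iodata, _size, _window}, next_continuation} ->
      <<to_write::binary-size(window), rest::binary>> =
        IO.iodata_to_binary(iodata)

      with {:ok, conn} <-
             Mint.HTTP.stream_request_body(state.conn, request_ref, to_write) do
        {:ok, put_request(%{state | conn: conn}, request_ref, rest, next_continuation)}
      end

    # the stream ran dry before filling the window: write everything
    # and close the request body
    {finished, {iodata, _size, _window}} when finished in [:done, :halted] ->
      with {:ok, conn} <- Mint.HTTP.stream_request_body(state.conn, request_ref, iodata),
           {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, :eof) do
        {:ok, %{state | conn: conn}}
      end
  end
end
```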
| def emit_messages(
|       state,
|       %__MODULE__{buffer: buffer, continuation: continuation} = request
if we have some bytes buffered in our request in state:

- see if the buffer will fit in the new window
- if it does fit in the new window
  - continue the stream using the current buffer as the first message in the reduction's new accumulator
- if it does not fit in the new window
  - take as much of the buffer as we can and say that the rest is our new buffer
  - go back to waiting for the server to tell us we have a new window

and this works! I have some events in my local EventStore now that are exactly 1048576B (~1MB) (where the typical HTTP2 window is 10-100KB)
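the fit check boils down to a binary split (again a sketch; `window` stands for the smallest currently available window):

```elixir
if byte_size(buffer) <= window do
  # fits: resume the reduction with the buffered bytes pre-seeded as the
  # accumulator, so they go out ahead of any newly reduced messages
  continuation.({:cont, {buffer, byte_size(buffer), window}})
  # (suspended/done handling then proceeds as in the empty-buffer clause)
else
  # does not fit: write a window-sized prefix and keep the rest buffered
  <<to_write::binary-size(window), rest::binary>> = buffer
  {:ok, conn} = Mint.HTTP.stream_request_body(state.conn, request_ref, to_write)
  # store `rest` as the new buffer and go back to waiting for the next
  # WINDOW_UPDATE from the server
end
```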
|       state,
|       request
|     )
|     when finished in [:done, :halted] do
I wasn't clear on exactly what the difference is here. Best I can tell:

- :done: finite enumerables like lists
- :halted: streams
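as I read the Enumerable docs, reduce/3 returns :done when the enumeration finishes naturally and :halted when something in the pipeline returns {:halt, acc}, which stream transformations like Stream.take/2 do internally; a quick check:

```elixir
collect = fn x, acc -> {:cont, [x | acc]} end

# a list runs out of elements on its own
{:done, [3, 2, 1]} = Enumerable.reduce([1, 2, 3], {:cont, []}, collect)

# Stream.take/2 halts its (infinite) source once it has three elements,
# so the reduction finishes as :halted instead
{:halted, _acc} =
  Enumerable.reduce(Stream.take(Stream.cycle([1]), 3), {:cont, []}, collect)
```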
| When appending multiple events, Spear will fit as many messages as possible
| into the same HTTP2 DATA frame. This is valid according to the gRPC
| specification and has the potential to improve performance.
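the batching falls out of accumulating wire data as iodata: one stream_request_body call can carry many encoded messages (sketch; message_a/b/c stand in for already-encoded Length-Prefixed-Messages):

```elixir
# Mint accepts iodata, so several gRPC messages can be handed over in a
# single body chunk rather than one call (and potentially one frame) each
batched = [message_a, message_b, message_c]
{:ok, conn} = Mint.HTTP.stream_request_body(conn, request_ref, batched)
```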

closes #1
uses continuations (t:Enumerable.continuation/0) to suspend a stream temporarily while our little client here waits for the server to give us more WINDOW_UPDATE frames (the HTTP2 client->server request backpressure mechanism)