implement stream append backpressure with Enumerable.reduce/3 #3
Conversation
also removes the Elixir 1.10 requirement because we're not using the `is_connection_message/2` macro
ultimately this came out much prettier than #2 and I believe it avoids all of the race conditions that PR introduced (hopefully all of them!)
> HTTP2 includes a back-pressure mechanism for clients sending large amounts of data to the server faster than the server can handle. Servers negotiate a maximum number of bytes which the client is allowed to send, called a window. When the window has been exhausted by streaming data to the server, the client must wait until the server replenishes the window. During the downtime between exhausting a window and waiting for the server to replenish it, Spear suspends the exhausted request stream and handles incoming messages from the server as normal. Since HTTP2 window sizes are relatively small (usually in the range of 10 to 100 KB), Spear takes these conceptual breaks somewhat often during large requests. This allows Spear to efficiently multiplex large writes with large reads and subscriptions.
this here is the short-doc on what's going on
hopefully it's not too dense/HTTP2y to grok
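Mint exposes these windows directly; a sketch of finding the limiting window before streaming (assumes a `Mint.HTTP2` connection `conn` and an in-flight `request_ref`):

```elixir
# the effective window is the smaller of the connection-wide window
# and the per-request window
smallest_window =
  min(
    Mint.HTTP2.get_window_size(conn, :connection),
    Mint.HTTP2.get_window_size(conn, {:request, request_ref})
  )
```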
```diff
         {:noreply, state}

       {:error, state, reason} ->
         {:reply, {:error, reason}, state}
     end
   end

-  defp request_and_stream_body(state, request) do
+  defp request_and_stream_body(state, request, from) do
```
there's a slight refactor in this module (and specifically this func) which changes the shape of the state struct `%__MODULE__{}` from having a `:responses` field (map) to having a `:requests` field (struct) which has a `:responses` sub-field (struct)

this is because requests may be completed asynchronously (w.r.t. the server; the client still blocks) if they are larger than a single HTTP2 window, so we need to store some information about the requests, not just response information
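a hypothetical sketch of that reshaping (field names other than `:buffer`, `:continuation`, and `:from` are assumptions):

```elixir
defmodule Spear.Connection.Request do
  # one in-flight request: the suspended stream continuation, bytes that
  # didn't fit in the last window, and the response being accumulated
  defstruct [:continuation, :from, :response, buffer: <<>>]
end

defmodule Spear.Connection do
  # before: state.responses was a plain map of request_ref => response
  # after:  state.requests maps request_ref => %Spear.Connection.Request{}
  defstruct [:conn, requests: %{}]
end
```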
```elixir
responses
|> Enum.reduce(state, &process_response/2)
|> Request.continue_requests()
```
this gets invoked whenever the connection receives network traffic from the server
it triggers new attempts at all request streams in state (because a new message from the server may have included WINDOW_UPDATE frames)
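a hypothetical sketch of what `Request.continue_requests/1` might do under that description (the error handling is an assumption):

```elixir
def continue_requests(state) do
  # give every suspended request stream another chance to write, since
  # the traffic we just processed may have replenished some windows
  Enum.reduce(state.requests, state, fn {_ref, request}, state ->
    case emit_messages(state, request) do
      {:ok, state} -> state
      {:error, state, _reason} -> state
    end
  end)
end
```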
```elixir
defp stream_body(state, request_ref, messages) do
  Enum.reduce_while(messages, {:ok, state}, &stream_body_message(&1, &2, request_ref))
end

defp stream_body_message(message, {:ok, state}, request_ref) do
  {wire_data, _byte_size} = Request.to_wire_data(message)

  stream_result =
    Mint.HTTP.stream_request_body(
      state.conn,
      request_ref,
      wire_data
    )

  case stream_result do
    {:ok, conn} -> {:cont, {:ok, put_in(state.conn, conn)}}
    {:error, conn, reason} -> {:halt, {:error, put_in(state.conn, conn), reason}}
  end
end
```
the old, buggy streaming implementation suffered from
- not being able to emit a single event larger than an HTTP2 window
- not being able to emit lots of events together, cumulatively larger than an HTTP2 window
still nice to read though because it's so simple compared to what has replaced it
```elixir
reducer = &reduce_with_suspend/2
stream = Stream.map(event_stream, &Spear.Request.to_wire_data/1)
continuation = &Enumerable.reduce(stream, &1, reducer)
```
this is where our enumerable magic begins
it's worth checking out this implementation in GenStage, which reads `demand` (integer) number of elements from a stream in a similar fashion: https://github.com/elixir-lang/gen_stage/blob/6cb08ee2e892b8aff0fcc57f45e05c8d02dddcb2/lib/gen_stage/streamer.ex#L11-L27
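the mechanics can be seen in isolation; a generic sketch (not Spear code) of suspending and resuming a reduction:

```elixir
stream = Stream.map(1..3, & &1)

# suspend after every element
suspender = fn x, acc -> {:suspend, [x | acc]} end

{:suspended, [1], continuation} =
  Enumerable.reduce(stream, {:cont, []}, suspender)

# resume later with the accumulator; the stream picks up where it left off
{:suspended, [2, 1], _next} = continuation.({:cont, [1]})
```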
```elixir
@spec emit_messages(%Spear.Connection{}, %__MODULE__{}) ::
        {:ok, %Spear.Connection{}} | {:error, %Spear.Connection{}, reason :: any()}
def emit_messages(state, %__MODULE__{buffer: <<>>, continuation: continuation} = request) do
```
it's not super obvious (especially because it doesn't work like this for `%{}`) but `<<>>` is a match on only the empty binary/string

```elixir
iex> match?(<<>>, "hello")
false
iex> match?(<<>>, "")
true
iex> match?(%{}, %{"fizz" => "buzz"})
true
iex> match?(%{}, %{})
true
```
Why do we need to buffer stuff at all?

We have the `request.continuation` function which will unfold more messages/iodatas out of the stream, but the way that `reduce_with_suspend/2` is currently written (and must be written, unless I'm mistaken) is that we read until we exceed our smallest window.

Why wait until we exceed?

- a single message may itself exceed one or more window(s)
- there's no way to read ahead in the stream or put an element we've already reduced out of the stream back into the stream
  - it's not intuitively obvious why this is the case; I can expound if you're interested

so we

- read the stream until we exceed the window
- suspend the stream upon exceeding the window
- write all available bytes up to the window size
  - we can and do write partial messages
  - according to the gRPC spec: "DATA frame boundaries have no relation to Length-Prefixed-Message boundaries and implementations should make no assumptions about their alignment." so this is allowed, possible, and correct
- buffer the remaining bytes and the next continuation function in the `%__MODULE__{}` request struct
- go back to happily waiting, as GenServers do, for more info messages
  - if we receive an info message off the wire, the connection will give each stream request in state another attempt
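the read-until-exceed reducer might be sketched like this (illustrative; the accumulator shape is an assumption, building on `to_wire_data/1` returning `{wire_data, byte_size}`):

```elixir
# gather iodata until the bytes we've read meet or exceed the window,
# then suspend the stream so the caller can write and buffer the overflow
defp reduce_with_suspend({wire_data, byte_size}, {iodata, bytes_remaining}) do
  bytes_remaining = bytes_remaining - byte_size
  acc = {[iodata | wire_data], bytes_remaining}

  if bytes_remaining <= 0 do
    {:suspend, acc}
  else
    {:cont, acc}
  end
end
```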
```elixir
def emit_messages(
      state,
      %__MODULE__{buffer: buffer, continuation: continuation} = request
```
if we have some bytes buffered in our request in state:

- see if the buffer will fit in the new window
- if it does fit in the new window
  - continue the stream, using the current buffer as the first message in the reduction's new accumulator
- if it does not fit in the new window
  - take as much of the buffer as we can and say that the rest is our new buffer
  - go back to waiting for the server to tell us we have a new window

and this works! I have some events in my local EventStore now that are exactly 1048576B (~1MB) (where the typical HTTP2 window is 10-100KB)
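the does-not-fit branch can be sketched with `binary_part/3` (the window size and buffer here are illustrative):

```elixir
window_size = 65_535
buffer = :binary.copy(<<0>>, 100_000)

# write as much of the buffer as the window allows...
to_write = binary_part(buffer, 0, window_size)
# ...and keep whatever is left over as the new buffer
rest = binary_part(buffer, window_size, byte_size(buffer) - window_size)
```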
```elixir
      state,
      request
    )
    when finished in [:done, :halted] do
```
I wasn't clear on exactly what the difference is here. Best I can tell:

- `:done`: finite enumerables like lists
- `:halted`: streams
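the list case at least is easy to confirm in a sketch:

```elixir
sum = fn x, acc -> {:cont, acc + x} end

# a finite list reduced to completion finishes :done
{:done, 6} = Enumerable.reduce([1, 2, 3], {:cont, 0}, sum)
```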
> When appending multiple events, Spear will fit as many messages as possible into the same HTTP2 DATA frame. This is valid according to the gRPC specification and has the potential to improve performance.
closes #1

uses continuations (`t:Enumerable.continuation/0`) to suspend a stream temporarily while our little client here waits for the server to give us more WINDOW_UPDATE frames (the HTTP2 client->server request backpressure mechanism)