implement backpressure for enumerable/stream requests #2
Conversation
For a comparison of the wireshark screen-caps in #1, see this gif, since all the DATA and WINDOW_UPDATE frames couldn't possibly fit on one page:
```elixir
with {:ok, %{status: 200, headers: headers} = response} <-
       GenServer.call(conn, {:request, request}, Keyword.fetch!(opts, :timeout)),
     {{"grpc-status", "0"}, _} <- {List.keyfind(headers, "grpc-status", 0), response} do
  Spear.Writing.decode_append_response(
    response[:data] || <<>>,
    opts[:raw?]
  )
else
  {:error, reason} ->
    {:error, reason}

  # TODO this badly needs a refactor
  {{"grpc-status", other_status}, response} ->
    {:error,
     {:grpc_failure,
      code: other_status && String.to_integer(other_status),
      message:
        Enum.find_value(response.headers, fn {header, value} ->
          header == "grpc-message" && value
        end)}}
```
keep in mind spear is still super rough, so I think it's acceptable (for now) to have a crappy bit of control flow like this sitting here

eventually this may be refactored out to `Spear.Grpc` or something to close over the details of the well-known headers we're reading
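For illustration, a hypothetical `Spear.Grpc` module along those lines might look like this (the module name comes from the comment above; `decode_status/1` and `find_header/2` are made-up names, not Spear's real API):

```elixir
defmodule Spear.Grpc do
  # hypothetical sketch: centralize parsing of the well-known gRPC
  # headers/trailers so callers don't pattern-match on raw header tuples

  def decode_status(headers) do
    status = find_header(headers, "grpc-status")
    message = find_header(headers, "grpc-message")

    case status do
      "0" -> :ok
      nil -> {:error, {:grpc_failure, code: nil, message: message}}
      code -> {:error, {:grpc_failure, code: String.to_integer(code), message: message}}
    end
  end

  defp find_header(headers, name) do
    Enum.find_value(headers, fn {header, value} -> header == name && value end)
  end
end
```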
```elixir
|> do_stage(:recv_responses, params)
|> do_stage(:check_window_size, params)
```
this is the flow-control mechanism I was describing

initially this block of code for `recv_until_window_size_increase/3` was more complicated because I was trying to switch the conn into passive mode and then clean up by switching it back into active mode, but `Mint.HTTP.set_mode/2` has a return signature of

```elixir
set_mode(conn, :active | :passive) :: {:ok, conn()} | {:error, any()}
```

(or something like that), so even that had some tuple-handling to do

it's probably possible to refactor this function now into a `with` or two

the main point of doing this in this kind of control-flow style is that we can pass some values back through this conceptual pipeline by calling `do_stage/3` with new arguments from a later stage: we do this in the `:check_window_size` stage so we can receive and block as many times as we need until the server has given us more bytes in our windows

I think this pipeline is more readable than any refactor we could attempt
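As a rough sketch of the shape described above (a hypothetical, condensed version: the stage bodies and the `:window` field in state are illustrative stand-ins, not Spear's actual code):

```elixir
defmodule StagedRecv do
  # sketch: an {:ok, state} / {:error, state, reason} pipeline where a
  # later stage can loop back by calling do_stage/3 with fresh arguments

  def recv_until_window_size_increase(state, request_ref) do
    params = {window_size(state, request_ref), request_ref}

    {:ok, state}
    |> do_stage(:recv_responses, params)
    |> do_stage(:check_window_size, params)
  end

  # an error from any stage short-circuits the remainder of the pipeline
  defp do_stage({:error, state, reason}, _stage, _params), do: {:error, state, reason}

  defp do_stage({:ok, state}, :recv_responses, _params) do
    # stand-in for receiving Mint messages; pretend the server sent a
    # WINDOW_UPDATE granting more bytes
    {:ok, Map.update!(state, :window, &(&1 + 1024))}
  end

  defp do_stage({:ok, state}, :check_window_size, {prior_window, request_ref} = params) do
    if window_size(state, request_ref) > prior_window do
      {:ok, state}
    else
      # the window hasn't grown yet: re-enter the earlier stage and block
      # again until the server grants more bytes
      {:ok, state}
      |> do_stage(:recv_responses, params)
      |> do_stage(:check_window_size, params)
    end
  end

  # hypothetical helper standing in for Mint.HTTP.get_window_size/2
  defp window_size(state, _request_ref), do: state.window
end
```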
```elixir
defp do_stage({:error, state, reason}, _stage, _params), do: {:error, state, reason}

defp do_stage({:ok, state}, :recv_responses, {_current_window, _request_ref}) do
  case receive_next_and_stream(state.conn) do
```
initially this was written with `Mint.HTTP.recv/3`, but that function didn't seem to be increasing the connection's window size, and it seemed to be consistently timing out

I looked at how `recv/3` and `Mint.HTTP.stream/2` are implemented and I can't see any reason why `recv/3` would not adjust the connection window, but /shrug, `receive/1` + `stream/2` just works
```elixir
defp receive_next_and_stream(conn) do
  # YARD allow customization of timeout?
  receive do
    message when Mint.HTTP.is_connection_message(conn, message) ->
```
this macro is why the `mix.exs` has a bump to requiring `~> 1.10`: this macro depends on some of the guard improvements in 1.10.0

we need to use this here so that we only receive Mint.HTTP messages and don't accidentally intercept GenServer calls/casts
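The full shape of that selective receive looks roughly like this (a sketch assuming Mint ~> 1.x; the `after` clause and the `{:error, conn, :timeout}` return shape are my own additions, matching the TODO above about customizing the timeout):

```elixir
defmodule SelectiveRecv do
  # is_connection_message/2 is a guard, so the module must be require-d
  # before the guard can be used in a `when` clause
  require Mint.HTTP

  def receive_next_and_stream(conn, timeout \\ 5_000) do
    receive do
      # only pluck Mint-owned messages out of the mailbox; GenServer
      # calls/casts stay queued for the GenServer machinery to handle
      message when Mint.HTTP.is_connection_message(conn, message) ->
        Mint.HTTP.stream(conn, message)
    after
      # hypothetical timeout handling, not Spear's actual return shape
      timeout -> {:error, conn, :timeout}
    end
  end
end
```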
```elixir
end

defp safe_get_request_window_size(conn, request_ref) do
  if Map.has_key?(conn.ref_to_stream_id, request_ref) do
```
it appears as though `Mint.HTTP.get_window_size/2` raises an `ArgumentError` when the request is not in the conn (which it may be if the EventStore cancels our request from the server side)

this thing I'm doing here with `conn.ref_to_stream_id` is how `Mint.HTTP.get_window_size/2` is implemented for the `{:request, request_ref}` input

it's kinda dirty to be searching a private field in a mint conn like this, but /shrug, I didn't see anything in the public API to allow me to check whether a request currently belongs to a conn

might be a good PR to mint
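Put together, the guarded lookup reads roughly like this (a sketch: the fallback value for a missing request is my own choice, not necessarily what the PR does):

```elixir
defmodule SafeWindow do
  def safe_get_request_window_size(conn, request_ref) do
    # conn.ref_to_stream_id is private Mint state; checking it is what
    # Mint.HTTP.get_window_size/2 itself does for {:request, ref} input
    if Map.has_key?(conn.ref_to_stream_id, request_ref) do
      Mint.HTTP.get_window_size(conn, {:request, request_ref})
    else
      # request no longer belongs to the conn (e.g. cancelled
      # server-side); report an empty window instead of raising
      0
    end
  end
end
```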
> streams, they are not practically able to do so. More immediately, HTTP2
> allows server implementations to direct clients to a maximum window size of
> bytes allowed to be sent in a single request. EventStore exerts a reasonably
> large window size per connection and request on the client, disallowing
> the writing of infinite streams. In cases where a client attempts to write
> too many events, `append/4` may fail with the `Mint.HTTPError` depicted
> above (though possible with different elements in the second and third
> elements of the `:reason` tuple).
wrong wrong wrong
I didn't understand windows when I wrote this
ok, so I think I'm gonna close this one because I think there's a great refactor to be done here, i.e. see this implementation in GenStage: https://github.com/elixir-lang/gen_stage/blob/6cb08ee2e892b8aff0fcc57f45e05c8d02dddcb2/lib/gen_stage/streamer.ex#L11-L27. If we can store a continuation in state, we can probably remove the

currently the implementation in this PR suffers from a bug where it will hit the

a better implementation will need to break down messages that are too large to be sent in one window into many DATA frames

closed in favor of #3
Closes #1
Spear allows one to write events using Elixir `Stream`s (`t:Enumerable.t/0`s). The final link in the chain which realizes the stream is the serialization of the data over the wire via `Mint.HTTP.stream_request_body/3`.

As detailed in #1, initially I was seeing append requests for infinite streams fail like so:
This is a back-pressure mechanism which is meant to keep clients from writing events to an HTTP/2 server faster than the server can receive them: each stream in an HTTP/2 connection has a window of bytes it is allowed to send. If it exceeds this window, it must wait until the server sends a WINDOW_UPDATE frame allowing more bytes to be sent.
The strategy before this PR, naively appending iodata to the stream in an `Enum.reduce_while/3`, overloaded the connection window (stream 0). It's not really so much that we were writing events faster than the EventStore could handle (we were), but that we weren't listening for WINDOW_UPDATE frames at all and thus weren't respecting the backpressure mechanism.

Since mint is very low-level and doesn't handle this for you (because of its process-less architecture), you need to write the mint client to conceptually come up for air every once in a while when sending large payloads, to wait for the server. That's what this PR does!