Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement backpressure for stream requests #1

Closed
the-mikedavis opened this issue Apr 5, 2021 · 6 comments · Fixed by #3
Closed

implement backpressure for stream requests #1

the-mikedavis opened this issue Apr 5, 2021 · 6 comments · Fixed by #3
Assignees
Labels

Comments

@the-mikedavis
Copy link
Collaborator

the-mikedavis commented Apr 5, 2021

currently Spear.append/4 with an infinite stream fails (expected) but not for the expected reason (GenServer timeout)

instead it gives

Stream.iterate(0, &(&1 + 1))
|> Stream.map(fn n -> Spear.Event.new("incremented", n) end)
|> Spear.append(conn, "InfiniteCounter", expect: :empty)
# =>
{:error,
 %Mint.HTTPError{
   module: Mint.HTTP2,
   reason: {:exceeds_window_size, :connection, 26}
 }}

Currently the setup for all out-bound requests is like so:

defp request_and_stream_body(conn, request) do
  with {:ok, conn, request_ref} <-
         Mint.HTTP.request(conn, @post, request.path, request.headers, :stream),
       {:ok, conn} <- stream_body(conn, request_ref, request.messages),
       {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, :eof) do
    {:ok, conn, request_ref}
  else
    {:error, conn, reason} -> {:error, conn, reason}
  end
end

and stream_body/3 is implemented like so:

defp stream_body(conn, request_ref, messages) do
  Enum.reduce_while(messages, {:ok, conn}, fn message, {:ok, conn} ->
    {message, _byte_size} = Request.to_wire_data(message)

    stream_result =
      Mint.HTTP.stream_request_body(
        conn,
        request_ref,
        message
      )

    case stream_result do
      {:ok, conn} -> {:cont, {:ok, conn}}
      error -> {:halt, error}
    end
  end)
end

As it turns out, this is actually very similar to the streaming implementation in finch! (PR that added streaming support: https://github.com/keathley/finch/pull/107/files#diff-48431cc1d91063480b5006d7585c96ea39433e319aca2b5e3a6c597fdbd7e10fR153-R158)

If we add some IO.inspect/2s of the window sizes in that Enum.reduce_while/3

conn |> Mint.HTTP2.get_window_size(:connection) |> IO.inspect(label: "connection window")
conn |> Mint.HTTP2.get_window_size({:request, request_ref}) |> IO.inspect(label: "request window")

we can see the window size for the connection and request gradually decreasing down to (in this case) 26, which is not enough to send the next message.

We can cheaply detect the window sizes as we reduce, but it's not immediately clear how to halt the stream temporarily while we Mint.HTTP.stream/2 and await a WINDOW_UPDATE frame (once we realize that our window is not large enough).

@the-mikedavis the-mikedavis self-assigned this Apr 5, 2021
@the-mikedavis
Copy link
Collaborator Author

the-mikedavis commented Apr 5, 2021

We cannot exactly "split" arbitrary streams in Elixir (see this issue) because of the lack of built-in thunks and that streams tend to wrap stateful resources like files.

I would say that this is accomplishable with piping messages through Stream.with_index and using that to halt the stream and pick up again from where it left off with Stream.drop/2, but that will re-process the messages which we have already sent. Probably not that big of a deal for most streams but I could see that being a real problem over stateful resources like a very large file, which this function is likely to handle

@the-mikedavis
Copy link
Collaborator Author

the-mikedavis commented Apr 5, 2021

the other approach I think has legs is to detect that the message is going to exceed a window and synchronously waiting in the Enum.reduce_while/3 for messages from the server, either with Kernel.SpecialForms.receive/1 or by switching the connection to passive mode and reading out responses with Mint.HTTP.recv/3.

Enum.reduce_while(messages, {:ok, conn}, fn message, {:ok, conn} ->
  {wire_data, byte_size} = Request.to_wire_data(message)
  connection_window = Mint.HTTP2.get_window_size(conn, :connection)
  request_window = Mint.HTTP2.get_window_size(conn, {:request, request_ref})
  smaller_window = min(connection_window, request_window)

  with false <- byte_size > smaller_window,
       {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, wire_data) do
    {:cont, {:ok, conn}}
  else
    true -> get_until_window_increase(conn, smaller_window)
    error -> {:halt, error}
  end
end)

with get_until_window_increase/2 being implemented something like

defp get_until_window_increase(conn, smaller_window) do
  # TODO handle `responses`
  with {:ok, conn, responses} <- Mint.HTTP.recv(conn, 0, 5_000),
       connection_window = Mint.HTTP2.get_window_size(conn, :connection),
       request_window = Mint.HTTP2.get_window_size(conn, {:request, request_ref}),
       new_smaller_window when new_smaller_window > smaller_window <-
         min(connection_window, request_window) do
    {:cont, {:ok, conn}}
  else
    ^smaller_window -> get_until_window_increase(conn, smaller_window)
    error -> {:halt, error}
  end
end

In either case, the connection may receive messages for other request_refs while synchronously waiting for the WINDOW_UPDATE frame, so those responses would need to be passed through the process_response/2 function so that other read requests and subscriptions be processed and not dropped. This isn't the end of the world but would require a small refactor to pass around state instead of just conn

@the-mikedavis
Copy link
Collaborator Author

looks like this is also an issue in the finch implementation https://github.com/keathley/finch/issues/88

@the-mikedavis
Copy link
Collaborator Author

worth checking if this is just the EventStore telling the client to stop trying to send so much data, but it appears as though the EventStore is properly telling the client to expand window size in the connection (stream 0)

spear-infinite-connection-window-update

and also later in the request (stream 5)

spear-infinite-request-window-update

but as the code is not directing mint to listen for these, mint is not magically adjusting the window

so it appears as though this is standard HTTP2 request back-pressure

@the-mikedavis
Copy link
Collaborator Author

the Mint.HTTP.recv/3 approach is probably wiser as a receive/1 will also catch incoming genserver calls/casts

the-mikedavis added a commit that referenced this issue Apr 5, 2021
this is the small refactor alluded to in
#1 (comment)
@the-mikedavis the-mikedavis changed the title appending an inifinite stream of events fails with Mint window size error implement backpressure for stream requests Apr 6, 2021
@the-mikedavis
Copy link
Collaborator Author

the-mikedavis commented Apr 9, 2021

See #3 for the implementation that 🔪d this

instead of synchronously blocking in the request_and_stream_body/2 (now request_and_stream_body/3) with either Kernel.SpecialForms.receive/1 or Mint.HTTP.recv/3, we use t:Enumerable.continuation/0s from Enumerable.reduce/3 to

  • suspend the input stream when we exhaust the smallest window
  • put the suspended stream into state
  • go back to being a regular ole' GenServer waiting for incoming messages and handling other requests
  • when we detect that the window size has increased, try continuing any suspended streams

this has the potential for hanging a suspended stream if the server never replenishes our window, but luckily Spear is only written to interact with one kind of server: an EventStoreDB, which we can show to be conformant with proper window refill behavior. I.e. I think it's an accetable risk

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
1 participant