How to handle a connection closed by the client when streaming a response? #669

Closed
joaquimadraz opened this issue Feb 23, 2018 · 13 comments

@joaquimadraz
Contributor

commented Feb 23, 2018

I'm implementing a feature that sends a CSV file in chunks from a stream. Currently, I'm using Enum.into to collect the stream into the connection.

I'm getting an error when the connection is closed on the client side:

** (MatchError) no match of right hand side value: {:error, :closed}
(plug) lib/plug/conn.ex:1239: anonymous fn/2 in Collectable.Plug.Conn.into/1

The Plug.Conn implementation of the Collectable protocol explains the error: it only pattern-matches on the success case, so an {:error, :closed} result raises a MatchError.

I was wondering what would be the best approach to handle the case when the connection is closed by the client.
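To make the failure mode concrete, here is a minimal sketch that reproduces the same kind of MatchError without Plug. `MatchDemo`, `fake_chunk/2`, and the map-based "connection" are hypothetical stand-ins invented for illustration; this is not Plug's actual source, only a collector function that matches the success case alone.

```elixir
defmodule MatchDemo do
  # Hypothetical stand-in for Plug.Conn.chunk/2: succeeds while the
  # "connection" is open and returns {:error, :closed} once it is not.
  def fake_chunk(%{open?: true} = conn, data), do: {:ok, %{conn | sent: [data | conn.sent]}}
  def fake_chunk(%{open?: false}, _data), do: {:error, :closed}

  # A collector function in the shape Collectable.into/1 returns.
  # It only matches {:ok, conn}, so any error tuple raises MatchError.
  def strict_collector do
    fn
      conn, {:cont, data} ->
        {:ok, conn} = fake_chunk(conn, data)
        conn

      conn, _done_or_halt ->
        conn
    end
  end
end

collector = MatchDemo.strict_collector()
closed_conn = %{open?: false, sent: []}

result =
  try do
    collector.(closed_conn, {:cont, "a,b\n"})
  rescue
    e in MatchError -> {:match_error, e.term}
  end

IO.inspect(result)
```

Because the collector has no clause for the error tuple, there is no way for the caller of Enum.into to recover other than rescuing the exception.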

@joaquimadraz

Contributor Author

commented Feb 23, 2018

Btw, I'm currently traversing the stream using Enum.reduce_while instead of using Enum.into because we can stop it when matching the error:

stream
|> Enum.reduce_while(conn, fn chunk, conn ->
  case Plug.Conn.chunk(conn, chunk) do
    {:ok, conn} ->
      {:cont, conn}
    {:error, :closed} ->
      # Connection closed by the client
      {:halt, conn}
  end
end)

Is this a good approach?

@josevalim

Member

commented Feb 23, 2018

Yup, your approach is great. We will provide a fix for plug too, thanks for the report.

@joaquimadraz

Contributor Author

commented Feb 23, 2018

Cool. I'm glad to help if you think this is something I can work on.

@josevalim

Member

commented Feb 23, 2018

Oh, a PR would be great too!

@joaquimadraz

Contributor Author

commented Feb 24, 2018

I will give it a try then :)

I was reading the code and ended up in Elixir's source for Enum.into, which uses reduce.
From what I could grasp, reduce and reduce_while use the same underlying function. The difference is that reduce has a fixed flow, always returning {:cont, fun.(x, acc)} from the reducer, while with reduce_while we control the execution ourselves by returning {:cont, acc} or {:halt, acc}.
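The difference described above can be sketched directly on top of Enumerable.reduce/3. The module and function names (`ReduceDemo`, `my_reduce/3`, `my_reduce_while/3`) are made up for illustration and are simplified versions, not copies of the Enum source.

```elixir
defmodule ReduceDemo do
  # Like Enum.reduce/3: the reducer's result is always wrapped in {:cont, _},
  # so the traversal can never stop early.
  def my_reduce(enumerable, acc, fun) do
    {:done, result} =
      Enumerable.reduce(enumerable, {:cont, acc}, fn x, acc -> {:cont, fun.(x, acc)} end)

    result
  end

  # Like Enum.reduce_while/3: the reducer itself returns {:cont, acc} or
  # {:halt, acc}, so it decides whether the traversal continues.
  def my_reduce_while(enumerable, acc, fun) do
    {_done_or_halted, result} = Enumerable.reduce(enumerable, {:cont, acc}, fun)
    result
  end
end

sum = ReduceDemo.my_reduce(1..5, 0, fn x, acc -> x + acc end)

partial =
  ReduceDemo.my_reduce_while(1..5, 0, fn x, acc ->
    if x > 3, do: {:halt, acc}, else: {:cont, acc + x}
  end)

IO.inspect({sum, partial})
```

Here `sum` visits every element, while `partial` stops as soon as the reducer returns `{:halt, acc}`.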

There are two possible solutions that I can think of.

The first one is to use reduce_while in the Enum.into implementation. This would require a backward-incompatible change to the Collectable protocol.

The second one is to use throw/catch for control flow in Enum.into. A Collectable could throw {:halt, reason} to stop the execution:

  defp into(enumerable, initial, fun, callback) do
    try do
      reduce(enumerable, initial, callback)
    catch
      :throw, {:halt, reason} ->
        fun.(initial, :halt)
        {:error, reason}
      kind, reason ->
        fun.(initial, :halt)
        :erlang.raise(kind, reason, __STACKTRACE__)
    else
      acc -> fun.(acc, :done)
    end
  end
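For illustration, the collector side of that proposal might look like the sketch below. It is a self-contained approximation: `ThrowDemo.into/3` is a hypothetical simplified loop (it uses Enum.reduce/3 rather than Enum's private helpers and omits the re-raise clause), and the `:boom` element is an invented stand-in for a chunk that fails to send.

```elixir
defmodule ThrowDemo do
  # Hypothetical into-style loop in which the collector function may
  # throw {:halt, reason} to abort ingestion early.
  def into(enumerable, initial, fun) do
    try do
      Enum.reduce(enumerable, initial, fn x, acc -> fun.(acc, {:cont, x}) end)
    catch
      :throw, {:halt, reason} ->
        # Give the collector a chance to clean up, then report the reason.
        fun.(initial, :halt)
        {:error, reason}
    else
      acc -> fun.(acc, :done)
    end
  end
end

# Collector that accumulates elements in a list and aborts on :boom.
collector = fn
  _acc, {:cont, :boom} -> throw({:halt, :closed})
  acc, {:cont, x} -> [x | acc]
  acc, :done -> Enum.reverse(acc)
  _acc, :halt -> :ok
end

ok_result = ThrowDemo.into([1, 2, 3], [], collector)
halted_result = ThrowDemo.into([1, :boom, 3], [], collector)

IO.inspect({ok_result, halted_result})
```

One consequence visible in the sketch: the return type of the loop changes from "the collected value" to "the collected value or {:error, reason}", which every caller would then have to handle.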

What do you think?

@josevalim

Member

commented Feb 24, 2018

Hrm, you are right, I forgot the Collectable protocol doesn't allow the data ingestion to abort. At this point it may be best for us to deprecate the implementation of the Collectable protocol for Plug.Conn, since it doesn't work with side effects, and point people in the right direction, in this case your reduce_while example.

Thoughts?

@joaquimadraz

Contributor Author

commented Feb 26, 2018

I’m ok with the deprecation because it's not a perfect fit. However, I think it would be useful to provide a way to stream a response from a List or a Stream that holds iodata and encapsulate this behavior. Do you see a benefit in that?

@josevalim

Member

commented Feb 26, 2018

Since we have already been bitten by trying to package too much in a single place, I would rather keep it simple and document how to roll your own.
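A "roll your own" helper along these lines could be as small as the following sketch. `ChunkedStream` and `send_chunks/3` are hypothetical names, not part of Plug. The chunk function is passed in so the example runs without Plug; in a real application one would presumably pass `&Plug.Conn.chunk/2` after calling `Plug.Conn.send_chunked/2`.

```elixir
defmodule ChunkedStream do
  # Sends every element of `enumerable` through `chunk_fun`, stopping at the
  # first error. `chunk_fun` is assumed to behave like Plug.Conn.chunk/2:
  # it returns {:ok, conn} on success or {:error, reason} on failure.
  def send_chunks(conn, enumerable, chunk_fun) do
    Enum.reduce_while(enumerable, {:ok, conn}, fn data, {:ok, conn} ->
      case chunk_fun.(conn, data) do
        {:ok, conn} -> {:cont, {:ok, conn}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end
end

# Fake chunk function for demonstration: the "client" disconnects after
# two chunks have been sent.
fake_chunk = fn
  %{sent: sent}, _data when length(sent) >= 2 -> {:error, :closed}
  %{sent: sent} = conn, data -> {:ok, %{conn | sent: sent ++ [data]}}
end

complete = ChunkedStream.send_chunks(%{sent: []}, ["a", "b"], fake_chunk)
interrupted = ChunkedStream.send_chunks(%{sent: []}, ["a", "b", "c"], fake_chunk)

IO.inspect({complete, interrupted})
```

Returning a tagged tuple rather than a bare conn lets the caller decide what a closed connection means, for example logging it or cleaning up the upstream resource.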

@joaquimadraz

Contributor Author

commented Feb 27, 2018

Ok, fair enough 👍 I will push a PR for the deprecation. Where should I put the documentation on using reduce_while? In the deprecation warning?

@josevalim

Member

commented Feb 27, 2018

@josevalim

Member

commented Mar 4, 2018

Ping. :)

@joaquimadraz

Contributor Author

commented Mar 4, 2018

I'm actually working on this right now. Sorry, last week was busy for me :)

You said to write the deprecation warning inside the into function. Is that because @deprecated does not emit a warning inside a protocol implementation?

@josevalim

Member

commented Mar 4, 2018

@joaquimadraz yup. :) The protocol is invoked dynamically at runtime.
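As a rough sketch of what a runtime warning inside into could look like, the example below implements Collectable for a made-up struct and calls IO.warn/1 each time the protocol is invoked. `DemoConn` and the warning text are assumptions for illustration; this is not the change that was made to Plug.

```elixir
defmodule DemoConn do
  # Hypothetical stand-in for a connection struct.
  defstruct chunks: []
end

defimpl Collectable, for: DemoConn do
  def into(conn) do
    # The protocol is dispatched dynamically, so the warning is emitted at
    # runtime from inside into/1 instead of relying on @deprecated.
    IO.warn("collecting into DemoConn is deprecated, use Enum.reduce_while/3 instead")

    collector = fn
      conn, {:cont, data} -> %{conn | chunks: [data | conn.chunks]}
      conn, :done -> %{conn | chunks: Enum.reverse(conn.chunks)}
      _conn, :halt -> :ok
    end

    {conn, collector}
  end
end

# Prints the deprecation warning to stderr, then collects as usual.
result = Enum.into(["a", "b"], struct(DemoConn))

IO.inspect(result.chunks)
```

The existing behaviour is preserved, so callers keep working while being nudged toward the reduce_while approach.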
