Skip to content

Commit

Permalink
Add Plug.Conn.read_part_headers/2 and read_part_body/2
Browse files Browse the repository at this point in the history
  • Loading branch information
José Valim committed Jun 6, 2017
1 parent 142ec1a commit c52b2f3
Show file tree
Hide file tree
Showing 10 changed files with 781 additions and 263 deletions.
92 changes: 0 additions & 92 deletions lib/plug/adapters/cowboy/conn.ex
Expand Up @@ -62,28 +62,6 @@ defmodule Plug.Adapters.Cowboy.Conn do
:cowboy_req.body(req, opts)
end

def parse_req_multipart(req, opts, callback) do
# We need to remove the length from the list
# otherwise cowboy will attempt to load the
# whole length at once.
{limit, opts} = Keyword.pop(opts, :length, 8_000_000)

# We need to construct the header opts using defaults here,
# since once opts are passed cowboy defaults are not applied anymore.
{headers_opts, opts} = Keyword.pop(opts, :headers, [])
headers_opts = headers_opts ++ [length: 64_000, read_length: 64_000, read_timeout: 5000]

{:ok, limit, acc, req} = parse_multipart(:cowboy_req.part(req, headers_opts), limit, opts, headers_opts, [], callback)

params = Enum.reduce(acc, %{}, &Plug.Conn.Query.decode_pair/2)

if limit > 0 do
{:ok, params, req}
else
{:more, params, req}
end
end

## Helpers

defp scheme(:tcp), do: :http
Expand All @@ -93,74 +71,4 @@ defmodule Plug.Adapters.Cowboy.Conn do
segments = :binary.split(path, "/", [:global])
for segment <- segments, segment != "", do: segment
end

## Multipart

defp parse_multipart({:ok, headers, req}, limit, opts, headers_opts, acc, callback) when limit >= 0 do
case callback.(headers) do
{:binary, name} ->
{:ok, limit, body, req} =
parse_multipart_body(:cowboy_req.part_body(req, opts), limit, opts, "")

Plug.Conn.Utils.validate_utf8!(body, Plug.Parsers.BadEncodingError, "multipart body")
parse_multipart(:cowboy_req.part(req, headers_opts), limit, opts, headers_opts,
[{name, body}|acc], callback)

{:file, name, path, %Plug.Upload{} = uploaded} ->
{:ok, file} = File.open(path, [:write, :binary, :delayed_write, :raw])

{:ok, limit, req} =
parse_multipart_file(:cowboy_req.part_body(req, opts), limit, opts, file)

:ok = File.close(file)
parse_multipart(:cowboy_req.part(req, headers_opts), limit, opts, headers_opts,
[{name, uploaded}|acc], callback)

:skip ->
parse_multipart(:cowboy_req.part(req, headers_opts), limit, opts, headers_opts,
acc, callback)
end
end

defp parse_multipart({:ok, _headers, req}, limit, _opts, _headers_opts, acc, _callback) do
{:ok, limit, acc, req}
end

defp parse_multipart({:done, req}, limit, _opts, _headers_opts, acc, _callback) do
{:ok, limit, acc, req}
end

defp parse_multipart_body({:more, tail, req}, limit, opts, body) when limit >= byte_size(tail) do
parse_multipart_body(:cowboy_req.part_body(req, opts), limit - byte_size(tail), opts, body <> tail)
end

defp parse_multipart_body({:more, tail, req}, limit, _opts, body) do
{:ok, limit - byte_size(tail), body, req}
end

defp parse_multipart_body({:ok, tail, req}, limit, _opts, body) when limit >= byte_size(tail) do
{:ok, limit - byte_size(tail), body <> tail, req}
end

defp parse_multipart_body({:ok, tail, req}, limit, _opts, body) do
{:ok, limit - byte_size(tail), body, req}
end

defp parse_multipart_file({:more, tail, req}, limit, opts, file) when limit >= byte_size(tail) do
IO.binwrite(file, tail)
parse_multipart_file(:cowboy_req.part_body(req, opts), limit - byte_size(tail), opts, file)
end

defp parse_multipart_file({:more, tail, req}, limit, _opts, _file) do
{:ok, limit - byte_size(tail), req}
end

defp parse_multipart_file({:ok, tail, req}, limit, _opts, file) when limit >= byte_size(tail) do
IO.binwrite(file, tail)
{:ok, limit - byte_size(tail), req}
end

defp parse_multipart_file({:ok, tail, req}, limit, _opts, _file) do
{:ok, limit - byte_size(tail), req}
end
end
40 changes: 3 additions & 37 deletions lib/plug/adapters/test/conn.ex
Expand Up @@ -86,18 +86,6 @@ defmodule Plug.Adapters.Test.Conn do
{tag, data, %{state | req_body: rest}}
end

def parse_req_multipart(%{params: params} = state, _opts, _callback) do
{:ok, params, state}
end

def parse_req_multipart(%{req_body: multipart} = state, opts, callback) do
boundary = Keyword.get(opts, :boundary)
params = parse_multipart(:cow_multipart.parse_headers(multipart, boundary), boundary, [], callback)
|> Enum.reduce(%{}, &Plug.Conn.Query.decode_pair/2)

{:ok, params, state}
end

## Private helpers

defp body_or_params(nil, _query, headers),
Expand All @@ -112,10 +100,11 @@ defmodule Plug.Adapters.Test.Conn do
end

defp body_or_params(params, query, headers) when is_map(params) do
content_type = List.keyfind(headers, "content-type", 0, {"content-type", "multipart/mixed; charset: utf-8"})
content_type = List.keyfind(headers, "content-type", 0,
{"content-type", "multipart/mixed; boundary=plug_conn_test"})
headers = List.keystore(headers, "content-type", 0, content_type)
params = Map.merge(Plug.Conn.Query.decode(query), stringify_params(params))
{"", params, headers}
{"--plug_conn_test--", params, headers}
end

defp stringify_params([{_, _}|_] = params),
Expand Down Expand Up @@ -146,27 +135,4 @@ defmodule Plug.Adapters.Test.Conn do
0 -> :ok
end
end

defp parse_multipart({:ok, headers, body}, boundary, acc, callback) do
{:done, content, rest} = :cow_multipart.parse_body(body, boundary)

case callback.(headers)do
{:file, name, path, %Plug.Upload{} = uploaded} ->
{:ok, file} = File.open(path, [:write, :binary, :delayed_write, :raw])
IO.binwrite(file, content)
File.close(file)

parse_multipart(:cow_multipart.parse_headers(rest, boundary), boundary, [{name, uploaded}|acc], callback)

{:binary, name} ->
parse_multipart(:cow_multipart.parse_headers(rest, boundary), boundary, [{name, content}|acc], callback)

:skip ->
parse_multipart(:cow_multipart.parse_headers(rest, boundary), boundary, acc, callback)
end
end

defp parse_multipart({:done, _rest}, _boundary, acc, _callback) do
acc
end
end
128 changes: 122 additions & 6 deletions lib/plug/conn.ex
Expand Up @@ -774,12 +774,12 @@ defmodule Plug.Conn do
## Options
* `:length` - sets the maximum number of bytes to read from the body for each
chunk, defaults to 8_000_000 bytes
* `:read_length` - sets the amount of bytes to read at one time from the
underlying socket to fill the chunk, defaults to 1_000_000 bytes
* `:read_timeout` - sets the timeout for each socket read, defaults to
15_000 ms
* `:length` - sets the maximum number of bytes to read from the body for
each chunk, defaults to 8_000_000 bytes
* `:read_length` - sets the amount of bytes to read at one time from the
underlying socket to fill the chunk, defaults to 1_000_000 bytes
* `:read_timeout` - sets the timeout for each socket read, defaults to
15_000ms
The values above are not meant to be exact. For example, setting the
length to 8_000_000 may end up reading some hundred bytes more from
Expand All @@ -804,6 +804,122 @@ defmodule Plug.Conn do
end
end

@doc """
Reads the headers of a multipart request.
It returns `{:ok, headers, conn}` with the headers or
`{:done, conn}` if there are no more parts.
Once `read_part_headers/2` is invoked, a developer may call
`read_part_body/2` to read the body associated to the headers.
If `read_part_headers/2` is called instead, the body is automatically
skipped until the next part headers.
## Options
* `:length` - sets the maximum number of bytes to read from the body for
each chunk, defaults to 64_000 bytes
* `:read_length` - sets the amount of bytes to read at one time from the
underlying socket to fill the chunk, defaults to 64_000 bytes
* `:read_timeout` - sets the timeout for each socket read, defaults to
5_000ms
"""
@spec read_part_headers(t, Keyword.t) :: {:ok, headers, t} | {:done, t}
def read_part_headers(%Conn{adapter: {adapter, state}} = conn, opts \\ []) do
opts = opts ++ [length: 64_000, read_length: 64_000, read_timeout: 5000]
case init_multipart(conn) do
{boundary, buffer} ->
{data, state} = read_multipart_from_buffer_or_adapter(buffer, adapter, state, opts)
read_part_headers(conn, data, boundary, adapter, state, opts)
:done ->
{:done, conn}
end
end

defp read_part_headers(conn, data, boundary, adapter, state, opts) do
case :plug_multipart.parse_headers(data, boundary) do
{:ok, headers, rest} ->
{:ok, headers, store_multipart(conn, {boundary, rest}, adapter, state)}
:more ->
{_, next, state} = next_multipart(adapter, state, opts)
read_part_headers(conn, data <> next, boundary, adapter, state, opts)
{:more, rest} ->
{_, next, state} = next_multipart(adapter, state, opts)
read_part_headers(conn, rest <> next, boundary, adapter, state, opts)
{:done, _} ->
{:done, store_multipart(conn, :done, adapter, state)}
end
end

@doc """
Reads the body of a multipart request.
Returns `{:ok, body, conn}` if all body has been read,
`{:more, binary, conn}` otherwise.
It accepts the same options as `read_body/2`.
"""
@spec read_part_body(t, Keyword.t) :: {:ok, binary, t} | {:more, binary, t}
def read_part_body(%{adapter: {adapter, state}} = conn, opts) do
case init_multipart(conn) do
{boundary, buffer} ->
length = Keyword.get(opts, :length, 8_000_000)
{data, state} = read_multipart_from_buffer_or_adapter(buffer, adapter, state, opts)
read_part_body(conn, data, "", length, boundary, adapter, state, opts)
:done ->
{:done, conn}
end
end

defp read_part_body(conn, data, acc, length, boundary, adapter, state, _opts) when byte_size(acc) > length do
{:more, acc, store_multipart(conn, {boundary, data}, adapter, state)}
end
defp read_part_body(conn, data, acc, length, boundary, adapter, state, opts) do
case :plug_multipart.parse_body(data, boundary) do
{:ok, body} ->
{_, next, state} = next_multipart(adapter, state, opts)
read_part_body(conn, next, acc <> body, length, boundary, adapter, state, opts)
{:ok, body, rest} ->
{_, next, state} = next_multipart(adapter, state, opts)
read_part_body(conn, rest <> next, acc <> body, length, boundary, adapter, state, opts)
:done ->
{:ok, acc, store_multipart(conn, {boundary, ""}, adapter, state)}
{:done, body} ->
{:ok, acc <> body, store_multipart(conn, {boundary, ""}, adapter, state)}
{:done, body, rest} ->
{:ok, acc <> body, store_multipart(conn, {boundary, rest}, adapter, state)}
end
end

defp init_multipart(%{private: %{plug_multipart: plug_multipart}}) do
plug_multipart
end
defp init_multipart(%{req_headers: req_headers}) do
{_, content_type} = List.keyfind(req_headers, "content-type", 0)
{:ok, "multipart", _, %{"boundary" => boundary}} = Plug.Conn.Utils.content_type(content_type)
{boundary, ""}
end

defp next_multipart(adapter, state, opts) do
case adapter.read_req_body(state, opts) do
{:ok, "", _} -> raise "invalid multipart, body terminated too soon"
valid -> valid
end
end

defp store_multipart(conn, multipart, adapter, state) do
%{put_in(conn.private[:plug_multipart], multipart) | adapter: {adapter, state}}
end

defp read_multipart_from_buffer_or_adapter("", adapter, state, opts) do
{_, data, state} = adapter.read_req_body(state, opts)
{data, state}
end
defp read_multipart_from_buffer_or_adapter(buffer, _adapter, state, _opts) do
{buffer, state}
end

@doc """
Fetches cookies from the request headers.
"""
Expand Down
22 changes: 0 additions & 22 deletions lib/plug/conn/adapter.ex
Expand Up @@ -72,26 +72,4 @@ defmodule Plug.Conn.Adapter do
{:ok, data :: binary, payload} |
{:more, data :: binary, payload} |
{:error, term}

@doc """
Parses a multipart request.
This function receives the payload, the body limit and a callback.
When parsing each multipart segment, the parser should invoke the
given fallback passing the headers for that segment, before consuming
the body. The callback will return one of the following values:
* `{:binary, name}` - the current segment must be treated as a regular
binary value with the given `name`
* `{:file, name, file, upload}` - the current segment is a file upload with `name`
and contents should be written to the given `file`
* `:skip` - this multipart segment should be skipped
This function may return a `:ok` or `:more` tuple. The first one is
returned when there is no more multipart data to be processed.
For the supported options, please read `Plug.Conn.read_body/2` docs.
"""
@callback parse_req_multipart(payload, options :: Keyword.t, fun) ::
{:ok, Conn.params, payload} | {:more, Conn.params, payload}
end
14 changes: 7 additions & 7 deletions lib/plug/parsers.ex
Expand Up @@ -104,13 +104,13 @@ defmodule Plug.Parsers do
Plug ships with the following parsers:
* `Plug.Parsers.URLENCODED` - parses `application/x-www-form-urlencoded`
requests (can be used as `:urlencoded` as well in the `:parsers` option)
* `Plug.Parsers.MULTIPART` - parses `multipart/form-data` and
`multipart/mixed` requests (can be used as `:multipart` as well in the
`:parsers` option)
* `Plug.Parsers.JSON` - parses `application/json` requests with the given
`:json_decoder` (can be used as `:json` as well in the `:parsers` option)
* `Plug.Parsers.URLENCODED` - parses `application/x-www-form-urlencoded`
requests (can be used as `:urlencoded` as well in the `:parsers` option)
* `Plug.Parsers.MULTIPART` - parses `multipart/form-data` and
`multipart/mixed` requests (can be used as `:multipart` as well in the
`:parsers` option)
* `Plug.Parsers.JSON` - parses `application/json` requests with the given
`:json_decoder` (can be used as `:json` as well in the `:parsers` option)
## File handling
Expand Down

0 comments on commit c52b2f3

Please sign in to comment.