Introduce MultipartReceiver for custom, fail-fast multipart decoding #7411

Open · wants to merge 17 commits into base: series/0.23
162 changes: 130 additions & 32 deletions core/shared/src/main/scala/org/http4s/multipart/MultipartDecoder.scala
@@ -17,16 +17,98 @@
package org.http4s
package multipart

import cats.data.EitherT
import cats.effect.Concurrent
import cats.effect.Resource
import cats.effect.std.Supervisor
import cats.syntax.all._
import fs2.Collector
import fs2.Pipe
import fs2.Stream
import fs2.io.file.Files

private[http4s] object MultipartDecoder {

/** Creates an EntityDecoder for `multipart/form-data` bodies, using
* the given `MultipartReceiver` to decide how to handle each Part.
*
* The EntityDecoder is created as a Resource which, when
* released, will release any underlying Resources that may
* have been allocated during the decoding process (e.g. temp files
* used for buffering Part bodies). Since the underlying Resources
* will not be released until this Resource is released, it is not
* recommended to reuse the allocated EntityDecoder.
*
* The `MultipartReceiver` is responsible for deciding the decoding
* logic for each Part, on a case-by-case basis. Some `MultipartReceiver`s
* are uniform, processing each part the same way. Others may be fully
* custom, processing each part in a different way. See the [[PartReceiver]]
* companion for examples.
*
* This allows the decoder to avoid unnecessary work in many cases.
* For example, a MultipartReceiver can be defined to only expect
* a specific set of parts, such that a request containing data
* in unexpected parts would cause decoding to halt with an error,
* rather than continuing to read each part and potentially write
* their data to buffers.
*
* This also allows for an explicit choice of how each part is
* buffered, if at all. As opposed to [[mixedMultipartResource]],
* which buffers Part bodies opaquely and exposes them as a
* `Stream[F, Byte]`, a `MultipartReceiver` can make explicit choices
* about the destination and representation of its Part bodies.
*
* Furthermore, since each `Part` is subject to decoding as it is
* encountered (i.e. before any potential buffering), when the decoding
* of a Part fails, the overall decoding process can halt early, without
* bothering to pull the rest of the stream.
*
* @param recv A MultipartReceiver which dictates the decoding logic for each Part
* @param headerLimit Maximum acceptable size of any Part's headers
* @param maxParts Maximum acceptable number of parts
* @param failOnLimit If `true`, exceeding the `maxParts` will cause a decoding failure.
* If `false`, any parts beyond the limit will be ignored.
* @return A resource which allocates an EntityDecoder for `multipart/form-data`
* media and releases any resources allocated by that decoder.
*/
def fromReceiver[F[_]: Concurrent, A](
recv: MultipartReceiver[F, A],
headerLimit: Int = 1024,
maxParts: Option[Int] = Some(50),
failOnLimit: Boolean = false,
): Resource[F, EntityDecoder[F, A]] =
Supervisor[F].map { supervisor =>
makeDecoder[F, recv.Partial, List, A](
MultipartParser.decodePartsSupervised[F, recv.Partial](
supervisor,
_,
part => EitherT(recv.decideOrReject(part.headers).receive(part)),
limit = headerLimit,
maxParts = maxParts,
failOnLimit = failOnLimit,
),
List,
(partials, _) => recv.assemble(partials),
)
}
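As a hedged usage sketch (the `receiver` value and its result type `MyResult` are hypothetical placeholders, since this diff does not show `MultipartReceiver`'s constructors, and `MultipartDecoder` itself is `private[http4s]`, so this is illustrative rather than a public entry point), the Resource returned by `fromReceiver` is intended to scope a single decode:

```scala
import cats.effect.IO
import org.http4s.Request
import org.http4s.multipart.{MultipartDecoder, MultipartReceiver}

// `MyResult` and `receiver` are placeholders: build a MultipartReceiver
// that knows which parts to expect and how to decode each one.
def decodeUpload[MyResult](
    req: Request[IO],
    receiver: MultipartReceiver[IO, MyResult],
): IO[MyResult] =
  MultipartDecoder.fromReceiver(receiver).use { implicit decoder =>
    // the decoder (and any temp-file buffers it allocates) lives only
    // for this one decode; do not reuse it outside the Resource scope
    req.as[MyResult]
  }
```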

/** A decoder for `multipart/form-data` content, where each "part"
* is stored to an in-memory buffer which can be read repeatedly
* as a `Stream[F, Byte]`.
*
* Note that this decoder should typically be avoided when expecting
* to receive file uploads, as this would cause the content of each
* uploaded file to be loaded into memory.
*
* @return A decoder for `multipart/form-data` content, with part
* bodies buffered in memory.
*/
def decoder[F[_]: Concurrent]: EntityDecoder[F, Multipart[F]] =
makeDecoder(MultipartParser.parseToPartsStream[F](_))
makeDecoder[F, Part[F], Vector, Multipart[F]](
MultipartParser.parseToPartsStream[F](_).andThen(_.map(Right(_))),
Vector,
(parts, boundary) => Right(Multipart(parts, boundary)),
)
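For orientation, a minimal sketch of how the in-memory decoder is consumed; in 0.23 application code typically reaches it through the public `EntityDecoder.multipart`, since `MultipartDecoder` is `private[http4s]`:

```scala
import cats.effect.IO
import org.http4s.{EntityDecoder, Request}
import org.http4s.multipart.Multipart

def readForm(req: Request[IO]): IO[Multipart[IO]] = {
  // every part body is buffered entirely in memory: fine for small
  // form fields, a poor fit for file uploads
  implicit val dec: EntityDecoder[IO, Multipart[IO]] = EntityDecoder.multipart[IO]
  req.as[Multipart[IO]]
}
```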

/** Multipart decoder that streams all parts past a threshold
* (anything above `maxSizeBeforeWrite`) into a temporary file.
@@ -68,16 +150,20 @@ private[http4s] object MultipartDecoder {
chunkSize: Int = 8192,
): Resource[F, EntityDecoder[F, Multipart[F]]] =
Supervisor[F].map { supervisor =>
makeDecoder(
MultipartParser.parseToPartsSupervisedFile[F](
val partReceiver = PartReceiver
.toMixedBuffer[F](maxSizeBeforeWrite, chunkSize)
.mapWithHeaders(Part.apply[F])
makeDecoder[F, Part[F], Vector, Multipart[F]](
MultipartParser.decodePartsSupervised[F, Part[F]](
supervisor,
_,
headerLimit,
maxSizeBeforeWrite,
maxParts,
failOnLimit,
chunkSize,
)
part => EitherT(partReceiver.receive(part)),
limit = headerLimit,
maxParts = Some(maxParts),
failOnLimit = failOnLimit,
),
Vector,
(parts, boundary) => Right(Multipart[F](parts, boundary)),
)
}
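A usage sketch of the mixed (memory plus temp-file) variant, via its public `EntityDecoder.mixedMultipartResource` counterpart, using the default parameter values:

```scala
import cats.effect.IO
import org.http4s.{EntityDecoder, Request}
import org.http4s.multipart.Multipart

def readMixed(req: Request[IO]): IO[Multipart[IO]] =
  // part bodies above maxSizeBeforeWrite spill to temp files; the files
  // are deleted when the Resource is released, so decode within `use`
  EntityDecoder.mixedMultipartResource[IO]().use { implicit decoder =>
    req.as[Multipart[IO]]
  }
```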

@@ -86,7 +172,7 @@ private[http4s] object MultipartDecoder {
*
* Note: (BIG NOTE) Using this decoder for multipart decoding is good for the sake of
* not holding all information in memory, as it will never have more than
* `maxSizeBeforeWrite` in memory before writing to a temporary file. On top of this,
* `maxSizeBeforeWrite` in memory per part before writing to a temporary file. On top of this,
* you can gate the # of parts to further stop the quantity of parts you can have.
* That said, because after a threshold it writes into a temporary file, given
* bincompat reasons on 0.18.x, there is no way to make a distinction about which `Part[F]`
@@ -114,34 +200,46 @@ private[http4s] object MultipartDecoder {
maxParts: Int = 50,
failOnLimit: Boolean = false,
): EntityDecoder[F, Multipart[F]] =
makeDecoder(
MultipartParser.parseToPartsStreamedFile[F](
_,
headerLimit,
maxSizeBeforeWrite,
maxParts,
failOnLimit,
)
makeDecoder[F, Part[F], Vector, Multipart[F]](
MultipartParser
.parseToPartsStreamedFile[F](
_,
headerLimit,
maxSizeBeforeWrite,
maxParts,
failOnLimit,
)
.andThen(_.map(Right(_))),
Vector,
(parts, boundary) => Right(Multipart(parts, boundary)),
)

private def makeDecoder[F[_]: Concurrent](
impl: Boundary => Pipe[F, Byte, Part[F]]
): EntityDecoder[F, Multipart[F]] =
private def makeDecoder[F[_]: Concurrent, P, C[_], A](
pipe: Boundary => Pipe[F, Byte, Either[DecodeFailure, P]],
collector: Collector.Aux[P, C[P]],
assemble: (C[P], Boundary) => Either[DecodeFailure, A],
): EntityDecoder[F, A] =
EntityDecoder.decodeBy(MediaRange.`multipart/*`) { msg =>
msg.contentType.flatMap(_.mediaType.extensions.get("boundary")) match {
case Some(boundary) =>
DecodeResult {
case Some(boundaryStr) =>
val boundary = Boundary(boundaryStr)
EitherT(
msg.body
.through(impl(Boundary(boundary)))
// pipe to get individual parts or decode failures
.through(pipe(boundary))
// lift effect to EitherT[F, DecodeFailure, *] so that
// `.eval`-ing each value can short-circuit the stream
// when we get a `DecodeFailure`
.translate(EitherT.liftK[F, DecodeFailure])
.flatMap(r => Stream.eval(EitherT.fromEither[F](r)))
.compile
.toVector
.map[Either[DecodeFailure, Multipart[F]]](parts =>
Right(Multipart(parts, Boundary(boundary)))
)
.recover { case e: MessageBodyFailure =>
Left(e)
}
}
.to(collector)
.subflatMap(assemble(_, boundary))
.value
// catch DecodeErrors that were thrown, moving them to
// the EitherT's Left side
.recover { case e: DecodeFailure => Left(e) }
)
case None =>
DecodeResult.failureT(
InvalidMessageBodyFailure("Missing boundary extension to Content-Type")
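The fail-fast idea behind `makeDecoder` can be illustrated with a stdlib-only sketch (`collectFailFast` is a hypothetical helper, standing in for the fs2/EitherT machinery): a stream of per-part results is consumed until the first `Left`, after which no further elements are pulled. In the real decoder this is achieved by translating the stream into `EitherT[F, DecodeFailure, *]` and `eval`-ing each result, so a `Left` short-circuits the pull loop.

```scala
// Stdlib-only sketch: consume Either results, halting at the first Left.
// Remaining elements of `parts` are never pulled once a failure appears,
// mirroring how the decoder stops reading the body after a DecodeFailure.
def collectFailFast[E, A](parts: Iterator[Either[E, A]]): Either[E, Vector[A]] = {
  val acc = Vector.newBuilder[A]
  while (parts.hasNext)
    parts.next() match {
      case Right(a) => acc += a
      case Left(e)  => return Left(e) // halt: rest of the stream untouched
    }
  Right(acc.result())
}
```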