Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

=htp Fix cancellation and head-of-line blocking in fileUpload directive #2224

Merged
merged 1 commit into from Oct 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -184,7 +184,6 @@ class FileUploadDirectivesSpec extends RoutingSpec {
status shouldEqual StatusCodes.OK
responseAs[String] shouldEqual str1
}

}

"stream the first file upload if multiple with the same name are posted" in {
Expand Down Expand Up @@ -238,6 +237,74 @@ class FileUploadDirectivesSpec extends RoutingSpec {

}

"not cancel the stream after providing the expected part" in {
val route = echoAsAService
val str1 = "some data"

@volatile var secondWasFullyRead = false
val secondSource =
Source.fromIterator(() ⇒ Iterator.from(1))
.take(100)
.map { i ⇒
if (i == 100) secondWasFullyRead = true
akka.util.ByteString("abcdefghij")
}

val multipartForm =
Multipart.FormData(
Source(
Vector(
Multipart.FormData.BodyPart.Strict(
"field1",
HttpEntity(str1),
Map("filename" → "data1.txt")
),
Multipart.FormData.BodyPart(
"field2",
HttpEntity.IndefiniteLength(ContentTypes.`application/octet-stream`, secondSource)
)
)
)
)

Post("/", multipartForm) ~> route ~> check {
status shouldEqual StatusCodes.OK
responseAs[String] shouldEqual str1
secondWasFullyRead shouldEqual true
}
}

"not be head-of-line-blocked if there is another big part before the part we are interested in" in {
val route = echoAsAService
val str1 = "some data"

val firstSource =
Source.repeat(ByteString("abcdefghij" * 100))
.take(1000) // 1MB

val multipartForm =
Multipart.FormData(
Source(
Vector(
// big part comes before the one we are interested in
Multipart.FormData.BodyPart(
"field2",
HttpEntity.IndefiniteLength(ContentTypes.`application/octet-stream`, firstSource)
),
Multipart.FormData.BodyPart.Strict(
"field1",
HttpEntity(str1),
Map("filename" → "data1.txt")
)
)
)
)

Post("/", multipartForm) ~> route ~> check {
status shouldEqual StatusCodes.OK
responseAs[String] shouldEqual str1
}
}
}

"the fileUploadAll directive" should {
Expand Down
Expand Up @@ -15,6 +15,7 @@ import scala.collection.immutable
import scala.concurrent.Future
import akka.stream.scaladsl._
import akka.http.javadsl
import akka.http.scaladsl.server.RouteResult

/**
* @groupname fileupload File upload directives
Expand All @@ -23,7 +24,6 @@ import akka.http.javadsl
trait FileUploadDirectives {

import BasicDirectives._
import RouteDirectives._
import FutureDirectives._
import MarshallingDirectives._

Expand Down Expand Up @@ -118,22 +118,28 @@ trait FileUploadDirectives {
*/
def fileUpload(fieldName: String): Directive1[(FileInfo, Source[ByteString, Any])] =
entity(as[Multipart.FormData]).flatMap { formData ⇒
extractRequestContext.flatMap { ctx ⇒
implicit val mat = ctx.materializer

val onePartSource: Source[(FileInfo, Source[ByteString, Any]), Any] = formData.parts
.filter(part ⇒ part.filename.isDefined && part.name == fieldName)
.map(part ⇒ (FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))
.take(1)

val onePartF = onePartSource.runWith(Sink.headOption[(FileInfo, Source[ByteString, Any])])

onSuccess(onePartF)
Directive[Tuple1[(FileInfo, Source[ByteString, Any])]] { inner ⇒ ctx ⇒
import ctx.materializer
import ctx.executionContext

// Streamed multipart data must be processed in a certain way, that is, before you can expect the next part you
// must have fully read the entity of the current part.
// That means, we cannot just do `formData.parts.runWith(Sink.seq)` and then look for the part we are interested in
// but instead, we must actively process all the parts, regardless of whether we are interested in the data or not.
// Fortunately, continuation passing style of routing allows adding pre- and post-processing quite naturally.
formData.parts
.runFoldAsync(Option.empty[RouteResult]) {
case (None, part) if part.filename.isDefined && part.name == fieldName ⇒

val data = (FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes)
inner(Tuple1(data))(ctx).map(Some(_))

case (res, part) ⇒
part.entity.discardBytes()
Future.successful(res)
}
.map(_.getOrElse(RouteResult.Rejected(MissingFormFieldRejection(fieldName) :: Nil)))
}

}.flatMap {
case Some(tuple) ⇒ provide(tuple)
case None ⇒ reject(MissingFormFieldRejection(fieldName))
}

/**
Expand Down