Add multiple file upload directive #1033
Conversation
Hi @adlawson, Thank you for your contribution! We really value the time you've taken to put this together. Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement.
Can one of the repo owners verify this patch?
```scala
 * for streaming the file contents somewhere. If there is no such field the request will be rejected.
 */
def fileMultiUpload(fieldName: String, inner: JFunction[JList[JMap.Entry[FileInfo, Source[ByteString, Any]]], Route]): Route = RouteAdapter {
  def toJava[F <: FileInfo](info: F): FileInfo = info
```
I'm not super happy about this bit, but without it I'd need to `asInstanceOf` to make it work with `Map.Entry`.
OK TO TEST
Test FAILed.
Test PASSed.
```scala
  .filter(part ⇒ part.filename.isDefined && part.name == fieldName)
  .map(part ⇒ (FileInfo(part.name, part.filename.get, part.entity.contentType), part.entity.dataBytes))

val partF = partSource.runWith(Sink.seq[(FileInfo, Source[ByteString, Any])])
```
Unfortunately, this approach will not work if the file uploads are big enough. You try to collect the ongoing sources for the parts into a seq. However, without reading the first source you will not get the second source, so the seq will never complete and the stream will deadlock. This is an unfortunate consequence of the sequential multipart format. It will work with smaller payloads that fit into the buffers, but if the whole request doesn't fit into the buffers it will deadlock.
I think we need to do the same that other web servers / frameworks are doing and buffer the incoming files on the disk so that we can drain the whole request body before giving out any data.
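The buffer-to-disk idea can be sketched without any akka-stream types. This is a hypothetical, simplified model — the `Part` case class and `bufferToDisk` helper below are illustrative stand-ins, not akka-http API — but it shows the key property: every part is fully drained to disk, in order, before anything is handed to user code.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical simplified part model; in akka-http the body would be a
// Source[ByteString, Any] that must be consumed strictly in multipart order.
final case class Part(fieldName: String, fileName: String, body: Array[Byte])

// Drain every matching part to a temp file before returning, so the
// sequential multipart stream is fully consumed and cannot deadlock.
def bufferToDisk(parts: Seq[Part], fieldName: String): Seq[(String, File)] =
  parts.filter(_.fieldName == fieldName).map { part =>
    val dest = File.createTempFile("akka-http-upload", ".tmp")
    dest.deleteOnExit() // cleaned up on exit, as other frameworks do
    Files.write(dest.toPath, part.body)
    (part.fileName, dest)
  }
```

Because each part is written out completely before the next one is touched, the request body is always drained to the end, which is exactly what avoids the deadlock described above.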
Yeah that's a real shame. I've been trying to get this to work with larger files today and hitting timeouts, but couldn't trace the root of the problem. Super obvious when you think about it.
If you'd like I can continue working on it and submit updated changes, since it's something I need anyway?
Yes, of course. We'll be glad if you keep working on it. I think buffering to disk isn't completely trivial to do but would definitely be a worthwhile addition.
Needs some solution to the deadlocking problem, commented above. (Just adding the "Changes Requested" marker for administrative reasons)
Test PASSed.
I've updated the implementation to buffer to disk and squashed the commits into one. The changes now include 2 directives, where the first depends on the second:

```scala
def fileMultiUpload(fieldName: String): Directive1[immutable.Seq[(FileInfo, Source[ByteString, Any])]]
def uploadedFiles(fieldName: String): Directive1[immutable.Seq[(FileInfo, File)]]
```

I had started to go down the route of a custom GraphStage to buffer to disk, just like Alpakka's `akka.stream.alpakka.s3.impl.DiskBuffer`, but it proved harder to implement and more complex than necessary. I also couldn't figure out a way to guarantee reading from a written-to file. Maybe chunking writes would have helped, but I still don't see any guarantees. Another solution I tried was

The final solution looks too simple to me, but I've tested it with multiple large files and they stream over perfectly fine. I suppose the only downside is that we effectively have to wait for all ByteString sinks to complete before we can begin sourcing again from disk.
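The "source again from disk" half of that design can be sketched in plain Scala. Here `java.nio` stands in for akka's `FileIO.fromPath`, and the `FileInfo` case class is a simplified, hypothetical stand-in for akka-http's type:

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical stand-in for akka-http's FileInfo.
final case class FileInfo(fieldName: String, fileName: String)

// After buffering, each file can be read back independently. In the real
// directive each Array[Byte] would instead be a lazy Source[ByteString, Any]
// built with FileIO.fromPath(file.toPath).
def reSource(files: Seq[(FileInfo, File)]): Seq[(FileInfo, Array[Byte])] =
  files.map { case (info, file) => (info, Files.readAllBytes(file.toPath)) }
```

Since the temp files are independent of the original multipart stream, they can be consumed in any order without the sequencing constraint that caused the deadlock.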
This is looking quite good (even if it seems too simple, streams should make things simple ;) ).
Apart from the few comments, it also needs documentation before it can be merged.
```scala
 *
 * @group fileupload
 */
def uploadedFiles(fieldName: String): Directive1[immutable.Seq[(FileInfo, File)]] =
```
This needs a name which better describes what it does (maybe `storeUploadedFilesToTempFiles`? better ideas?). It should have a parameter to specify the directory to put the temporary files into.
```scala
  .filter(part ⇒ part.filename.isDefined && part.name == fieldName)
  .mapAsync(1) { part ⇒
    val fileInfo = FileInfo(part.name, part.filename.get, part.entity.contentType)
    val dest = File.createTempFile("akka-http-upload", ".tmp")
```
Maybe this part should be completely abstracted away, so that the user can choose the destination freely based on the file info.
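Abstracting the destination away amounts to taking a `FileInfo => File` function as a parameter, which is what the `storeUploadedFile(fieldName, destFn)` signature later in this PR does. A minimal sketch — the `FileInfo` case class here is a simplified stand-in, and `intoDir` is an illustrative example strategy, not akka-http API:

```scala
import java.io.File

// Hypothetical stand-in for akka-http's FileInfo.
final case class FileInfo(fieldName: String, fileName: String)

// Default strategy: behave like the hard-coded createTempFile call above.
val tempFileDest: FileInfo => File =
  _ => File.createTempFile("akka-http-upload", ".tmp")

// Example alternative: keep the client's file name inside a fixed directory,
// chosen freely by the caller based on the file info.
def intoDir(dir: File): FileInfo => File =
  info => new File(dir, info.fileName)
```

The directive then calls `destFn(info)` instead of creating the temp file itself, so callers decide where each upload lands.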
```scala
implicit val ec = ctx.executionContext

uploadedFiles(fieldName).flatMap { files ⇒
  val partsSource: Source[(FileInfo, Source[ByteString, Any]), Any] = Source(files).map {
```
This can be even simpler: no need for `Source(files)` when source and target data structures are a `Seq`. You can just map the `files` data structure.
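Since `files` is already a strict, materialized `Seq`, lifting it into a `Source` just to map over it is unnecessary; a plain collection `map` yields the same pairs. A toy illustration with ordinary collections (stand-in types, not the akka ones):

```scala
import java.io.File

// Hypothetical stand-in for akka-http's FileInfo.
final case class FileInfo(fieldName: String, fileName: String)

// `files` is a strict Seq that is already in memory, so mapping it directly
// is enough; no streaming machinery is needed for a pure transformation.
def toNames(files: Seq[(FileInfo, File)]): Seq[(FileInfo, String)] =
  files.map { case (info, file) => (info, file.getName) }
```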
```scala
 *
 * @group fileupload
 */
def fileMultiUpload(fieldName: String): Directive1[immutable.Seq[(FileInfo, Source[ByteString, Any])]] =
```
Should be marked as `@ApiMayChange`.
```scala
 *
 * @group fileupload
 */
def uploadedFiles(fieldName: String): Directive1[immutable.Seq[(FileInfo, File)]] =
```
Should be marked as `@ApiMayChange`.
(And sorry for keeping it around for so long. It's not because the PR wasn't good, we were just busy.)
Thanks for the feedback. I stuck with similar names to the single file uploads, just for consistency, but I'm happy to change them to be more descriptive. I'll also get onto your other points, like separating temp file construction. Don't worry about the delay - I've been buried under a mountain of work too. I'll try to get this updated within the next week.
Hey @jrudolph, I've made some updates since your last review
I've also taken a couple of further steps
Let me know what you think, and whether you agree with
Test PASSed.
- Add fileUploadAll directive, buffering multiple files on disk
- Add storeUploadedFiles directive to stream multiple files to disk
- Add mima-filter exclusions for new directives
- Add destFn argument to directives to support custom destinations
- Replace uploadedFile directive with storeUploadedFile
- Document fileUploadAll directive
- Document storeUploadedFile directive
- Document storeUploadedFiles directive
Test FAILed.
Test PASSed.
I've added the change. In addition, I've squashed the commits into more reasonable, safe chunks of work.
`storeUploadedFile` has a more descriptive name and also adds the possibility to select the target file.
`Directive1[T] = Directive[Tuple1[T]]`, so don't nest another `Tuple2` inside. This also allows getting rid of the `case` syntax when using the directive. Slightly improved the implementation as well.
Before, only Strict BodyParts/HttpEntity were tested, which are not susceptible to deadlocks. The new version should test the more common streaming uploads more comprehensively.
@adlawson looking very good now. I added a few commits with smallish cleanups and the deprecation of
Test PASSed.
Changes look good, @jrudolph. Thanks for the updates.
Great, will give it a final read then :) |
LGTM, great effort, thanks a lot 👍
```scala
 * @group fileupload
 */
@ApiMayChange
def storeUploadedFile(fieldName: String, destFn: FileInfo ⇒ File): Directive[(FileInfo, File)] =
```
nice name btw 👍
Docs added and Johannes's PR also merged, everything looks good.
Resolved #1273. Thanks!
```scala
/**
 * Streams the bytes of the file submitted using multipart with the given field name into designated files on disk.
 * If there is an error writing to disk the request will be failed with the thrown exception, if there is no such
 * field the request will be rejected. Stored files are cleaned up on exit but not on failure.
```
Does the `storeUploadedFiles` directive reject the request if the field is empty? There seems to be no implementation of that.
Adds support for multiple file uploads with the same field name.

I've followed the same implementation and testing approach as `fileUpload`. I did think about changing the `fileUpload` implementation to take the head of `fileUploadAll`, but it would remove any benefit `Sink.headOption` gives you over `Sink.seq`.

Tasks

Following review at #1033 (review) there are a few outstanding pieces of work that need to be done before merge:

- Change `Source(files).map` to just `files.map`
- Deprecate `uploadedFile` directive(?) - todo in a following PR