formFields and Error in stage [unknown-operation]: Promise already completed #73

Open
akka-ci opened this issue Sep 8, 2016 · 27 comments
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted bug

akka-ci commented Sep 8, 2016

Issue by s12v
Wednesday Mar 09, 2016 at 12:22 GMT
Originally opened as akka/akka#19981


I'm getting an intermittent exception when using akka-http with formFields:

[ERROR] [03/09/2016 13:14:34.252] [SNS-akka.actor.default-dispatcher-4] [akka://SNS/user/StreamSupervisor-0/flow-3-0-unknown-operation] Error in stage [unknown-operation]: Promise already completed.
java.lang.IllegalStateException: Promise already completed.
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Promise$class.failure(Promise.scala:104)
    at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:153)
    at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:76)
    at akka.http.impl.util.StreamUtils$$anon$2.onUpstreamFailure(StreamUtils.scala:69)
    at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onUpstreamFailure(Stage.scala:62)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:621)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:535)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:443)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:364)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:502)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:539)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:485)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:493)
    at akka.actor.ActorCell.create(ActorCell.scala:590)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:461)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It happens for roughly one in 10 to 100 requests, seemingly at random; other requests are fine.

This is the code (I think only handler is relevant):

package me.snov.sns

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse}
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.util.Timeout
import me.snov.sns.api._
import me.snov.sns.utils.Config

import scala.concurrent.ExecutionContext

object Main extends App with Config with HealthCheck {
  implicit val system = ActorSystem("SNS")
  implicit val executor: ExecutionContext = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val logger: LoggingAdapter = Logging(system, getClass)
  implicit val timeout = new Timeout(1000, TimeUnit.MILLISECONDS)

  val xml =
    <CreateTopicResponse>
      <CreateTopicResult><TopicArn>foo</TopicArn></CreateTopicResult>
      <ResponseMetadata><RequestId>bar</RequestId></ResponseMetadata>
    </CreateTopicResponse>

  val r = HttpResponse(
    entity = HttpEntity(ContentTypes.`text/xml(UTF-8)`, scala.xml.Utility.trim(xml).toString())
  )

  Http().bindAndHandle(
    handler = logRequestResult("akka-http")(
      pathSingleSlash {
        formFields('Action ! "CreateTopic", 'Name) { (name) =>
          complete(r)
        }
      } 
    ),
    interface = config.getString("http.interface"),
    port = config.getInt("http.port")
  )
}

If I remove formFields (and have only pathSingleSlash {complete(r)}) the issue is gone.

What could be the reason? Am I doing something wrong, or is this a bug?

@akka-ci akka-ci added this to the 2.4.11 milestone Sep 8, 2016
@akka-ci akka-ci added 2 - pick next Used to mark issues which are next up in the queue to be worked on. The tag is non-binding bug t:http labels Sep 8, 2016

akka-ci commented Sep 8, 2016

Comment by ktoso
Wednesday Mar 09, 2016 at 14:24 GMT


Thanks for reporting, looks weird indeed.
Can you confirm this is on Akka (HTTP) 2.4.2 and what you're using to issue the requests (something weird or simple curl? Pasting it would help too, thanks)?


akka-ci commented Sep 8, 2016

Comment by s12v
Wednesday Mar 09, 2016 at 15:44 GMT


@ktoso, yes, 2.4.2. I indeed use something weird for requests - AWS Ruby SDK (I'm trying to emulate SNS) or curl+Charles proxy. When I use curl directly it always works:

for i in {1..50}; do curl -H "Content-Type: application/x-www-form-urlencoded; charset=utf-8" --data-binary "Action=CreateTopic&Name=cucumber1-851816188252&Version=2010-03-31" http://localhost:9911/; done

When I use the same curl command via Charles (another port), it sometimes doesn't work.
The response is "The form field 'Action' was malformed: Promise already completed."

In Charles it looks like this (same requests):
[Screenshot, 2016-03-09 16:39:14: Charles session showing the same requests]
Error 400 corresponds to the exception.

Also fails in Travis: https://travis-ci.org/s12v/sns/builds/114896929 (log.txt in the end)


akka-ci commented Sep 8, 2016

Comment by benen
Thursday Mar 10, 2016 at 15:31 GMT


I'm in the process of porting a spray application to akka-http (2.4.2) and I'm seeing the same exception intermittently across multiple routes handling POST requests. As with s12v, I don't see it when I'm making requests against the application directly; in my case it occurs when the application is placed behind an AWS Elastic Load Balancer.

The stack trace is the same, though I'm also seeing it when using the entity directive to unmarshal JSON as well as when using the formFields and formFieldsMap directives. In my attempts to debug it I've seen it occur with the content types multipart/form-data, application/x-www-form-urlencoded, and application/json. When using the entity directive the message returned is "The request content was malformed: Promise already completed."

The only other thing I've noticed is that the incidence rate increases with the number of requests being concurrently handled by the application, peaking at about one every three requests when I'm flooding the application with test requests.

edit: It seems like the defaultEntity is getting set by the parser in the error cases. Is content length being mangled by AWS?


akka-ci commented Sep 8, 2016

Comment by johanandren
Monday Mar 14, 2016 at 14:24 GMT


The entity source has actually been completed successfully but is then materialized again for some reason. The actual exception, which occurs when the promise is already completed, is:
java.lang.IllegalStateException: Substream Source cannot be materialized more than once
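
A minimal plain-Scala sketch of that sequence, assuming the entity's completion promise is first fulfilled and a later failure then tries to fail it again. The object and method names are made up for illustration; this is not Akka's internal code, only the standard-library behaviour that produces the message in the stack trace:

```scala
import scala.concurrent.Promise
import scala.util.Try

object PromiseTwice {
  // Completes the same promise twice and returns the outcome of the second attempt.
  def secondCompletion(): Try[Unit] = {
    val p = Promise[String]()
    p.success("entity fully read") // first completion wins

    // A second completion attempt (here a failure, mirroring the
    // onUpstreamFailure frame in the stack trace) throws IllegalStateException.
    Try { p.failure(new RuntimeException("materialized twice")); () }
  }

  def main(args: Array[String]): Unit =
    println(secondCompletion())
}
```

If this model is right, the "Promise already completed" message only masks the underlying "cannot be materialized more than once" failure.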


akka-ci commented Sep 8, 2016

Comment by johanandren
Tuesday Mar 15, 2016 at 09:54 GMT


Already reported before as #19506


akka-ci commented Sep 8, 2016

Comment by johanandren
Tuesday Mar 15, 2016 at 09:55 GMT


And #18591


akka-ci commented Sep 8, 2016

Comment by johanandren
Tuesday Mar 15, 2016 at 09:57 GMT


What happens, as far as I have understood it, is that the form fields do not share a single unmarshalling of the entity into a StrictForm; instead the entity is unmarshalled once for every field. That works (albeit a bit wastefully) for strict entities, but as soon as the entity is actually a stream (chunked?), it fails on the second field.
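
The difference can be modelled in plain Scala. This is only an analogy under stated assumptions: `Entity`, `strict`, `streamed` and `field` are hypothetical stand-ins, an `Iterator` plays the role of the one-shot byte stream, and the second read silently comes back empty here, whereas Akka fails with an exception:

```scala
object PerFieldRead {
  // Stand-in for an entity: dataBytes() hands out the underlying data.
  final case class Entity(dataBytes: () => Iterator[String])

  // Strict: the data lives in memory, so every call gets a fresh iterator.
  def strict(pairs: Vector[String]): Entity = Entity(() => pairs.iterator)

  // Streamed: there is only one underlying iterator, and it can be drained once.
  def streamed(pairs: Vector[String]): Entity = {
    val once = pairs.iterator
    Entity(() => once)
  }

  // Mimics unmarshalling the whole entity again for each requested field.
  def field(e: Entity, name: String): Option[String] =
    e.dataBytes().toVector // drains the whole entity
      .map(_.split("=", 2))
      .collectFirst { case Array(`name`, value) => value }
}
```

With a strict entity both `field(e, "Action")` and `field(e, "Name")` succeed; with the streamed one only the first lookup finds anything.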


akka-ci commented Sep 8, 2016

Comment by mpilquist
Tuesday Mar 15, 2016 at 15:53 GMT


Also running into this issue, but on the client side when there are multiple reads of the response entity.

One simple way to reproduce the Substream source cannot be ... error is using a server side route like this:

val handler = extractRequest { req => complete { HttpResponse(entity = req.entity) } }

It isn't clear to me whether we are allowed to re-use entity.dataBytes like this. If not, a comment on HttpEntity#dataBytes would be useful.


akka-ci commented Sep 8, 2016

Comment by ktoso
Tuesday Mar 15, 2016 at 15:56 GMT


AFAICS the route you posted should be totally fine @mpilquist – you only consume the databytes once, for writing the response back as I see this.
How does your client side look though? It is true (and correct) that the entity may only be consumed once (unless it is toStrict() first, but that's potentially unbounded in memory)


akka-ci commented Sep 8, 2016

Comment by mpilquist
Tuesday Mar 15, 2016 at 16:00 GMT


@ktoso In the reproduction case I created using the echo route on the server, my client side was:

val chunks = Source(List.fill(10000)(ByteString("Hello, world! ")))
val request = HttpRequest(uri = "...", entity = HttpEntity.Chunked.fromData(ContentTypes.`text/plain(UTF-8)`, chunks))
Http().singleRequest(request)

In a production app where I've experienced this issue, it occurs when a chunked response entity is both unmarshalled and logged using a custom response logger.


akka-ci commented Sep 8, 2016

Comment by jrudolph
Tuesday Mar 15, 2016 at 16:05 GMT


> It isn't clear to me whether we are allowed to re-use entity.dataBytes like this. If not, a comment on HttpEntity#dataBytes would be useful.

No, it's not reusable. Would make sense to document that more explicitly.


akka-ci commented Sep 8, 2016

Comment by jrudolph
Tuesday Mar 15, 2016 at 17:20 GMT


IIRC there may have been a conscious decision to support formFields properly only for strict entities (maybe because it's quite hard to fix it properly, see below). On the other hand, the current solution seems to be quite undecided in what it wants to support (full support for Strict entities, support for a single formField directive with a single field for streamed ones, no clues whether streamed entities should be supported and also no tests for the streamed case).

Forbid formField for non-strict entities?

So, general answer could be not allowing formFields for non-strict entities, i.e. forcing the user to surround formFields usages with a toStrict directive. We would have to document that

  • a) the entity has to be strict for use with formFields
  • b) strictification has to be done on a level in the routing tree that spans all possible alternative formField occurrences, so that toStrict is only run once regardless which path the request takes through the routing tree

In practice, this restriction might be hard to fulfill if you want some branches to deal with the entity in a streaming fashion. Also, it contradicts the philosophy of the routing DSL of being flexible and composable without giving you too many constraints.

It would be better if the current behavior could be fixed.

Approaches to fixing the problem

For the streamed case there probably exist several problems (I inferred that from the reported issues and the code, not from actual testing):

  • A) a formFields directive with multiple fields tries to access the request entity more than once
  • B) nested formFields directives try to access the request entity more than once
  • C) sibling formFields directives try to access the request entity more than once

I think the cause of issue A) is that formFields applies the StrictForm unmarshaller separately for each field anew. The StrictForm unmarshaller uses the streaming multipart/form-data unmarshaller, so it actually can read streamed entities, but then immediately slurps the whole stream to make it strict. The bug is that multiple fields will reapply the StrictForm unmarshaller, which accesses the original stream several times. This issue can probably be fixed quite simply by first applying the StrictForm unmarshaller and then passing the resulting StrictForm to the FieldDefs instead of redoing all the work for every FieldDef.
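
The "parse once, then extract every field from the result" idea for A) could look roughly like this in plain Scala. This is a sketch for URL-encoded forms only: `parseForm` and `fields` are hypothetical helpers standing in for the StrictForm unmarshaller and the FieldDefs, not akka-http API:

```scala
import java.net.URLDecoder

object ParseOnce {
  // Decodes an application/x-www-form-urlencoded body into a field map, once.
  def parseForm(body: String): Map[String, String] =
    body.split("&").iterator
      .filter(_.nonEmpty)
      .map { pair =>
        val kv = pair.split("=", 2)
        val key = URLDecoder.decode(kv(0), "UTF-8")
        val value = if (kv.length > 1) URLDecoder.decode(kv(1), "UTF-8") else ""
        key -> value
      }
      .toMap

  // Field extraction works on the parsed form and never touches the raw entity.
  // Returns None if any requested field is missing.
  def fields(form: Map[String, String], names: String*): Option[Seq[String]] = {
    val found = names.map(form.get)
    if (found.forall(_.isDefined)) Some(found.flatten) else None
  }
}
```

Because the body is read exactly once in `parseForm`, asking for two or twenty fields makes no difference to how often the underlying data is accessed.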

Case B) is already harder as it requires that the formField directive passes down the strict entity data (or StrictForm) to the inner routes. The simplest solution would be running entity.toStrict once, using the result in this directive, and also updating the incoming request with the new strict and reusable entity data before passing it to the inner route. Once this is fixed, the solution for A) above isn't strictly needed any more (but may still be a worthwhile optimization).

Case C) is much harder to fix. In that case the strict entity produced in one branch of the routing tree would have to be transported to a sibling alternative branch if the first branch rejects the request after having drained the request entity. But, alas, this is not possible because sibling branches in the routing tree cannot exchange information (except through rejections).

This case identifies a problem with the routing DSL in Akka HTTP where the request is not immutable any more (compared to spray). The routing DSL assumes that the request object is immutable and fully reusable in the route alternative operator ~ which passes the unchanged RequestContext to the second branch if the first branch was rejected.
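
A toy model of case C), assuming a route is just a function and a rejection is `None`. Everything here (`Request`, `Route`, `alt`, `action`) is hypothetical and far simpler than the real routing DSL; it only shows why handing the same drained request to a sibling branch cannot work:

```scala
object SiblingBranches {
  // Heavily simplified stand-ins for a request and a route.
  final case class Request(entity: Iterator[String])
  type Route = Request => Option[String] // None models a rejection

  // Route alternative: try `first`, and on rejection pass the SAME request to `second`.
  def alt(first: Route, second: Route): Route =
    req => first(req).orElse(second(req))

  // Completes only if the form contains the required Action value.
  def action(required: String): Route = req => {
    val form = req.entity.toVector // drains the one-shot entity
    if (form.contains(s"Action=$required")) Some(s"handled $required") else None
  }
}
```

With `alt(action("CreateTopic"), action("DeleteTopic"))`, a request carrying `Action=DeleteTopic` is rejected by both branches: the first branch drains the entity before rejecting, so the second branch sees no data.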

Possible solutions could be:

    1. Try to make use of rejections to transport the already strictified entity between branches, toStrict would capture rejections in the inner route and wrap them with a new rejection that would also contain the strictified entity. The route alternative implementation checks for the wrapped rejection and adapts the request for the alternative branch to contain the already strict entity instead of the drained streamed one. (May work but may have some impact on custom rejection handlers and other code dealing with rejections.)
    2. Solve it through more mutability in the entity itself. The entity source could therefore be wrapped with another source that allows either one-time access or reusability through internal buffering on the first access. (Hard to say if this is feasible. The wrapped source would need to expose some API to specify whether an access should be one-time or buffering.)
    3. Prevent rejections coming out of formField directives from triggering route alternatives (how?)

These solutions share an ugly secret: routing a request through one branch of the routing tree may have the side-effect of slurping in the whole request entity, at least if the content-type is multipart/form-data. This means you need to be aware of the issue, e.g. if you want to read a big file upload in a streaming fashion. In that case you will have to structure your route tree in a way that the request is routed away from any formFields directives.
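
To make the "one-time access or buffering on first access" wrapper from the second option more concrete, here is a rough plain-Scala model. It is not thread-safe, uses an `Iterator` in place of a stream source, and the `once`/`replayable` API is purely an assumption about what such a wrapper might expose:

```scala
object ReplayableSource {
  // Wraps a one-shot iterator. The caller states whether an access is
  // one-time (streaming) or buffering (replayable).
  final class Entity[A](underlying: Iterator[A]) {
    private var buffered: Option[Vector[A]] = None
    private var consumed = false

    // Streaming access: hands out the raw data once, without buffering.
    // If the data was buffered earlier, the buffer is replayed instead.
    def once(): Iterator[A] = buffered match {
      case Some(data) => data.iterator
      case None =>
        if (consumed) throw new IllegalStateException("entity already consumed")
        consumed = true
        underlying
    }

    // Buffering access: drains the data on first use and replays it afterwards.
    def replayable(): Iterator[A] = {
      if (buffered.isEmpty) {
        if (consumed) throw new IllegalStateException("entity already consumed")
        consumed = true
        buffered = Some(underlying.toVector)
      }
      buffered.get.iterator
    }
  }
}
```

In this model a formFields-like consumer would call `replayable()`, so later branches can read the data again, while a streaming upload handler would call `once()` and accept that nothing else can read the entity afterwards.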


akka-ci commented Sep 8, 2016

Comment by ktoso
Tuesday Mar 15, 2016 at 22:07 GMT


Thanks for the in-depth analysis @jrudolph!

Getting more mutability into the entity, while ugly, may indeed be something we'll want to do for a number of reasons, one of them being auto-draining (akka/akka#18716). So it's perhaps good to consider whether a special source in there would help us address both issues...

No single one of the solutions seems like "the right way to go", so we'll need to figure out the trade-offs. I'll try to sleep on it and revisit it soon.

Related: akka/akka#19042 (comment) was also about formFields. There we attempted to implement formFieldMap, but it too bumped into this problem of possibly needing to toStrict.


akka-ci commented Sep 8, 2016

Comment by compositor
Saturday Mar 19, 2016 at 00:27 GMT


For AWS ELB, one can work around this by using a TCP ELB instead of an HTTP ELB:

"Listeners": [ {
          "LoadBalancerPort" : "443",
-           "Protocol" : "HTTPS",
+           "Protocol" : "SSL",
           "SSLCertificateId" : .....,
           "InstancePort" : "80",
-           "InstanceProtocol" : "HTTP"
+           "InstanceProtocol" : "TCP"
        }]

I tested it and see no issues anymore


akka-ci commented Sep 8, 2016

Comment by compositor
Tuesday Mar 22, 2016 at 01:10 GMT


Unfortunately, switching to a TCP ELB does not help after all. Certain big POST requests still fail, 100% reproducibly. If I go via nginx (rather than ELB), requests work fine.


akka-ci commented Sep 8, 2016

Comment by compositor
Monday Mar 28, 2016 at 21:25 GMT


Workaround: put the service behind nginx, which will respond with HTTP 100 on behalf of the service.


akka-ci commented Sep 8, 2016

Comment by seanmcl
Tuesday Apr 19, 2016 at 21:17 GMT


FWIW, I'm having the same issue as @compositor. Behind ELB I get the error regularly, but not when hitting the service directly.


akka-ci commented Sep 8, 2016

Comment by s12v
Monday Apr 25, 2016 at 21:20 GMT


> So, general answer could be not allowing formFields for non-strict entities, i.e. forcing the user to surround formFields usages with a toStrict directive. We would have to document that

@jrudolph, could you please give an example of how to do it?

I've also tried entity(as[FormData]) { entity => as suggested in another issue, but it didn't help.


akka-ci commented Sep 8, 2016

Comment by mlangc
Thursday Apr 28, 2016 at 07:54 GMT


Hmm, I tried

import scala.concurrent.duration._ // for 1.second

// Replaces the request with a strict (fully buffered) copy before running the inner route.
val toStrict = mapInnerRoute { innerRoute =>
  extractRequest { req =>
    onSuccess(req.toStrict(1.second)) { strictReq =>
      mapRequest(_ => strictReq) {
        innerRoute
      }
    }
  }
}

// Workaround for https://github.com/akka/akka/issues/19981
val safePost = mapInnerRoute { innerRoute =>
  post {
    toStrict {
      innerRoute
    }
  }
}

but that didn't help me either....


akka-ci commented Sep 8, 2016

Comment by mlangc
Thursday Apr 28, 2016 at 08:24 GMT


OK, my error, I'm now applying the toStrict operator from above to the outermost route, and the error is gone.


akka-ci commented Sep 8, 2016

Comment by s12v
Wednesday May 04, 2016 at 18:21 GMT


@mlangc, thank you, with toStrict the error is gone


akka-ci commented Sep 8, 2016

Comment by saifellafi
Thursday Jun 09, 2016 at 22:10 GMT


Hello.

I have also been affected by this issue. I could reproduce it 100% of the time by using the R package "httr" to submit a POST request with two form fields. Using the toStrict solution solves the issue.

Are there any insights on a permanent solution? Can this be solved? What does the current workaround imply?

I am using Akka 2.4.6 and Scala 2.11
Thanks,
Saif


akka-ci commented Sep 8, 2016

Comment by johanandren
Friday Jun 10, 2016 at 08:34 GMT


@saifellafi Reproducing this bug is not a problem; fixing it, however, is far from trivial.
Please see Johannes' reply from March 15, which describes the options for solving the issue.

Using toStrict as a workaround forces the entire request entity into memory rather than handling it as a stream. This protects against trying to consume it multiple times.

@jrudolph jrudolph added the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Sep 8, 2016
@ktoso ktoso removed the t:http label Sep 12, 2016
@ktoso ktoso removed this from the 2.4.11 milestone Sep 12, 2016

ghost commented Jun 26, 2017

Hello, any follow up on this one?

@jrudolph
Member

My comment above is still accurate. Some time ago I started working on the second solution above (more mutability in the entity itself), which isn't trivial to get right. I hope to get to it at some point. When that is done, we should also simplify the current formField infrastructure, which is much more sophisticated than it needs to be (especially given that it basically doesn't work right now for streamed entities).

@jrudolph
Member

Thanks to #2283, at least the non-nested variant with multiple fields (formField('a, 'b), case A above) will work in the next version.

@raboof raboof removed 2 - pick next Used to mark issues which are next up in the queue to be worked on. The tag is non-binding labels Jun 27, 2019
@raboof
Member

raboof commented Jul 2, 2019

With #2524 I think both scenario 'A' and 'B' should now be covered.
