Cannot push port (Balance.out8) twice #20943
Thanks for reporting; it could be a bug in Balance itself.
Not really, the balancer works fine in some other scenarios. I am using:

```scala
import java.nio.file.Paths
import akka.stream.{ActorAttributes, ClosedShape}
import akka.stream.scaladsl._
import akka.util.ByteString

// `system`, `numWorkers`, `balancer`, `Producer`, `Publisher` and `JobResult` are defined elsewhere
val path = Paths.get("/tmp/foo.txt")
val sink = Sink.headOption[Boolean]
val g = RunnableGraph fromGraph GraphDSL.create(sink) { implicit b => sink =>
  import GraphDSL.Implicits._
  val source = FileIO.fromPath(path)
    .via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 8192, allowTruncation = true))
    .map(_.utf8String)
  // Generates a 'JobResult'
  val compute = Flow.fromFunction(Producer.produce).withAttributes(ActorAttributes.dispatcher("compute-dispatcher"))
  // Using same example balancer from cookbook
  val balanced = b add balancer(compute, numWorkers).async
  // Publishes boolean values
  val publish = b add Flow[JobResult].mapAsyncUnordered(1024) { Publisher.publish(_, system) }
    .dropWhile(_.result.result == true)
    .map(_.result.result)
  source ~> balanced ~> publish ~> sink
  ClosedShape
}
```

So basically I want to check whether some line in the file does not match a given condition. Since I expect this not to happen very often, I am using …
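The check in this reproducer relies on `dropWhile` followed by `Sink.headOption`: leading `true` results are discarded, the first `false` reaches the sink (which then stops consuming), and a file in which every line matches yields `None`. The same semantics can be sketched on plain Scala iterators, without Akka. This is only an illustration; `FirstMismatch` and the `matches` predicate are hypothetical stand-ins for the reporter's `Producer`/`Publisher` logic, which is not shown.

```scala
// Plain-iterator model of the reproducer's check (no Akka involved).
// `FirstMismatch` and `matches` are hypothetical names used only for illustration.
object FirstMismatch {
  /** Some(false) as soon as one line fails `matches`; None if every line passes.
    * Lines after the first mismatch are never consumed. */
  def check(lines: Iterator[String], matches: String => Boolean): Option[Boolean] = {
    val remaining = lines
      .map(matches)          // line => Boolean, like the compute/publish stages
      .dropWhile(_ == true)  // skip every line that satisfies the condition
    // Like Sink.headOption: take the first remaining element, if there is one
    if (remaining.hasNext) Some(remaining.next()) else None
  }
}
```

Unlike the stream, this version is sequential; with `Balance`, `Merge` and `mapAsyncUnordered` the results can arrive out of order, which does not matter here because any `false` ends the check.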
This is almost surely a bug. Thanks for the reproducer; I will try to reproduce the issue. We have a handy compile-time flag that lets me see the actual events as they happen, so maybe that will shed some light on what is going on.
@jarandaf there are some details missing in the reproducer: how are you creating your Balance instance (how many outputs, and
Hi @johanandren, this is the balancer I used:

```scala
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge}

// Deals with balancing workload over the given number of workers.
private def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
  import GraphDSL.Implicits._
  Flow fromGraph GraphDSL.create() { implicit b =>
    val balancer = b add Balance[In](workerCount, waitForAllDownstreams = false)
    val merge = b add Merge[Out](workerCount)
    // One asynchronous copy of the worker per Balance output
    1 to workerCount foreach { _ => balancer ~> worker.async ~> merge }
    FlowShape(balancer.in, merge.out)
  }
}
```

As for the number of outputs, I used the number of cores available on my machine. Hope it helps!
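Sizing the balancer by core count, as described here, can be done with the JDK's `Runtime.availableProcessors()`. A minimal sketch, assuming the value is what gets passed as `numWorkers` in `balancer(compute, numWorkers)`; the `Workers` object is a hypothetical name.

```scala
// Hypothetical helper: one Balance output (and one worker copy) per available core.
object Workers {
  val numWorkers: Int = Runtime.getRuntime.availableProcessors()
}
```

Balance names its outputs `Balance.out0` upwards, so the port in the issue title, `Balance.out8`, implies the failing run had at least nine outputs.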
Ah, you took it, @johanandren. Then I'll look for something else.
You know me, @drewhk, I just love these pull-push-twice tickets. ;)
It builds character, so they say.
Figured it out:
If we filter out the closed outputs when the upstream element arrives, there may be no output left to receive the pulled element, so we would need to buffer it. But then the remaining outputs may close before ever pulling, and the element is lost. Not sure what to do here.
There's no need for an extra buffer; just don't grab the element from the port.
What do you mean by lost? If all the outputs have been closed, then there is no "loss". I mean, we could also have sent it just before the "cancel" arrived, and then it would be "lost" too.
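The fix discussed here can be illustrated with a toy model. This is not Akka's `Balance` implementation; `BalanceModel`, `pull`, `cancel`, `push` and `waiting` are hypothetical names. Outputs that have pulled wait in a FIFO queue and may cancel while still queued. When an element arrives, closed outputs are skipped, and if no open output remains, the element is simply left on the input port rather than grabbed into an extra buffer.

```scala
import scala.collection.mutable

// Toy model of Balance's dispatch step (NOT Akka's actual code; all names are hypothetical).
final class BalanceModel {
  private val pending = mutable.Queue.empty[Int]  // outputs that pulled and await an element
  private val closed = mutable.Set.empty[Int]     // outputs that have cancelled
  private var onPort: Option[String] = None       // element offered by upstream, not yet grabbed
  val delivered = mutable.ArrayBuffer.empty[(Int, String)]

  def waiting: Option[String] = onPort

  // Downstream `out` signals demand.
  def pull(out: Int): Unit = {
    if (!closed(out) && !pending.contains(out)) pending.enqueue(out)
    dispatch()
  }

  // Downstream `out` cancels; it may still be sitting in `pending`.
  def cancel(out: Int): Unit = closed += out

  // Upstream offers one element at a time, like a single input port.
  def push(elem: String): Unit = {
    require(onPort.isEmpty, "input port already holds an element")
    onPort = Some(elem)
    dispatch()
  }

  private def dispatch(): Unit = onPort.foreach { elem =>
    // Skip outputs that cancelled after pulling.
    while (pending.nonEmpty && closed(pending.head)) pending.dequeue()
    if (pending.nonEmpty) {
      delivered += (pending.dequeue() -> elem) // "grab" only once an open output is found
      onPort = None
    }
    // Otherwise leave the element on the port: no extra buffer is needed.
  }
}
```

Because the element stays on the port, it is delivered as soon as some open output pulls; if every output closes first, there is no downstream left to lose it to.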
Thanks @drewhk, I went down a rabbit hole with my first attempt at solving it. Your comment made it obvious what to do.
I'm getting this error with Akka Streams 2.4.8. I am not sure where it comes from; the stack trace does not tell much:
I am using a balancer to distribute lines processing from a file source.