New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fusing should be optimized #20218

Closed
akarnokd opened this Issue Apr 4, 2016 · 2 comments

Comments

Projects
None yet
3 participants
@akarnokd

akarnokd commented Apr 4, 2016

This benchmark running on my machine (i7 4790, Windows 7 x64, Java 8u77) runs relatively slow compared to other Reactive-Streams compliant libraries (Reactor, Rsc, RxJava 2).

Benchmark              (maxConcurrent)  (times)   Mode  Cnt      Score      Error  Units
rangeFlatMapJust_akka                1        1  thrpt    5  12311,357 �  457,117  ops/s
rangeFlatMapJust_akka                1     1000  thrpt    5     68,105 �    1,164  ops/s
rangeFlatMapJust_akka                1  1000000  thrpt    5      0,072 �    0,003  ops/s
rangeFlatMapJust_akka                2        1  thrpt    5  12036,598 �  787,357  ops/s
rangeFlatMapJust_akka                2     1000  thrpt    5     70,084 �    6,139  ops/s
rangeFlatMapJust_akka                2  1000000  thrpt    5      0,072 �    0,007  ops/s
rangeFlatMapJust_akka               16        1  thrpt    5  11913,276 � 1371,289  ops/s
rangeFlatMapJust_akka               16     1000  thrpt    5     69,442 �    1,552  ops/s
rangeFlatMapJust_akka               16  1000000  thrpt    5      0,066 �    0,022  ops/s

JFR

image

image

"sys-akka.actor.default-dispatcher-3" #16 prio=5 os_prio=0 tid=0x000000002015a800 nid=0xd64 runnable [0x000000001f52e000]
   java.lang.Thread.State: RUNNABLE
    at java.lang.Object.hashCode(Native Method)
    at akka.stream.InPort.hashCode(Shape.scala:18)
    at java.util.HashMap.hash(HashMap.java:338)
    at java.util.HashMap.containsKey(HashMap.java:595)
    at akka.stream.impl.fusing.Fusing$BuildStructuralInfo.akka$stream$impl$fusing$Fusing$BuildStructuralInfo$$addMapping(Fusing.scala:512)
    at akka.stream.impl.fusing.Fusing$BuildStructuralInfo$$anonfun$rewire$1.apply(Fusing.scala:685)
    at akka.stream.impl.fusing.Fusing$BuildStructuralInfo$$anonfun$rewire$1.apply(Fusing.scala:684)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at akka.stream.impl.fusing.Fusing$BuildStructuralInfo.rewire(Fusing.scala:684)
    at akka.stream.impl.fusing.Fusing$.akka$stream$impl$fusing$Fusing$$descend(Fusing.scala:357)
    at akka.stream.impl.fusing.Fusing$.akka$stream$impl$fusing$Fusing$$descend(Fusing.scala:368)
    at akka.stream.impl.fusing.Fusing$.doAggressive(Fusing.scala:44)
    at akka.stream.impl.fusing.Fusing$.aggressive(Fusing.scala:34)
    at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:84)
    at akka.stream.impl.SubFusingActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:227)
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:367)
    at akka.stream.scaladsl.Source.runWith(Source.scala:90)
    at akka.stream.impl.fusing.FlattenMerge$$anon$4.akka$stream$impl$fusing$FlattenMerge$$anon$$addSource(StreamOfStreams.scala:84)
    at akka.stream.impl.fusing.FlattenMerge$$anon$4$$anon$6.onPush(StreamOfStreams.scala:51)
    at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:587)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:598)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:539)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:469)
    at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:421)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:593)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:584)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:605)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:482)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:519)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

@drewhk drewhk added this to the 2.4.x milestone Apr 4, 2016

@drewhk drewhk changed the title from Source.flatMapMerge of an 1M source into Singles runs slow to Fusing should be optimized Apr 4, 2016

@drewhk

This comment has been minimized.

Show comment
Hide comment
@drewhk

drewhk Apr 4, 2016

Member

Yep, fusing as expected. I changed the name of the ticket. Thanks for reporting!

Member

drewhk commented Apr 4, 2016

Yep, fusing as expected. I changed the name of the ticket. Thanks for reporting!

@johanandren

This comment has been minimized.

Show comment
Hide comment
@johanandren

johanandren Sep 6, 2017

Member

Closing this as we rewrote the materialiser since.

Member

johanandren commented Sep 6, 2017

Closing this as we rewrote the materialiser since.

@johanandren johanandren closed this Sep 6, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment