Skip to content

Commit

Permalink
Source location in a few more stages (#30039)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Feb 22, 2021
1 parent 59364c0 commit 8dfb4a7
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
1 change: 1 addition & 0 deletions akka-stream/src/main/scala/akka/stream/impl/Stages.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ import akka.stream.Attributes._
val queueSink = name("queueSink")
val lazySink = name("lazySink")
val lazyFlow = name("lazyFlow")
val futureFlow = name("futureFlow")
val lazySource = name("lazySource")
val outputStreamSink = name("outputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink") and IODispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package akka.stream.impl.fusing
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal

import akka.annotation.InternalApi
import akka.stream.Attributes.SourceLocation
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.scaladsl.{ Flow, Keep, Source }
Expand All @@ -20,11 +20,11 @@ import akka.util.OptionVal

require(n >= 0, s"FlatMapPrefix: n ($n) must be non-negative.")

val in = Inlet[In](s"${this}.in")
val out = Outlet[Out](s"${this}.out")
val in = Inlet[In]("FlatMapPrefix.in")
val out = Outlet[Out]("FlatMapPrefix.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)

override def initialAttributes: Attributes = DefaultAttributes.flatMapPrefix
override def initialAttributes: Attributes = DefaultAttributes.flatMapPrefix and SourceLocation.forLambda(f)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
val propagateToNestedMaterialization =
Expand Down Expand Up @@ -106,7 +106,7 @@ import akka.util.OptionVal
try {
val prefix = accumulated.toVector
accumulated.clear()
subSource = OptionVal.Some(new SubSourceOutlet[In](s"${this}.subSource"))
subSource = OptionVal.Some(new SubSourceOutlet[In]("FlatMapPrefix.subSource"))
val OptionVal.Some(theSubSource) = subSource
theSubSource.setHandler {
new OutHandler {
Expand All @@ -123,7 +123,7 @@ import akka.util.OptionVal
}
}
}
subSink = OptionVal.Some(new SubSinkInlet[Out](s"${this}.subSink"))
subSink = OptionVal.Some(new SubSinkInlet[Out]("FlatMapPrefix.subSink"))
val OptionVal.Some(theSubSink) = subSink
theSubSink.setHandler {
new InHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package akka.stream.impl.fusing

import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.stream.Attributes.SourceLocation
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.{ AbruptStageTerminationException, Attributes, FlowShape, Inlet, NeverMaterializedException, Outlet }
import akka.stream.scaladsl.{ Flow, Keep, Source }
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler }
Expand All @@ -17,8 +19,13 @@ import scala.util.{ Failure, Success, Try }

@InternalApi private[akka] final class FutureFlow[In, Out, M](futureFlow: Future[Flow[In, Out, M]])
extends GraphStageWithMaterializedValue[FlowShape[In, Out], Future[M]] {
val in = Inlet[In](s"${this}.in")
val out = Outlet[Out](s"${this}.out")

val in = Inlet[In]("FutureFlow.in")
val out = Outlet[Out]("FutureFlow.out")

override protected def initialAttributes: Attributes =
DefaultAttributes.futureFlow and SourceLocation.forLambda(futureFlow)

override val shape: FlowShape[In, Out] = FlowShape(in, out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
Expand Down Expand Up @@ -87,8 +94,8 @@ import scala.util.{ Failure, Success, Try }
}

def connect(flow: Flow[In, Out, M]): Unit = {
val subSource = new SubSourceOutlet[In](s"${FutureFlow.this}.subIn")
val subSink = new SubSinkInlet[Out](s"${FutureFlow.this}.subOut")
val subSource = new SubSourceOutlet[In]("FutureFlow.subIn")
val subSink = new SubSinkInlet[Out]("FutureFlow.subOut")

subSource.setHandler {
new OutHandler {
Expand Down

0 comments on commit 8dfb4a7

Please sign in to comment.