Skip to content

Commit

Permalink
=str - Adds finals and reorganizes some StreamOfStreams code
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorklang committed Dec 9, 2015
1 parent 654fa41 commit 38fa306
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.ActorSubscriberMessage._
import akka.stream.actor.ActorPublisherMessage
import akka.stream.actor.ActorPublisherMessage._
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.{ util ju }
import scala.concurrent._
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext

final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source[T, M], T]] {
private val in = Inlet[Source[T, M]]("flatten.in")
Expand All @@ -22,16 +20,18 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source

override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {

import StreamOfStreams.{ LocalSink, LocalSource }

var sources = Set.empty[LocalSource[T]]
def activeSources = sources.size

private trait Queue {
private sealed trait Queue {
def hasData: Boolean
def enqueue(src: LocalSource[T]): Unit
def dequeue(): LocalSource[T]
}

private class FixedQueue extends Queue {
private final class FixedQueue extends Queue {
final val Size = 16
final val Mask = 15

Expand Down Expand Up @@ -59,7 +59,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source
}
}

private class DynamicQueue extends ju.LinkedList[LocalSource[T]] with Queue {
private final class DynamicQueue extends ju.LinkedList[LocalSource[T]] with Queue {
def hasData = !isEmpty()
def enqueue(src: LocalSource[T]): Unit = add(src)
def dequeue(): LocalSource[T] = remove()
Expand Down Expand Up @@ -131,83 +131,84 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source
}
}

// TODO possibly place the Local* classes in a companion object depending on where they are reused

/**
* INTERNAL API
*/
private[fusing] final class LocalSinkSubscription[T](sub: ActorPublisherMessage Unit) {
def pull(): Unit = sub(Request(1))
def cancel(): Unit = sub(Cancel)
}

/**
* INTERNAL API
*/
private[fusing] final class LocalSource[T] {
private var subF: Future[LocalSinkSubscription[T]] = _
private var sub: LocalSinkSubscription[T] = _
private[fusing] object StreamOfStreams {
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
private val RequestOne = Request(1) // No need to frivolously allocate these
private type LocalSinkSubscription = ActorPublisherMessage Unit
/**
* INTERNAL API
*/
private[fusing] final class LocalSource[T] {
private var subF: Future[LocalSinkSubscription] = _
private var sub: LocalSinkSubscription = _

var elem: T = null.asInstanceOf[T]

def isActive: Boolean = sub ne null

def deactivate(): Unit = {
sub = null
subF = null
}

var elem: T = null.asInstanceOf[T]
def activate(f: Future[LocalSinkSubscription]): Unit = {
subF = f
/*
* The subscription is communicated to the FlattenMerge stage by way of completing
* the future. Encoding it like this means that the `sub` field will be written
* either by us (if the future has already been completed) or by the LocalSink (when
* it eventually completes the future in its `preStart`). The important part is that
* either way the `sub` field is populated before we get the first `OnNext` message
* and the value is safely published in either case as well (since AsyncCallback is
* based on an Actor message send).
*/
f.foreach(s sub = s)(sameThreadExecutionContext)
}

def isActive: Boolean = sub ne null
def deactivate(): Unit = {
sub = null
subF = null
}
def activate(f: Future[LocalSinkSubscription[T]]): Unit = {
subF = f
/*
* The subscription is communicated to the FlattenMerge stage by way of completing
* the future. Encoding it like this means that the `sub` field will be written
* either by us (if the future has already been completed) or by the LocalSink (when
* it eventually completes the future in its `preStart`). The important part is that
* either way the `sub` field is populated before we get the first `OnNext` message
* and the value is safely published in either case as well (since AsyncCallback is
* based on an Actor message send).
*/
f.foreach(s sub = s)(sameThreadExecutionContext)
}
def pull(): Unit = {
if (sub ne null) sub(RequestOne)
else throw new IllegalStateException("not yet initialized, subscription future has " + subF.value)
}

def pull(): Unit = {
if (sub eq null)
throw new IllegalStateException("not yet initialized, subscription future has " + subF.value)
sub.pull()
def cancel(): Unit =
if (subF ne null)
subF.foreach(_(Cancel))(sameThreadExecutionContext)
}

def cancel(): Unit =
if (subF ne null)
subF.foreach(_.cancel())(sameThreadExecutionContext)
}

/**
* INTERNAL API
*/
private[fusing] final class LocalSink[T](notifier: ActorSubscriberMessage Unit)
extends GraphStageWithMaterializedValue[SinkShape[T], Future[LocalSinkSubscription[T]]] {

private val in = Inlet[T]("LocalSink.in")
override val shape = SinkShape(in)

override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[LocalSinkSubscription[T]]) = {
val sub = Promise[LocalSinkSubscription[T]]
val logic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = notifier(OnNext(grab(in)))
override def onUpstreamFinish(): Unit = notifier(OnComplete)
override def onUpstreamFailure(ex: Throwable): Unit = notifier(OnError(ex))
})

override def preStart(): Unit = {
pull(in)
sub.success(
new LocalSinkSubscription[T](getAsyncCallback[ActorPublisherMessage] {
case Request(1) tryPull(in)
case Cancel completeStage()
case _
}.invoke))
/**
* INTERNAL API
*/
private[fusing] final class LocalSink[T](notifier: ActorSubscriberMessage Unit)
extends GraphStageWithMaterializedValue[SinkShape[T], Future[LocalSinkSubscription]] {

private val in = Inlet[T]("LocalSink.in")
override val shape = SinkShape(in)

override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[LocalSinkSubscription]) = {
val sub = Promise[LocalSinkSubscription]
val logic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = notifier(OnNext(grab(in)))

override def onUpstreamFinish(): Unit = notifier(OnComplete)

override def onUpstreamFailure(ex: Throwable): Unit = notifier(OnError(ex))
})

override def preStart(): Unit = {
pull(in)
sub.success(
getAsyncCallback[ActorPublisherMessage] {
case RequestOne tryPull(in)
case Cancel completeStage()
case _ throw new IllegalStateException("Bug")
}.invoke)
}
}
logic -> sub.future
}
logic -> sub.future
}
}
}
14 changes: 7 additions & 7 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object Merge {
*
* '''Cancels when''' downstream cancels
*/
class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Merge must have more than 1 input port")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Merge.in" + i))
val out: Outlet[T] = Outlet[T]("Merge.out")
Expand Down Expand Up @@ -139,7 +139,7 @@ object MergePreferred {
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")
override val shape: MergePreferred.MergePreferredShape[T] =
new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred")
Expand Down Expand Up @@ -252,7 +252,7 @@ object Broadcast {
* If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*
*/
class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
require(outputPorts > 1, "A Broadcast must have more than 1 output ports")
val in: Inlet[T] = Inlet[T]("Broadast.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Broadcast.out" + i))
Expand Down Expand Up @@ -350,7 +350,7 @@ object Balance {
*
* '''Cancels when''' all downstreams cancel
*/
class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
require(outputPorts > 1, "A Balance must have more than 1 output ports")
val in: Inlet[T] = Inlet[T]("Balance.in")
val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i Outlet[T]("Balance.out" + i))
Expand Down Expand Up @@ -437,7 +437,7 @@ object Zip {
*
* '''Cancels when''' downstream cancels
*/
class Zip[A, B] extends ZipWith2[A, B, (A, B)](Pair.apply) {
final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Pair.apply) {
override def toString = "Zip"
}

Expand Down Expand Up @@ -477,7 +477,7 @@ object Unzip {
/**
* Combine the elements of multiple streams into a stream of the combined elements.
*/
class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](ConstantFun.scalaIdentityFunction) {
final class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](ConstantFun.scalaIdentityFunction) {
override def toString = "Unzip"
}

Expand Down Expand Up @@ -516,7 +516,7 @@ object Concat {
*
* '''Cancels when''' downstream cancels
*/
class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
final class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Concat must have more than 1 input ports")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Concat.in" + i))
val out: Outlet[T] = Outlet[T]("Concat.out")
Expand Down

0 comments on commit 38fa306

Please sign in to comment.