From 5f23219f7dcdde508fd0585c6bac9f859f892cc9 Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 16 Oct 2013 17:38:15 +0200 Subject: [PATCH] ! io: improve DynamicPipelines trait - introduce `DynamicPipelines.State` type alias - remove `initialPipelines` method - introduce `process` method for easy wrapping of pipeline stage execution --- .../scala/spray/io/BackPressureHandling.scala | 11 +++++------ .../scala/spray/io/ConnectionTimeouts.scala | 6 +++--- .../src/main/scala/spray/io/Pipelines.scala | 19 ++++++++----------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/spray-io/src/main/scala/spray/io/BackPressureHandling.scala b/spray-io/src/main/scala/spray/io/BackPressureHandling.scala index b4c28d2447..de999ca2b5 100644 --- a/spray-io/src/main/scala/spray/io/BackPressureHandling.scala +++ b/spray-io/src/main/scala/spray/io/BackPressureHandling.scala @@ -56,8 +56,7 @@ object BackPressureHandling { new DynamicPipelines { effective ⇒ import context.log - def initialPipeline = - writeThrough(new OutQueue(ackRate), isReading = true, closeCommand = None) + become(writeThrough(new OutQueue(ackRate), isReading = true, closeCommand = None)) /** * In this state all incoming write requests have already been relayed to the connection. There's a buffer @@ -66,7 +65,7 @@ object BackPressureHandling { * Invariant: * * we've not experienced any failed writes */ - def writeThrough(out: OutQueue, isReading: Boolean, closeCommand: Option[Tcp.CloseCommand]): Pipelines = new Pipelines { + def writeThrough(out: OutQueue, isReading: Boolean, closeCommand: Option[Tcp.CloseCommand]): State = new State { def resumeReading(): Unit = { commandPL(Tcp.ResumeReading) become(writeThrough(out, isReading = true, closeCommand)) @@ -128,13 +127,13 @@ object BackPressureHandling { * The state where writing is suspended and we are waiting for WritingResumed. Reading will be suspended * if it currently isn't and if the connection isn't already going to be closed. */ - def buffering(out: OutQueue, failedSeq: Int, isReading: Boolean, closeCommand: Option[CloseCommand]): Pipelines = { + def buffering(out: OutQueue, failedSeq: Int, isReading: Boolean, closeCommand: Option[CloseCommand]): State = { def isClosing = closeCommand.isDefined if (!isClosing && isReading) { commandPL(Tcp.SuspendReading) buffering(out, failedSeq, isReading = false, closeCommand) - } else new Pipelines { + } else new State { def commandPipeline = { case w: Tcp.Write ⇒ if (isClosing) log.warning("Can't process more writes when closing. Dropping...") @@ -177,7 +176,7 @@ object BackPressureHandling { } } - def closed(): Pipelines = new Pipelines { + def closed(): State = new State { def commandPipeline = { case c @ (_: Tcp.Write | _: Tcp.CloseCommand) ⇒ log.warning(s"Connection is already closed, dropping command $c") case c ⇒ commandPL(c) diff --git a/spray-io/src/main/scala/spray/io/ConnectionTimeouts.scala b/spray-io/src/main/scala/spray/io/ConnectionTimeouts.scala index c486546902..f3d805e157 100644 --- a/spray-io/src/main/scala/spray/io/ConnectionTimeouts.scala +++ b/spray-io/src/main/scala/spray/io/ConnectionTimeouts.scala @@ -41,9 +41,9 @@ object ConnectionTimeouts { var idleDeadline = Timestamp.never def resetDeadline() = idleDeadline = Timestamp.now + timeout - def initialPipeline = atWork(writePossiblyPending = false) + become(atWork(writePossiblyPending = false)) - def atWork(writePossiblyPending: Boolean): Pipelines = new Pipelines { + def atWork(writePossiblyPending: Boolean): State = new State { resetDeadline() val commandPipeline: CPL = { case write: Tcp.WriteCommand ⇒ @@ -64,7 +64,7 @@ object ConnectionTimeouts { case ev ⇒ eventPL(ev) } } - def checkForPendingWrite(): Pipelines = new Pipelines { + def checkForPendingWrite(): State = new State { resetDeadline() commandPL(TestWrite) diff --git a/spray-io/src/main/scala/spray/io/Pipelines.scala b/spray-io/src/main/scala/spray/io/Pipelines.scala index ab70a2d836..82c6447285 100644 --- a/spray-io/src/main/scala/spray/io/Pipelines.scala +++ b/spray-io/src/main/scala/spray/io/Pipelines.scala @@ -40,18 +40,15 @@ object Pipelines { } trait DynamicPipelines extends Pipelines { - def initialPipeline: Pipelines - private[this] var _cpl: Pipeline[Command] = _ - private[this] var _epl: Pipeline[Event] = _ + type State = Pipelines + private[this] var _cpl: Pipeline[Command] = Pipeline.Uninitialized + private[this] var _epl: Pipeline[Event] = Pipeline.Uninitialized + + def commandPipeline = cmd ⇒ process(cmd, _cpl) + def eventPipeline = ev ⇒ process(ev, _epl) + + protected def process[T](msg: T, pl: Pipeline[T]): Unit = pl(msg) - def commandPipeline = { - if (_cpl eq null) become(initialPipeline) - cmd ⇒ _cpl(cmd) - } - def eventPipeline = { - if (_epl eq null) become(initialPipeline) - event ⇒ _epl(event) - } def become(newPipes: Pipelines): Unit = { _cpl = newPipes.commandPipeline _epl = newPipes.eventPipeline