Skip to content

Commit

Permalink
Merge pull request #57 from http4s/stages-improvements
Browse files Browse the repository at this point in the history
Clean up a few warts of the Stages api
  • Loading branch information
rossabaker committed Jan 10, 2017
2 parents d693777 + e3807c0 commit f13edc7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 9 deletions.
43 changes: 36 additions & 7 deletions core/src/main/scala/org/http4s/blaze/pipeline/Stages.scala
Expand Up @@ -79,30 +79,35 @@ sealed trait Tail[I] extends Stage {
} catch { case t: Throwable => return Future.failed(t) }
}

/** Write a single outbound message to the pipeline */
def channelWrite(data: I): Future[Unit] = {
if (_prevStage != null) {
try _prevStage.writeRequest(data)
catch { case t: Throwable => Future.failed(t) }
} else _stageDisconnected()
}

/** Write a single outbound message to the pipeline with a timeout */
final def channelWrite(data: I, timeout: Duration): Future[Unit] = {
val f = channelWrite(data)
checkTimeout(timeout, f)
}

/** Write a collection of outbound messages to the pipeline */
def channelWrite(data: Seq[I]): Future[Unit] = {
if (_prevStage != null) {
try _prevStage.writeRequest(data)
catch { case t: Throwable => Future.failed(t) }
} else _stageDisconnected()
}

/** Write a collection of outbound messages to the pipeline with a timeout */
final def channelWrite(data: Seq[I], timeout: Duration): Future[Unit] = {
val f = channelWrite(data)
checkTimeout(timeout, f)
}

/** Insert the `MidStage` before this `Stage` */
final def spliceBefore(stage: MidStage[I, I]): Unit = {
if (_prevStage != null) {
stage._prevStage = _prevStage
Expand All @@ -116,6 +121,7 @@ sealed trait Tail[I] extends Stage {
}
}

/** Send a command to the next outbound `Stage` of the pipeline */
final def sendOutboundCommand(cmd: OutboundCommand): Unit = {
logger.debug(s"Stage ${getClass.getSimpleName} sending outbound command: $cmd")
if (_prevStage != null) {
Expand All @@ -128,6 +134,7 @@ sealed trait Tail[I] extends Stage {
}
}

/** Find the next outbound `Stage` with the given name, if it exists. */
final def findOutboundStage(name: String): Option[Stage] = {
if (this.name == name) Some(this)
else if (_prevStage == null) None
Expand All @@ -138,6 +145,7 @@ sealed trait Tail[I] extends Stage {
}
}

/** Find the next outbound `Stage` of type `C`, if it exists. */
final def findOutboundStage[C <: Stage](clazz: Class[C]): Option[C] = {
if (clazz.isAssignableFrom(this.getClass)) Some(this.asInstanceOf[C])
else if (_prevStage == null) None
Expand All @@ -149,7 +157,11 @@ sealed trait Tail[I] extends Stage {
}
}

final def replaceInline(leafBuilder: LeafBuilder[I], startup: Boolean = true): this.type = {
/** Replace all downstream `Stage`s, including this `Stage`.
*
* If this was a `MidStage`, its inbound element is notified via a `Disconnected` Command.
*/
final def replaceTail(leafBuilder: LeafBuilder[I], startup: Boolean): this.type = {
stageShutdown()

if (this._prevStage == null) return this
Expand Down Expand Up @@ -217,21 +229,35 @@ sealed trait Tail[I] extends Stage {
sealed trait Head[O] extends Stage {
private[pipeline] var _nextStage: Tail[O] = null

/** Called by the next inbound `Stage` to signal interest in reading data.
*
* @param size Hint as to the size of the message intended to be read. May not be meaningful or honored.
* @return `Future` that will resolve with the requested inbound data, or an error.
*/
def readRequest(size: Int): Future[O]

/** Data that the next inbound `Stage` wants to send outbound.
*
* @return a `Future` that resolves when the data has been handled.
*/
def writeRequest(data: O): Future[Unit]

/** A simple default that serializes the write requests into the
* single element form. It almost certainly should be overwritten
* @param data sequence of elements which are to be written
* @return Future which will resolve when pipeline is ready for more data or fails
/** Collection of data that the next inbound `Stage` wants to sent outbound.
*
* It is generally assumed that the order of elements has meaning.
* @return a `Future` that resolves when the data has been handled.
*/
def writeRequest(data: Seq[O]): Future[Unit] = {
data.foldLeft[Future[Unit]](Future.successful(())){ (f, d) =>
f.flatMap(_ => writeRequest(d))(directec)
}
}

/** Replace all remaining inbound `Stage`s of the pipeline, not including this `Stage`. */
final def replaceNext(stage: LeafBuilder[O], startup: Boolean): Tail[O] =
_nextStage.replaceTail(stage, startup)

/** Send a command to the next inbound `Stage` of the pipeline */
final def sendInboundCommand(cmd: InboundCommand): Unit = {
logger.debug(s"Stage ${getClass.getSimpleName} sending inbound command: $cmd")
if (_nextStage != null) {
Expand Down Expand Up @@ -260,6 +286,7 @@ sealed trait Head[O] extends Stage {
case cmd => logger.warn(s"$name received unhandled outbound command: $cmd")
}

/** Insert the `MidStage` after `this` */
final def spliceAfter(stage: MidStage[O, O]): Unit = {
if (_nextStage != null) {
stage._nextStage = _nextStage
Expand All @@ -273,6 +300,7 @@ sealed trait Head[O] extends Stage {
}
}

/** Find the next outbound `Stage` with the given name, if it exists. */
final def findInboundStage(name: String): Option[Stage] = {
if (this.name == name) Some(this)
else if (_nextStage == null) None
Expand All @@ -282,6 +310,7 @@ sealed trait Head[O] extends Stage {
}
}

/** Find the next inbound `Stage` of type `C`, if it exists. */
final def findInboundStage[C <: Stage](clazz: Class[C]): Option[C] = {
if (clazz.isAssignableFrom(this.getClass)) Some(this.asInstanceOf[C])
else if (_nextStage == null) None
Expand All @@ -307,6 +336,7 @@ trait MidStage[I, O] extends Tail[I] with Head[O] {
sendOutboundCommand(cmd)
}

/** Replace this `MidStage` with the provided `MidStage` of the same type */
final def replaceInline(stage: MidStage[I, O]): this.type = {
stageShutdown()

Expand All @@ -320,8 +350,7 @@ trait MidStage[I, O] extends Tail[I] with Head[O] {
this
}

final def replaceNext(stage: LeafBuilder[O]): Tail[O] = _nextStage.replaceInline(stage)

/** Remove this `MidStage` from the pipeline */
final def removeStage(implicit ev: MidStage[I,O] =:= MidStage[I, I]): Unit = {
stageShutdown()

Expand Down
Expand Up @@ -165,7 +165,7 @@ class HttpServerStage(maxReqBody: Long, maxNonBody: Int, ec: ExecutionContext)(h
channelWrite(ByteBuffer.wrap(sb.result().getBytes(StandardCharsets.ISO_8859_1))).map{ _ =>
logger.debug("Switching pipeline segments for upgrade")
val segment = wsBuilder.prepend(new WebSocketDecoder(false))
this.replaceInline(segment)
this.replaceTail(segment, true)
Upgrade
}
}
Expand Down
Expand Up @@ -42,7 +42,7 @@ class ALPNSelector(engine: SSLEngine,
private def selectPipeline(): Unit = {
try {
val b = builder(selected.getOrElse(selector(Nil)))
this.replaceInline(b, true)
this.replaceTail(b, true)
} catch {
case t: Throwable =>
logger.error(t)("Failure building pipeline")
Expand Down

0 comments on commit f13edc7

Please sign in to comment.