Skip to content

Commit

Permalink
=ht2 consolidate http2 switches and instantiation code in Http2
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudolph committed Jun 6, 2019
1 parent a56564c commit 6164fd8
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 324 deletions.
Expand Up @@ -18,35 +18,29 @@ import akka.stream._
/** INTERNAL API */
@InternalApi
private[http] object AlpnSwitch {
type HttpServerBidiFlow = BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed]
type HttpServerFlow = Flow[SslTlsInbound, SslTlsOutbound, NotUsed]

def apply(
chosenProtocolAccessor: () => String,
http1Stack: HttpServerBidiFlow,
http2Stack: HttpServerBidiFlow): HttpServerBidiFlow =
BidiFlow.fromGraph(
new GraphStage[BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest]] {
chosenProtocolAccessor: SessionBytes => String,
http1Stack: HttpServerFlow,
http2Stack: HttpServerFlow): HttpServerFlow =
Flow.fromGraph(
new GraphStage[FlowShape[SslTlsInbound, SslTlsOutbound]] {

// --- outer ports ---
val netIn = Inlet[SslTlsInbound]("AlpnSwitch.netIn")
val netOut = Outlet[SslTlsOutbound]("AlpnSwitch.netOut")

val requestOut = Outlet[HttpRequest]("AlpnSwitch.requestOut")
val responseIn = Inlet[HttpResponse]("AlpnSwitch.responseIn")
// --- end of outer ports ---

val shape: BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest] =
BidiShape(responseIn, netOut, netIn, requestOut)
val shape: FlowShape[SslTlsInbound, SslTlsOutbound] =
FlowShape(netIn, netOut)

def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
logic =>

// --- inner ports, bound to actual server in install call ---
val serverDataIn = new SubSinkInlet[SslTlsOutbound]("ServerImpl.netIn")
val serverDataOut = new SubSourceOutlet[SslTlsInbound]("ServerImpl.netOut")

val serverRequestIn = new SubSinkInlet[HttpRequest]("ServerImpl.serverRequestIn")
val serverResponseOut = new SubSourceOutlet[HttpResponse]("ServerImpl.serverResponseOut")
// --- end of inner ports ---

override def preStart(): Unit = pull(netIn)
Expand All @@ -55,7 +49,7 @@ private[http] object AlpnSwitch {
def onPush(): Unit =
grab(netIn) match {
case first @ SessionBytes(session, bytes) =>
val chosen = chosenProtocolAccessor()
val chosen = chosenProtocolAccessor(first)
chosen match {
case "h2" => install(http2Stack.addAttributes(HttpAttributes.tlsSessionInfo(session)), first)
case _ => install(http1Stack, first)
Expand All @@ -68,23 +62,17 @@ private[http] object AlpnSwitch {
private val failPush = new InHandler { def onPush(): Unit = throw new IllegalStateException("Wasn't pulled yet") }

setHandler(netOut, ignorePull)
setHandler(requestOut, ignorePull)
setHandler(responseIn, failPush)

def install(serverImplementation: HttpServerBidiFlow, firstElement: SslTlsInbound): Unit = {
def install(serverImplementation: HttpServerFlow, firstElement: SslTlsInbound): Unit = {
val networkSide = Flow.fromSinkAndSource(serverDataIn.sink, serverDataOut.source)
val userSide = Flow.fromSinkAndSource(serverRequestIn.sink, serverResponseOut.source)

connect(netIn, serverDataOut, Some(firstElement))
connect(responseIn, serverResponseOut, None)

connect(serverDataIn, netOut)
connect(serverRequestIn, requestOut)

serverImplementation
.addAttributes(inheritedAttributes) // propagate attributes to "real" server (such as HttpAttributes)
.join(networkSide)
.join(userSide)
.run()(interpreter.subFusingMaterializer)
}

Expand Down
Expand Up @@ -11,6 +11,7 @@ import akka.http.impl.engine.http2.framing.{ Http2FrameParsing, Http2FrameRender
import akka.http.impl.engine.http2.hpack.{ HeaderCompression, HeaderDecompression }
import akka.http.impl.engine.parsing.HttpHeaderParser
import akka.http.impl.util.StreamUtils
import akka.http.impl.util.LogByteStringTools.logTLSBidiBySetting
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.http2.Http2StreamIdHeader
import akka.http.scaladsl.settings.{ Http2ServerSettings, ParserSettings, ServerSettings }
Expand All @@ -20,6 +21,8 @@ import akka.util.ByteString
import scala.concurrent.{ ExecutionContext, Future }

import FrameEvent._
import akka.http.scaladsl.Http2
import akka.stream.TLSProtocol._

/**
* Represents one direction of an Http2 substream.
Expand All @@ -44,6 +47,11 @@ private[http2] final case class ChunkedHttp2SubStream(
@InternalApi
private[http] object Http2Blueprint {

def serverStackTls(settings: ServerSettings, log: LoggingAdapter): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
serverStack(settings, log) atop
unwrapTls atop
logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes)

// format: OFF
def serverStack(settings: ServerSettings, log: LoggingAdapter): BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, NotUsed] =
httpLayer(settings, log) atop
Expand Down Expand Up @@ -124,4 +132,9 @@ private[http] object Http2Blueprint {
case ParserSettings.ErrorLoggingVerbosity.Simple => log.warning(info.summary)
case ParserSettings.ErrorLoggingVerbosity.Full => log.warning(info.formatPretty)
}

private[http2] val unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, NotUsed] =
BidiFlow.fromFlows(Flow[ByteString].map(SendBytes), Flow[SslTlsInbound].collect {
case SessionBytes(_, bytes) => bytes
})
}

This file was deleted.

0 comments on commit 6164fd8

Please sign in to comment.