Skip to content

Commit

Permalink
core: simplify HeaderCompression (#3871)
Browse files Browse the repository at this point in the history
Co-authored-by: Arnout Engelen <arnout@engelen.eu>
  • Loading branch information
jrudolph and raboof committed Oct 12, 2021
1 parent d91964e commit 61d3b3f
Showing 1 changed file with 45 additions and 47 deletions.
Expand Up @@ -9,7 +9,7 @@ import akka.annotation.InternalApi
import akka.http.impl.engine.http2.Http2Protocol.SettingIdentifier
import akka.http.impl.engine.http2._
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, StageLogging }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging }
import akka.util.ByteString

import scala.collection.immutable
Expand All @@ -25,60 +25,58 @@ private[http2] object HeaderCompression extends GraphStage[FlowShape[FrameEvent,

val shape = FlowShape(eventsIn, eventsOut)

def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new HandleOrPassOnStage[FrameEvent, FrameEvent](shape) with StageLogging {
val currentMaxFrameSize = Http2Protocol.InitialMaxFrameSize
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with StageLogging with InHandler with OutHandler { logic =>
setHandlers(eventsIn, eventsOut, this)
private val currentMaxFrameSize = Http2Protocol.InitialMaxFrameSize

val encoder = new akka.http.shaded.com.twitter.hpack.Encoder(Http2Protocol.InitialMaxHeaderTableSize)
val os = new ByteArrayOutputStream(128)

become(Idle)

object Idle extends State {
val handleEvent: PartialFunction[FrameEvent, Unit] = {
case ack @ SettingsAckFrame(s) =>
applySettings(s)
push(eventsOut, ack)

case ParsedHeadersFrame(streamId, endStream, kvs, prioInfo) =>
kvs.foreach {
case (key, value: String) =>
encoder.encodeHeader(os, key, value, false)
case (key, value) =>
throw new IllegalStateException(s"Didn't expect key-value-pair [$key] -> [$value](${value.getClass}) here.")
}
val result = ByteString.fromArrayUnsafe(os.toByteArray) // BAOS.toByteArray always creates a copy
os.reset()
if (result.size <= currentMaxFrameSize) push(eventsOut, HeadersFrame(streamId, endStream, endHeaders = true, result, prioInfo))
else {
val first = HeadersFrame(streamId, endStream, endHeaders = false, result.take(currentMaxFrameSize), prioInfo)
def onPull(): Unit = pull(eventsIn)
def onPush(): Unit = grab(eventsIn) match {
case ack @ SettingsAckFrame(s) =>
applySettings(s)
push(eventsOut, ack)
case ParsedHeadersFrame(streamId, endStream, kvs, prioInfo) =>
kvs.foreach {
case (key, value: String) =>
encoder.encodeHeader(os, key, value, false)
case (key, value) =>
throw new IllegalStateException(s"Didn't expect key-value-pair [$key] -> [$value](${value.getClass}) here.")
}
val result = ByteString.fromArrayUnsafe(os.toByteArray) // BAOS.toByteArray always creates a copy
os.reset()
if (result.size <= currentMaxFrameSize) push(eventsOut, HeadersFrame(streamId, endStream, endHeaders = true, result, prioInfo))
else {
val first = HeadersFrame(streamId, endStream, endHeaders = false, result.take(currentMaxFrameSize), prioInfo)

emit(eventsOut, first)
setHandler(eventsOut, new OutHandler {
var remainingData = result.drop(currentMaxFrameSize)
push(eventsOut, first)
setHandler(eventsOut, new OutHandler {
private var remainingData = result.drop(currentMaxFrameSize)

def onPull(): Unit = {
val thisFragment = remainingData.take(currentMaxFrameSize)
val rest = remainingData.drop(currentMaxFrameSize)
val last = rest.isEmpty
def onPull(): Unit = {
val thisFragment = remainingData.take(currentMaxFrameSize)
val rest = remainingData.drop(currentMaxFrameSize)
val last = rest.isEmpty

push(eventsOut, ContinuationFrame(streamId, endHeaders = last, thisFragment))
if (last) become(Idle)
else remainingData = rest
}
})
}
}

def applySettings(s: immutable.Seq[Setting]): Unit =
s foreach {
case Setting(SettingIdentifier.SETTINGS_HEADER_TABLE_SIZE, size) =>
log.debug("Applied SETTINGS_HEADER_TABLE_SIZE({}) in header compression", size)
// 'size' is strictly spoken unsigned, but the encoder is allowed to
// pick any size equal to or less than this value (6.5.2)
if (size >= 0) encoder.setMaxHeaderTableSize(os, size)
else encoder.setMaxHeaderTableSize(os, Int.MaxValue)
case _ => // ignore, not applicable to this stage
push(eventsOut, ContinuationFrame(streamId, endHeaders = last, thisFragment))
if (last) setHandler(eventsOut, logic)
else remainingData = rest
}
})
}
case x => push(eventsOut, x)
}

def applySettings(s: immutable.Seq[Setting]): Unit =
s foreach {
case Setting(SettingIdentifier.SETTINGS_HEADER_TABLE_SIZE, size) =>
log.debug("Applied SETTINGS_HEADER_TABLE_SIZE({}) in header compression", size)
// 'size' is strictly spoken unsigned, but the encoder is allowed to
// pick any size equal to or less than this value (6.5.2)
if (size >= 0) encoder.setMaxHeaderTableSize(os, size)
else encoder.setMaxHeaderTableSize(os, Int.MaxValue)
case _ => // ignore, not applicable to this stage
}
}
}

0 comments on commit 61d3b3f

Please sign in to comment.