Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace QuietTimeoutStage with IdleTimeoutStage in server #2244

Merged
merged 2 commits into from
Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ final private[http4s] class IdleTimeoutStage[A](
override def run(): Unit = {
val t = new TimeoutException(s"Idle timeout after ${timeout.toMillis} ms.")
logger.debug(t.getMessage)
cb(Left(t))
cb(Right(t))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets us distinguish our timeout from someone else's timeout.

removeStage()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import org.http4s.blaze.channel.SocketConnection
import org.http4s.blaze.channel.nio1.NIO1SocketServerGroup
import org.http4s.blaze.channel.nio2.NIO2SocketServerGroup
import org.http4s.blaze.http.http2.server.ALPNServerSelector
import org.http4s.syntax.all._
import org.http4s.blaze.pipeline.LeafBuilder
import org.http4s.blaze.pipeline.stages.{QuietTimeoutStage, SSLStage}
import org.http4s.blazecore.tickWheelResource
import org.http4s.server.SSLKeyStoreSupport.StoreInfo
import org.http4s.syntax.all._
import org.log4s.getLogger
import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -175,7 +176,7 @@ class BlazeBuilder[F[_]](
def withBanner(banner: immutable.Seq[String]): Self =
copy(banner = banner)

def resource: Resource[F, Server[F]] =
def resource: Resource[F, Server[F]] = tickWheelResource.flatMap { scheduler =>
Resource(F.delay {
val aggregateService: HttpApp[F] =
Router(serviceMounts.map(mount => mount.prefix -> mount.service): _*).orNotFound
Expand Down Expand Up @@ -210,7 +211,9 @@ class BlazeBuilder[F[_]](
maxRequestLineLen,
maxHeadersLen,
serviceErrorHandler,
Duration.Inf
Duration.Inf,
idleTimeout,
scheduler
)

def http2Stage(engine: SSLEngine): ALPNServerSelector =
Expand All @@ -222,7 +225,9 @@ class BlazeBuilder[F[_]](
requestAttributes(secure = true),
executionContext,
serviceErrorHandler,
Duration.Inf
Duration.Inf,
idleTimeout,
scheduler
)

def prependIdleTimeout(lb: LeafBuilder[ByteBuffer]) =
Expand Down Expand Up @@ -289,6 +294,7 @@ class BlazeBuilder[F[_]](

server -> shutdown
})
}

private def getContext(): Option[(SSLContext, Boolean)] = sslBits.map {
case KeyStoreBits(keyStore, keyManagerPassword, protocol, trustStore, clientAuth) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import org.http4s.blaze.channel.nio1.NIO1SocketServerGroup
import org.http4s.blaze.channel.nio2.NIO2SocketServerGroup
import org.http4s.blaze.http.http2.server.ALPNServerSelector
import org.http4s.blaze.pipeline.LeafBuilder
import org.http4s.blaze.pipeline.stages.{QuietTimeoutStage, SSLStage}
import org.http4s.blaze.pipeline.stages.SSLStage
import org.http4s.blazecore.tickWheelResource
import org.http4s.server.SSLKeyStoreSupport.StoreInfo
import org.log4s.getLogger
import scala.collection.immutable
Expand Down Expand Up @@ -175,7 +176,7 @@ class BlazeServerBuilder[F[_]](
def withBanner(banner: immutable.Seq[String]): Self =
copy(banner = banner)

def resource: Resource[F, Server[F]] =
def resource: Resource[F, Server[F]] = tickWheelResource.flatMap { scheduler =>
Resource(F.delay {

def resolveAddress(address: InetSocketAddress) =
Expand Down Expand Up @@ -208,7 +209,9 @@ class BlazeServerBuilder[F[_]](
maxRequestLineLen,
maxHeadersLen,
serviceErrorHandler,
responseHeaderTimeout
responseHeaderTimeout,
idleTimeout,
scheduler
)

def http2Stage(engine: SSLEngine): ALPNServerSelector =
Expand All @@ -220,33 +223,27 @@ class BlazeServerBuilder[F[_]](
requestAttributes(secure = true),
executionContext,
serviceErrorHandler,
responseHeaderTimeout
responseHeaderTimeout,
idleTimeout,
scheduler
)

def prependIdleTimeout(lb: LeafBuilder[ByteBuffer]) =
if (idleTimeout.isFinite) lb.prepend(new QuietTimeoutStage[ByteBuffer](idleTimeout))
else lb

Future.successful {
getContext() match {
case Some((ctx, clientAuth)) =>
val engine = ctx.createSSLEngine()
engine.setUseClientMode(false)
engine.setNeedClientAuth(clientAuth)

var lb = LeafBuilder(
LeafBuilder(
if (isHttp2Enabled) http2Stage(engine)
else http1Stage(secure = true)
)
lb = prependIdleTimeout(lb)
lb.prepend(new SSLStage(engine))
).prepend(new SSLStage(engine))

case None =>
if (isHttp2Enabled)
logger.warn("HTTP/2 support requires TLS. Falling back to HTTP/1.")
var lb = LeafBuilder(http1Stage(secure = false))
lb = prependIdleTimeout(lb)
lb
LeafBuilder(http1Stage(secure = false))
}
}
}
Expand Down Expand Up @@ -287,6 +284,7 @@ class BlazeServerBuilder[F[_]](

server -> shutdown
})
}

private def getContext(): Option[(SSLContext, Boolean)] = sslBits.map {
case KeyStoreBits(keyStore, keyManagerPassword, protocol, trustStore, clientAuth) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ package blaze
import cats.effect.{ConcurrentEffect, IO, Sync, Timer}
import cats.implicits._
import java.nio.ByteBuffer
import java.util.concurrent.TimeoutException
import org.http4s.blaze.http.parser.BaseExceptions.{BadMessage, ParserException}
import org.http4s.blaze.pipeline.Command.EOF
import org.http4s.blaze.pipeline.{TailStage, Command => Cmd}
import org.http4s.blaze.util.BufferTools
import org.http4s.blaze.util.{BufferTools, TickWheelExecutor}
import org.http4s.blaze.util.BufferTools.emptyBuffer
import org.http4s.blaze.util.Execution._
import org.http4s.blazecore.Http1Stage
import org.http4s.blazecore.{Http1Stage, IdleTimeoutStage}
import org.http4s.blazecore.util.{BodylessWriter, Http1Writer}
import org.http4s.headers.{Connection, `Content-Length`, `Transfer-Encoding`}
import org.http4s.internal.unsafeRunAsync
Expand All @@ -31,7 +32,9 @@ private[blaze] object Http1ServerStage {
maxRequestLineLen: Int,
maxHeadersLen: Int,
serviceErrorHandler: ServiceErrorHandler[F],
responseHeaderTimeout: Duration)(
responseHeaderTimeout: Duration,
idleTimeout: Duration,
scheduler: TickWheelExecutor)(
implicit F: ConcurrentEffect[F],
timer: Timer[F]): Http1ServerStage[F] =
if (enableWebSockets)
Expand All @@ -42,7 +45,9 @@ private[blaze] object Http1ServerStage {
maxRequestLineLen,
maxHeadersLen,
serviceErrorHandler,
responseHeaderTimeout) with WebSocketSupport[F]
responseHeaderTimeout,
idleTimeout,
scheduler) with WebSocketSupport[F]
else
new Http1ServerStage(
routes,
Expand All @@ -51,7 +56,9 @@ private[blaze] object Http1ServerStage {
maxRequestLineLen,
maxHeadersLen,
serviceErrorHandler,
responseHeaderTimeout)
responseHeaderTimeout,
idleTimeout,
scheduler)
}

private[blaze] class Http1ServerStage[F[_]](
Expand All @@ -61,7 +68,9 @@ private[blaze] class Http1ServerStage[F[_]](
maxRequestLineLen: Int,
maxHeadersLen: Int,
serviceErrorHandler: ServiceErrorHandler[F],
responseHeaderTimeout: Duration)(implicit protected val F: ConcurrentEffect[F], timer: Timer[F])
responseHeaderTimeout: Duration,
idleTimeout: Duration,
scheduler: TickWheelExecutor)(implicit protected val F: ConcurrentEffect[F], timer: Timer[F])
extends Http1Stage[F]
with TailStage[ByteBuffer] {

Expand Down Expand Up @@ -89,9 +98,26 @@ private[blaze] class Http1ServerStage[F[_]](
// Will act as our loop
override def stageStartup(): Unit = {
logger.debug("Starting HTTP pipeline")
initIdleTimeout()
requestLoop()
}

private def initIdleTimeout() =
idleTimeout match {
case f: FiniteDuration =>
val cb: Callback[TimeoutException] = {
case Left(t) =>
fatalError(t, "Error in idle timeout callback")
case Right(_) =>
logger.debug("Shutting down due to idle timeout")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could argue logging level here. Should perhaps be consistent with the "accepted connection" level.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems ok.

closePipeline(None)
}
val stage = new IdleTimeoutStage[ByteBuffer](f, cb, scheduler, executionContext)
spliceBefore(stage)
stage.stageStartup()
case _ =>
}

private val handleReqRead: Try[ByteBuffer] => Unit = {
case Success(buff) => reqLoopCallback(buff)
case Failure(Cmd.EOF) => closeConnection()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import cats.implicits._
import fs2._
import fs2.Stream._
import java.util.Locale
import java.util.concurrent.TimeoutException
import org.http4s.{Headers => HHeaders, Method => HMethod}
import org.http4s.Header.Raw
import org.http4s.blaze.http.{HeaderNames, Headers}
import org.http4s.blaze.http.http2._
import org.http4s.blaze.pipeline.{TailStage, Command => Cmd}
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.blazecore.IdleTimeoutStage
import org.http4s.blazecore.util.{End, Http2Writer}
import org.http4s.syntax.string._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
Expand All @@ -26,7 +29,9 @@ private class Http2NodeStage[F[_]](
attributes: AttributeMap,
httpApp: HttpApp[F],
serviceErrorHandler: ServiceErrorHandler[F],
responseHeaderTimeout: Duration)(implicit F: ConcurrentEffect[F], timer: Timer[F])
responseHeaderTimeout: Duration,
idleTimeout: Duration,
scheduler: TickWheelExecutor)(implicit F: ConcurrentEffect[F], timer: Timer[F])
extends TailStage[StreamFrame] {

// micro-optimization: unwrap the service and call its .run directly
Expand All @@ -36,9 +41,27 @@ private class Http2NodeStage[F[_]](

override protected def stageStartup(): Unit = {
super.stageStartup()
initIdleTimeout()
readHeaders()
}

private def initIdleTimeout() =
idleTimeout match {
case f: FiniteDuration =>
val cb: Callback[TimeoutException] = {
case Left(t) =>
logger.error(t)("Error in idle timeout callback")
closePipeline(Some(t))
case Right(_) =>
logger.debug("Shutting down due to idle timeout")
closePipeline(None)
}
val stage = new IdleTimeoutStage[StreamFrame](f, cb, scheduler, executionContext)
spliceBefore(stage)
stage.stageStartup()
case _ =>
}

private def readHeaders(): Unit =
channelRead(timeout = timeout).onComplete {
case Success(HeadersFrame(_, endStream, hs)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import javax.net.ssl.SSLEngine
import org.http4s.blaze.http.http2.{DefaultFlowStrategy, Http2Settings}
import org.http4s.blaze.http.http2.server.{ALPNServerSelector, ServerPriorKnowledgeHandshaker}
import org.http4s.blaze.pipeline.{LeafBuilder, TailStage}
import org.http4s.blaze.util.TickWheelExecutor
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration

Expand All @@ -21,7 +22,9 @@ private[blaze] object ProtocolSelector {
requestAttributes: AttributeMap,
executionContext: ExecutionContext,
serviceErrorHandler: ServiceErrorHandler[F],
responseHeaderTimeout: Duration)(
responseHeaderTimeout: Duration,
idleTimeout: Duration,
scheduler: TickWheelExecutor)(
implicit F: ConcurrentEffect[F],
timer: Timer[F]): ALPNServerSelector = {

Expand All @@ -35,7 +38,10 @@ private[blaze] object ProtocolSelector {
requestAttributes,
httpApp,
serviceErrorHandler,
responseHeaderTimeout))
responseHeaderTimeout,
idleTimeout,
scheduler
))
}

val localSettings =
Expand All @@ -58,7 +64,9 @@ private[blaze] object ProtocolSelector {
maxRequestLineLen,
maxHeadersLen,
serviceErrorHandler,
responseHeaderTimeout
responseHeaderTimeout,
idleTimeout,
scheduler
)

def preference(protos: Set[String]): String =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ import java.nio.charset.StandardCharsets
import org.http4s.{headers => H, _}
import org.http4s.blaze._
import org.http4s.blaze.pipeline.Command.Connected
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.blazecore.{ResponseParser, SeqTestHead}
import org.http4s.dsl.io._
import org.http4s.headers.{Date, `Content-Length`, `Transfer-Encoding`}
import org.specs2.specification.AfterAll
import org.specs2.specification.core.Fragment
import scala.concurrent.Await
import scala.concurrent.duration._

class Http1ServerStageSpec extends Http4sSpec {
class Http1ServerStageSpec extends Http4sSpec with AfterAll {
val tickWheel = new TickWheelExecutor()

def afterAll = tickWheel.shutdown()

def makeString(b: ByteBuffer): String = {
val p = b.position()
val a = new Array[Byte](b.remaining())
Expand Down Expand Up @@ -47,7 +53,10 @@ class Http1ServerStageSpec extends Http4sSpec {
maxReqLine,
maxHeaders,
DefaultServiceErrorHandler,
30.seconds)
30.seconds,
30.seconds,
tickWheel
)

pipeline.LeafBuilder(httpStage).base(head)
head.sendInboundCommand(Connected)
Expand Down