Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connecting timeout #2736

Merged
merged 7 commits into from Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2,13 +2,18 @@ package org.http4s
package client
package blaze

import cats.implicits._
import cats.effect._
import java.nio.channels.AsynchronousChannelGroup

import javax.net.ssl.SSLContext
import org.http4s.blaze.channel.ChannelOptions
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.blazecore.{BlazeBackendBuilder, tickWheelResource}
import org.http4s.headers.{AgentProduct, `User-Agent`}
import org.http4s.internal.BackendBuilder
import org.log4s.getLogger

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

Expand All @@ -20,6 +25,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
val responseHeaderTimeout: Duration,
val idleTimeout: Duration,
val requestTimeout: Duration,
val connectTimeout: Duration,
val userAgent: Option[`User-Agent`],
val maxTotalConnections: Int,
val maxWaitQueueLimit: Int,
Expand All @@ -33,17 +39,21 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
val parserMode: ParserMode,
val bufferSize: Int,
val executionContext: ExecutionContext,
val scheduler: Resource[F, TickWheelExecutor],
val asynchronousChannelGroup: Option[AsynchronousChannelGroup],
val channelOptions: ChannelOptions
)(implicit protected val F: ConcurrentEffect[F])
extends BlazeBackendBuilder[Client[F]]
with BackendBuilder[F, Client[F]] {
type Self = BlazeClientBuilder[F]

final protected val logger = getLogger(this.getClass)

private def copy(
responseHeaderTimeout: Duration = responseHeaderTimeout,
idleTimeout: Duration = idleTimeout,
requestTimeout: Duration = requestTimeout,
connectTimeout: Duration = connectTimeout,
userAgent: Option[`User-Agent`] = userAgent,
maxTotalConnections: Int = maxTotalConnections,
maxWaitQueueLimit: Int = maxWaitQueueLimit,
Expand All @@ -57,13 +67,15 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
parserMode: ParserMode = parserMode,
bufferSize: Int = bufferSize,
executionContext: ExecutionContext = executionContext,
scheduler: Resource[F, TickWheelExecutor] = scheduler,
asynchronousChannelGroup: Option[AsynchronousChannelGroup] = asynchronousChannelGroup,
channelOptions: ChannelOptions = channelOptions
): BlazeClientBuilder[F] =
new BlazeClientBuilder[F](
responseHeaderTimeout = responseHeaderTimeout,
idleTimeout = idleTimeout,
requestTimeout = requestTimeout,
connectTimeout = connectTimeout,
userAgent = userAgent,
maxTotalConnections = maxTotalConnections,
maxWaitQueueLimit = maxWaitQueueLimit,
Expand All @@ -77,6 +89,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
parserMode = parserMode,
bufferSize = bufferSize,
executionContext = executionContext,
scheduler = scheduler,
asynchronousChannelGroup = asynchronousChannelGroup,
channelOptions = channelOptions
) {}
Expand All @@ -93,6 +106,9 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
def withRequestTimeout(requestTimeout: Duration): BlazeClientBuilder[F] =
copy(requestTimeout = requestTimeout)

def withConnectTimeout(connectTimeout: Duration): BlazeClientBuilder[F] =
copy(connectTimeout = connectTimeout)

def withUserAgentOption(userAgent: Option[`User-Agent`]): BlazeClientBuilder[F] =
copy(userAgent = userAgent)
def withUserAgent(userAgent: `User-Agent`): BlazeClientBuilder[F] =
Expand Down Expand Up @@ -151,6 +167,9 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
def withExecutionContext(executionContext: ExecutionContext): BlazeClientBuilder[F] =
copy(executionContext = executionContext)

def withScheduler(scheduler: TickWheelExecutor): BlazeClientBuilder[F] =
copy(scheduler = scheduler.pure[Resource[F, ?]])

def withAsynchronousChannelGroupOption(
asynchronousChannelGroup: Option[AsynchronousChannelGroup]): BlazeClientBuilder[F] =
copy(asynchronousChannelGroup = asynchronousChannelGroup)
Expand All @@ -164,8 +183,9 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
copy(channelOptions = channelOptions)

def resource: Resource[F, Client[F]] =
tickWheelResource.flatMap { scheduler =>
connectionManager.map { manager =>
scheduler.flatMap { scheduler =>
verifyAllTimeoutsAccuracy(scheduler)
connectionManager(scheduler).map { manager =>
BlazeClient.makeClient(
manager = manager,
responseHeaderTimeout = responseHeaderTimeout,
Expand All @@ -177,21 +197,43 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
}
}

private def connectionManager(
private def verifyAllTimeoutsAccuracy(scheduler: TickWheelExecutor): Unit = {
verifyTimeoutAccuracy(scheduler.tick, responseHeaderTimeout, "responseHeaderTimeout")
verifyTimeoutAccuracy(scheduler.tick, idleTimeout, "idleTimeout")
verifyTimeoutAccuracy(scheduler.tick, requestTimeout, "requestTimeout")
verifyTimeoutAccuracy(scheduler.tick, connectTimeout, "connectTimeout")
}

private def verifyTimeoutAccuracy(
tick: Duration,
timeout: Duration,
timeoutName: String): Unit = {
val warningThreshold = 0.1 // 10%
val inaccuracy = tick / timeout
if (inaccuracy > warningThreshold) {
logger.warn(
s"With current configuration $timeoutName ($timeout) may be up to ${inaccuracy * 100}% longer than configured. " +
s"If timeout accuracy is important, consider using a scheduler with a shorter tick (currently $tick).")
}
}

private def connectionManager(scheduler: TickWheelExecutor)(
implicit F: ConcurrentEffect[F]): Resource[F, ConnectionManager[F, BlazeConnection[F]]] = {
val http1: ConnectionBuilder[F, BlazeConnection[F]] = new Http1Support(
sslContextOption = sslContext,
bufferSize = bufferSize,
asynchronousChannelGroup = asynchronousChannelGroup,
executionContext = executionContext,
scheduler = scheduler,
checkEndpointIdentification = checkEndpointIdentification,
maxResponseLineSize = maxResponseLineSize,
maxHeaderLength = maxHeaderLength,
maxChunkSize = maxChunkSize,
chunkBufferMaxSize = chunkBufferMaxSize,
parserMode = parserMode,
userAgent = userAgent,
channelOptions = channelOptions
channelOptions = channelOptions,
connectTimeout = connectTimeout
).makeClient
Resource.make(
ConnectionManager.pool(
Expand All @@ -214,6 +256,7 @@ object BlazeClientBuilder {
responseHeaderTimeout = 10.seconds,
idleTimeout = 1.minute,
requestTimeout = 1.minute,
connectTimeout = 10.seconds,
userAgent = Some(`User-Agent`(AgentProduct("http4s-blaze", Some(BuildInfo.version)))),
maxTotalConnections = 10,
maxWaitQueueLimit = 256,
Expand All @@ -227,6 +270,7 @@ object BlazeClientBuilder {
parserMode = ParserMode.Strict,
bufferSize = 8192,
executionContext = executionContext,
scheduler = tickWheelResource,
asynchronousChannelGroup = None,
channelOptions = ChannelOptions(Vector.empty)
) {}
Expand Down
Expand Up @@ -6,6 +6,8 @@ import cats.effect._
import fs2.Stream
import org.http4s.blaze.channel.ChannelOptions

import scala.concurrent.duration.Duration

/** Create a HTTP1 client which will attempt to recycle connections */
@deprecated("Use BlazeClientBuilder", "0.19.0-M2")
object Http1Client {
Expand All @@ -21,14 +23,16 @@ object Http1Client {
bufferSize = config.bufferSize,
asynchronousChannelGroup = config.group,
executionContext = config.executionContext,
scheduler = bits.ClientTickWheel,
checkEndpointIdentification = config.checkEndpointIdentification,
maxResponseLineSize = config.maxResponseLineSize,
maxHeaderLength = config.maxHeaderLength,
maxChunkSize = config.maxChunkSize,
chunkBufferMaxSize = config.chunkBufferMaxSize,
parserMode = if (config.lenientParser) ParserMode.Lenient else ParserMode.Strict,
userAgent = config.userAgent,
channelOptions = ChannelOptions(Vector.empty)
channelOptions = ChannelOptions(Vector.empty),
connectTimeout = Duration.Inf
).makeClient

Resource
Expand Down
Expand Up @@ -2,20 +2,23 @@ package org.http4s
package client
package blaze

import cats.effect._
import cats.implicits._
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousChannelGroup

import cats.effect._
import cats.implicits._
import javax.net.ssl.SSLContext
import org.http4s.blaze.channel.ChannelOptions
import org.http4s.blaze.channel.nio2.ClientChannelFactory
import org.http4s.blaze.pipeline.{Command, LeafBuilder}
import org.http4s.blaze.pipeline.stages.SSLStage
import org.http4s.blaze.pipeline.{Command, LeafBuilder}
import org.http4s.blaze.util.TickWheelExecutor
import org.http4s.headers.`User-Agent`
import org.http4s.internal.fromFuture
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

/** Provides basic HTTP1 pipeline building
*/
Expand All @@ -24,20 +27,27 @@ final private class Http1Support[F[_]](
bufferSize: Int,
asynchronousChannelGroup: Option[AsynchronousChannelGroup],
executionContext: ExecutionContext,
scheduler: TickWheelExecutor,
checkEndpointIdentification: Boolean,
maxResponseLineSize: Int,
maxHeaderLength: Int,
maxChunkSize: Int,
chunkBufferMaxSize: Int,
parserMode: ParserMode,
userAgent: Option[`User-Agent`],
channelOptions: ChannelOptions
channelOptions: ChannelOptions,
connectTimeout: Duration
)(implicit F: ConcurrentEffect[F]) {

// SSLContext.getDefault is effectful and can fail - don't force it until we have to.
private lazy val sslContext = sslContextOption.getOrElse(SSLContext.getDefault)
private val connectionManager =
new ClientChannelFactory(bufferSize, asynchronousChannelGroup, channelOptions)
private val connectionManager = new ClientChannelFactory(
bufferSize,
asynchronousChannelGroup,
channelOptions,
scheduler,
connectTimeout
)

////////////////////////////////////////////////////

Expand All @@ -51,7 +61,7 @@ final private class Http1Support[F[_]](
requestKey: RequestKey,
addr: InetSocketAddress): Future[BlazeConnection[F]] =
connectionManager
.connect(addr, bufferSize)
.connect(addr)
.map { head =>
val (builder, t) = buildStages(requestKey)
builder.base(head)
Expand Down
Expand Up @@ -17,7 +17,7 @@ import scala.concurrent.duration.Duration
*/
trait ConnectionManager[F[_], A <: Connection[F]] {

/** Bundle of the connection and wheither its new or not */
/** Bundle of the connection and whether its new or not */
// Sealed, rather than final, because SI-4440.
sealed case class NextConnection(connection: A, fresh: Boolean)

Expand Down
7 changes: 6 additions & 1 deletion project/Http4sPlugin.scala
Expand Up @@ -7,6 +7,7 @@ import com.typesafe.sbt.SbtGit.git
import com.typesafe.sbt.SbtPgp.autoImport._
import com.typesafe.sbt.git.JGit
import com.typesafe.sbt.pgp.PgpKeys.publishSigned
import com.typesafe.tools.mima.core.{DirectMissingMethodProblem, ProblemFilters}
import com.typesafe.tools.mima.plugin.MimaPlugin
import com.typesafe.tools.mima.plugin.MimaPlugin.autoImport._
import java.lang.{Runtime => JRuntime}
Expand Down Expand Up @@ -96,6 +97,10 @@ object Http4sPlugin extends AutoPlugin {
}.map {
organization.value % s"${moduleName.value}_${scalaBinaryVersion.value}" % _
}).toSet,
mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("org.http4s.client.blaze.BlazeClientBuilder.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.http4s.client.blaze.Http1Support.this")
),

libraryDependencies += compilerPlugin(
CrossVersion.binaryScalaVersion(scalaVersion.value) match {
Expand Down Expand Up @@ -295,7 +300,7 @@ object Http4sPlugin extends AutoPlugin {
lazy val alpnBoot = "org.mortbay.jetty.alpn" % "alpn-boot" % "8.1.13.v20181017"
lazy val argonaut = "io.argonaut" %% "argonaut" % "6.2.3"
lazy val asyncHttpClient = "org.asynchttpclient" % "async-http-client" % "2.8.1"
lazy val blaze = "org.http4s" %% "blaze-http" % "0.14.6"
lazy val blaze = "org.http4s" %% "blaze-http" % "0.14.7"
lazy val boopickle = "io.suzaku" %% "boopickle" % "1.3.1"
lazy val cats = "org.typelevel" %% "cats-core" % "1.6.1"
lazy val catsEffect = "org.typelevel" %% "cats-effect" % "1.3.1"
Expand Down