diff --git a/blaze-client/src/main/scala/org/http4s/client/blaze/BlazeClientBuilder.scala b/blaze-client/src/main/scala/org/http4s/client/blaze/BlazeClientBuilder.scala index 5c5d03f58ef..a1808fbe14c 100644 --- a/blaze-client/src/main/scala/org/http4s/client/blaze/BlazeClientBuilder.scala +++ b/blaze-client/src/main/scala/org/http4s/client/blaze/BlazeClientBuilder.scala @@ -4,11 +4,10 @@ package blaze import cats.effect._ import cats.implicits._ -import java.net.{SocketOption, StandardSocketOptions} import java.nio.channels.AsynchronousChannelGroup import javax.net.ssl.SSLContext -import org.http4s.blaze.channel.{ChannelOptions, OptionValue} -import org.http4s.blazecore.tickWheelResource +import org.http4s.blaze.channel.ChannelOptions +import org.http4s.blazecore.{BlazeBackendBuilder, tickWheelResource} import org.http4s.headers.{AgentProduct, `User-Agent`} import org.http4s.internal.BackendBuilder import scala.concurrent.ExecutionContext @@ -33,7 +32,10 @@ sealed abstract class BlazeClientBuilder[F[_]] private ( val asynchronousChannelGroup: Option[AsynchronousChannelGroup], val channelOptions: ChannelOptions )(implicit protected val F: ConcurrentEffect[F]) - extends BackendBuilder[F, Client[F]] { + extends BlazeBackendBuilder[Client[F]] + with BackendBuilder[F, Client[F]] { + type Self = BlazeClientBuilder[F] + private def copy( responseHeaderTimeout: Duration = responseHeaderTimeout, idleTimeout: Duration = idleTimeout, @@ -136,53 +138,8 @@ sealed abstract class BlazeClientBuilder[F[_]] private ( def withoutAsynchronousChannelGroup: BlazeClientBuilder[F] = withAsynchronousChannelGroupOption(None) - def channelOption[A](socketOption: SocketOption[A]) = - channelOptions.options.collectFirst { - case OptionValue(key, value) if key == socketOption => - value.asInstanceOf[A] - } def withChannelOptions(channelOptions: ChannelOptions): BlazeClientBuilder[F] = copy(channelOptions = channelOptions) - def withChannelOption[A](key: SocketOption[A], value: A): BlazeClientBuilder[F] = - withChannelOptions( - ChannelOptions(channelOptions.options.filterNot(_.key == key) :+ OptionValue(key, value))) - def withDefaultChannelOption[A](key: SocketOption[A]): BlazeClientBuilder[F] = - withChannelOptions(ChannelOptions(channelOptions.options.filterNot(_.key == key))) - - def socketSendBufferSize: Option[Int] = - channelOption(StandardSocketOptions.SO_SNDBUF).map(Int.unbox) - def withSocketSendBufferSize(socketSendBufferSize: Int): BlazeClientBuilder[F] = - withChannelOption(StandardSocketOptions.SO_SNDBUF, Int.box(socketSendBufferSize)) - def withDefaultSocketSendBufferSize: BlazeClientBuilder[F] = - withDefaultChannelOption(StandardSocketOptions.SO_SNDBUF) - - def socketReceiveBufferSize: Option[Int] = - channelOption(StandardSocketOptions.SO_RCVBUF).map(Int.unbox) - def withSocketReceiveBufferSize(socketReceiveBufferSize: Int): BlazeClientBuilder[F] = - withChannelOption(StandardSocketOptions.SO_RCVBUF, Int.box(socketReceiveBufferSize)) - def withDefaultSocketReceiveBufferSize: BlazeClientBuilder[F] = - withDefaultChannelOption(StandardSocketOptions.SO_RCVBUF) - - def socketKeepAlive: Option[Boolean] = - channelOption(StandardSocketOptions.SO_KEEPALIVE).map(Boolean.unbox) - def withSocketKeepAlive(socketKeepAlive: Boolean): BlazeClientBuilder[F] = - withChannelOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.box(socketKeepAlive)) - def withDefaultSocketKeepAlive: BlazeClientBuilder[F] = - withDefaultChannelOption(StandardSocketOptions.SO_KEEPALIVE) - - def socketReuseAddress: Option[Boolean] = - channelOption(StandardSocketOptions.SO_REUSEADDR).map(Boolean.unbox) - def withSocketReuseAddress(socketReuseAddress: Boolean): BlazeClientBuilder[F] = - withChannelOption(StandardSocketOptions.SO_REUSEADDR, Boolean.box(socketReuseAddress)) - def withDefaultSocketReuseAddress: BlazeClientBuilder[F] = - withDefaultChannelOption(StandardSocketOptions.SO_REUSEADDR) - - def tcpNoDelay: Option[Boolean] = - channelOption(StandardSocketOptions.TCP_NODELAY).map(Boolean.unbox) - def withTcpNoDelay(tcpNoDelay: Boolean): BlazeClientBuilder[F] = - withChannelOption(StandardSocketOptions.TCP_NODELAY, Boolean.box(tcpNoDelay)) - def withDefaultTcpNoDelay: BlazeClientBuilder[F] = - withDefaultChannelOption(StandardSocketOptions.TCP_NODELAY) def resource: Resource[F, Client[F]] = tickWheelResource.flatMap { scheduler => diff --git a/blaze-core/src/main/scala/org/http4s/blazecore/BlazeBackendBuilder.scala b/blaze-core/src/main/scala/org/http4s/blazecore/BlazeBackendBuilder.scala new file mode 100644 index 00000000000..e2d662d4f24 --- /dev/null +++ b/blaze-core/src/main/scala/org/http4s/blazecore/BlazeBackendBuilder.scala @@ -0,0 +1,58 @@ +package org.http4s +package blazecore + +import java.net.{SocketOption, StandardSocketOptions} +import org.http4s.blaze.channel.{ChannelOptions, OptionValue} + +private[http4s] trait BlazeBackendBuilder[B] { + type Self + + def channelOptions: ChannelOptions + + def channelOption[A](socketOption: SocketOption[A]) = + channelOptions.options.collectFirst { + case OptionValue(key, value) if key == socketOption => + value.asInstanceOf[A] + } + def withChannelOptions(channelOptions: ChannelOptions): Self + def withChannelOption[A](key: SocketOption[A], value: A): Self = + withChannelOptions( + ChannelOptions(channelOptions.options.filterNot(_.key == key) :+ OptionValue(key, value))) + def withDefaultChannelOption[A](key: SocketOption[A]): Self = + withChannelOptions(ChannelOptions(channelOptions.options.filterNot(_.key == key))) + + def socketSendBufferSize: Option[Int] = + channelOption(StandardSocketOptions.SO_SNDBUF).map(Int.unbox) + def withSocketSendBufferSize(socketSendBufferSize: Int): Self = + withChannelOption(StandardSocketOptions.SO_SNDBUF, Int.box(socketSendBufferSize)) + def withDefaultSocketSendBufferSize: Self = + withDefaultChannelOption(StandardSocketOptions.SO_SNDBUF) + + def socketReceiveBufferSize: Option[Int] = + channelOption(StandardSocketOptions.SO_RCVBUF).map(Int.unbox) + def withSocketReceiveBufferSize(socketReceiveBufferSize: Int): Self = + withChannelOption(StandardSocketOptions.SO_RCVBUF, Int.box(socketReceiveBufferSize)) + def withDefaultSocketReceiveBufferSize: Self = + withDefaultChannelOption(StandardSocketOptions.SO_RCVBUF) + + def socketKeepAlive: Option[Boolean] = + channelOption(StandardSocketOptions.SO_KEEPALIVE).map(Boolean.unbox) + def withSocketKeepAlive(socketKeepAlive: Boolean): Self = + withChannelOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.box(socketKeepAlive)) + def withDefaultSocketKeepAlive: Self = + withDefaultChannelOption(StandardSocketOptions.SO_KEEPALIVE) + + def socketReuseAddress: Option[Boolean] = + channelOption(StandardSocketOptions.SO_REUSEADDR).map(Boolean.unbox) + def withSocketReuseAddress(socketReuseAddress: Boolean): Self = + withChannelOption(StandardSocketOptions.SO_REUSEADDR, Boolean.box(socketReuseAddress)) + def withDefaultSocketReuseAddress: Self = + withDefaultChannelOption(StandardSocketOptions.SO_REUSEADDR) + + def tcpNoDelay: Option[Boolean] = + channelOption(StandardSocketOptions.TCP_NODELAY).map(Boolean.unbox) + def withTcpNoDelay(tcpNoDelay: Boolean): Self = + withChannelOption(StandardSocketOptions.TCP_NODELAY, Boolean.box(tcpNoDelay)) + def withDefaultTcpNoDelay: Self = + withDefaultChannelOption(StandardSocketOptions.TCP_NODELAY) +} diff --git a/blaze-server/src/main/scala/org/http4s/server/blaze/BlazeServerBuilder.scala b/blaze-server/src/main/scala/org/http4s/server/blaze/BlazeServerBuilder.scala index 6ac19161c4e..004acf43dc2 100644 --- a/blaze-server/src/main/scala/org/http4s/server/blaze/BlazeServerBuilder.scala +++ b/blaze-server/src/main/scala/org/http4s/server/blaze/BlazeServerBuilder.scala @@ -12,14 +12,13 @@ import java.nio.ByteBuffer import java.security.{KeyStore, Security} import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLEngine, TrustManagerFactory} import org.http4s.blaze.{BuildInfo => BlazeBuildInfo} -import org.http4s.blaze.channel -import org.http4s.blaze.channel.SocketConnection +import org.http4s.blaze.channel.{ChannelOptions, DefaultPoolSize, SocketConnection} import org.http4s.blaze.channel.nio1.NIO1SocketServerGroup import org.http4s.blaze.channel.nio2.NIO2SocketServerGroup import org.http4s.blaze.http.http2.server.ALPNServerSelector import org.http4s.blaze.pipeline.LeafBuilder import org.http4s.blaze.pipeline.stages.SSLStage -import org.http4s.blazecore.tickWheelResource +import org.http4s.blazecore.{BlazeBackendBuilder, tickWheelResource} import org.http4s.server.SSLKeyStoreSupport.StoreInfo import org.log4s.getLogger import scala.collection.immutable @@ -73,9 +72,11 @@ class BlazeServerBuilder[F[_]]( maxHeadersLen: Int, httpApp: HttpApp[F], serviceErrorHandler: ServiceErrorHandler[F], - banner: immutable.Seq[String] + banner: immutable.Seq[String], + val channelOptions: ChannelOptions )(implicit protected val F: ConcurrentEffect[F], timer: Timer[F]) - extends ServerBuilder[F] { + extends ServerBuilder[F] + with BlazeBackendBuilder[Server[F]] { type Self = BlazeServerBuilder[F] private[this] val logger = getLogger @@ -95,7 +96,8 @@ class BlazeServerBuilder[F[_]]( maxHeadersLen: Int = maxHeadersLen, httpApp: HttpApp[F] = httpApp, serviceErrorHandler: ServiceErrorHandler[F] = serviceErrorHandler, - banner: immutable.Seq[String] = banner + banner: immutable.Seq[String] = banner, + channelOptions: ChannelOptions = channelOptions ): Self = new BlazeServerBuilder( socketAddress, @@ -112,7 +114,8 @@ class BlazeServerBuilder[F[_]]( maxHeadersLen, httpApp, serviceErrorHandler, - banner + banner, + channelOptions ) /** Configure HTTP parser length limits @@ -172,6 +175,9 @@ class BlazeServerBuilder[F[_]]( def withBanner(banner: immutable.Seq[String]): Self = copy(banner = banner) + def withChannelOptions(channelOptions: ChannelOptions): BlazeServerBuilder[F] = + copy(channelOptions = channelOptions) + def resource: Resource[F, Server[F]] = tickWheelResource.flatMap { scheduler => Resource(F.delay { @@ -246,9 +252,9 @@ class BlazeServerBuilder[F[_]]( val factory = if (isNio2) - NIO2SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize) + NIO2SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize, channelOptions) else - NIO1SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize) + NIO1SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize, channelOptions) val address = resolveAddress(socketAddress) @@ -326,7 +332,7 @@ object BlazeServerBuilder { responseHeaderTimeout = 1.minute, idleTimeout = defaults.IdleTimeout, isNio2 = false, - connectorPoolSize = channel.DefaultPoolSize, + connectorPoolSize = DefaultPoolSize, bufferSize = 64 * 1024, enableWebSockets = true, sslBits = None, @@ -335,7 +341,8 @@ object BlazeServerBuilder { maxHeadersLen = 40 * 1024, httpApp = defaultApp[F], serviceErrorHandler = DefaultServiceErrorHandler[F], - banner = defaults.Banner + banner = defaults.Banner, + channelOptions = ChannelOptions(Vector.empty) ) private def defaultApp[F[_]: Applicative]: HttpApp[F] = diff --git a/blaze-server/src/test/scala/org/http4s/server/blaze/BlazeServerSpec.scala b/blaze-server/src/test/scala/org/http4s/server/blaze/BlazeServerSpec.scala index 0346d645afb..25567b713ac 100644 --- a/blaze-server/src/test/scala/org/http4s/server/blaze/BlazeServerSpec.scala +++ b/blaze-server/src/test/scala/org/http4s/server/blaze/BlazeServerSpec.scala @@ -5,6 +5,7 @@ package blaze import cats.effect.IO import java.net.{HttpURLConnection, URL} import java.nio.charset.StandardCharsets +import org.http4s.blaze.channel.ChannelOptions import org.http4s.dsl.io._ import scala.concurrent.duration._ import scala.io.Source @@ -88,4 +89,55 @@ class BlazeServerSpec extends Http4sSpec { } } } + + "ChannelOptions" should { + "default to empty" in { + builder.channelOptions must_== ChannelOptions(Vector.empty) + } + "set socket send buffer size" in { + builder.withSocketSendBufferSize(8192).socketSendBufferSize must beSome(8192) + } + "set socket receive buffer size" in { + builder.withSocketReceiveBufferSize(8192).socketReceiveBufferSize must beSome(8192) + } + "set socket keepalive" in { + builder.withSocketKeepAlive(true).socketKeepAlive must beSome(true) + } + "set socket reuse address" in { + builder.withSocketReuseAddress(true).socketReuseAddress must beSome(true) + } + "set TCP nodelay" in { + builder.withTcpNoDelay(true).tcpNoDelay must beSome(true) + } + "unset socket send buffer size" in { + builder + .withSocketSendBufferSize(8192) + .withDefaultSocketSendBufferSize + .socketSendBufferSize must beNone + } + "unset socket receive buffer size" in { + builder + .withSocketReceiveBufferSize(8192) + .withDefaultSocketReceiveBufferSize + .socketReceiveBufferSize must beNone + } + "unset socket keepalive" in { + builder.withSocketKeepAlive(true).withDefaultSocketKeepAlive.socketKeepAlive must beNone + } + "unset socket reuse address" in { + builder + .withSocketReuseAddress(true) + .withDefaultSocketReuseAddress + .socketReuseAddress must beNone + } + "unset TCP nodelay" in { + builder.withTcpNoDelay(true).withDefaultTcpNoDelay.tcpNoDelay must beNone + } + "overwrite keys" in { + builder + .withSocketSendBufferSize(8192) + .withSocketSendBufferSize(4096) + .socketSendBufferSize must beSome(4096) + } + } }