Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support channel options on BlazeServerBuilder #2242

Merged
merged 3 commits into from Nov 13, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -5,11 +5,10 @@ package blaze
import cats.effect._
import cats.implicits._
import fs2.Stream
import java.net.{SocketOption, StandardSocketOptions}
import java.nio.channels.AsynchronousChannelGroup
import javax.net.ssl.SSLContext
import org.http4s.blaze.channel.{ChannelOptions, OptionValue}
import org.http4s.blazecore.tickWheelResource
import org.http4s.blaze.channel.ChannelOptions
import org.http4s.blazecore.{BlazeBackendBuilder, tickWheelResource}
import org.http4s.headers.{AgentProduct, `User-Agent`}
import org.http4s.internal.allocated
import scala.concurrent.ExecutionContext
Expand All @@ -33,7 +32,9 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
val executionContext: ExecutionContext,
val asynchronousChannelGroup: Option[AsynchronousChannelGroup],
val channelOptions: ChannelOptions
) {
) extends BlazeBackendBuilder[Client[F]] {
type Self = BlazeClientBuilder[F]

private def copy(
responseHeaderTimeout: Duration = responseHeaderTimeout,
idleTimeout: Duration = idleTimeout,
Expand Down Expand Up @@ -136,53 +137,8 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
def withoutAsynchronousChannelGroup: BlazeClientBuilder[F] =
withAsynchronousChannelGroupOption(None)

def channelOption[A](socketOption: SocketOption[A]) =
channelOptions.options.collectFirst {
case OptionValue(key, value) if key == socketOption =>
value.asInstanceOf[A]
}
def withChannelOptions(channelOptions: ChannelOptions): BlazeClientBuilder[F] =
copy(channelOptions = channelOptions)
def withChannelOption[A](key: SocketOption[A], value: A): BlazeClientBuilder[F] =
withChannelOptions(
ChannelOptions(channelOptions.options.filterNot(_.key == key) :+ OptionValue(key, value)))
def withDefaultChannelOption[A](key: SocketOption[A]): BlazeClientBuilder[F] =
withChannelOptions(ChannelOptions(channelOptions.options.filterNot(_.key == key)))

def socketSendBufferSize: Option[Int] =
channelOption(StandardSocketOptions.SO_SNDBUF).map(Int.unbox)
def withSocketSendBufferSize(socketSendBufferSize: Int): BlazeClientBuilder[F] =
withChannelOption(StandardSocketOptions.SO_SNDBUF, Int.box(socketSendBufferSize))
def withDefaultSocketSendBufferSize: BlazeClientBuilder[F] =
withDefaultChannelOption(StandardSocketOptions.SO_SNDBUF)

def socketReceiveBufferSize: Option[Int] =
channelOption(StandardSocketOptions.SO_RCVBUF).map(Int.unbox)
def withSocketReceiveBufferSize(socketReceiveBufferSize: Int): BlazeClientBuilder[F] =
withChannelOption(StandardSocketOptions.SO_RCVBUF, Int.box(socketReceiveBufferSize))
def withDefaultSocketReceiveBufferSize: BlazeClientBuilder[F] =
withDefaultChannelOption(StandardSocketOptions.SO_RCVBUF)

def socketKeepAlive: Option[Boolean] =
channelOption(StandardSocketOptions.SO_KEEPALIVE).map(Boolean.unbox)
def withSocketKeepAlive(socketKeepAlive: Boolean): BlazeClientBuilder[F] =
withChannelOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.box(socketKeepAlive))
def withDefaultSocketKeepAlive: BlazeClientBuilder[F] =
withDefaultChannelOption(StandardSocketOptions.SO_KEEPALIVE)

def socketReuseAddress: Option[Boolean] =
channelOption(StandardSocketOptions.SO_REUSEADDR).map(Boolean.unbox)
def withSocketReuseAddress(socketReuseAddress: Boolean): BlazeClientBuilder[F] =
withChannelOption(StandardSocketOptions.SO_REUSEADDR, Boolean.box(socketReuseAddress))
def withDefaultSocketReuseAddress: BlazeClientBuilder[F] =
withDefaultChannelOption(StandardSocketOptions.SO_REUSEADDR)

def tcpNoDelay: Option[Boolean] =
channelOption(StandardSocketOptions.TCP_NODELAY).map(Boolean.unbox)
def withTcpNoDelay(tcpNoDelay: Boolean): BlazeClientBuilder[F] =
withChannelOption(StandardSocketOptions.TCP_NODELAY, Boolean.box(tcpNoDelay))
def withDefaultTcpNoDelay: BlazeClientBuilder[F] =
withDefaultChannelOption(StandardSocketOptions.TCP_NODELAY)

def allocate(implicit F: ConcurrentEffect[F]): F[(Client[F], F[Unit])] =
allocated(resource)
Expand Down
@@ -0,0 +1,58 @@
package org.http4s
package blazecore

import java.net.{SocketOption, StandardSocketOptions}
import org.http4s.blaze.channel.{ChannelOptions, OptionValue}

private[http4s] trait BlazeBackendBuilder[B] {
type Self

def channelOptions: ChannelOptions

def channelOption[A](socketOption: SocketOption[A]) =
channelOptions.options.collectFirst {
case OptionValue(key, value) if key == socketOption =>
value.asInstanceOf[A]
}
def withChannelOptions(channelOptions: ChannelOptions): Self
def withChannelOption[A](key: SocketOption[A], value: A): Self =
withChannelOptions(
ChannelOptions(channelOptions.options.filterNot(_.key == key) :+ OptionValue(key, value)))
def withDefaultChannelOption[A](key: SocketOption[A]): Self =
withChannelOptions(ChannelOptions(channelOptions.options.filterNot(_.key == key)))

def socketSendBufferSize: Option[Int] =
channelOption(StandardSocketOptions.SO_SNDBUF).map(Int.unbox)
def withSocketSendBufferSize(socketSendBufferSize: Int): Self =
withChannelOption(StandardSocketOptions.SO_SNDBUF, Int.box(socketSendBufferSize))
def withDefaultSocketSendBufferSize: Self =
withDefaultChannelOption(StandardSocketOptions.SO_SNDBUF)

def socketReceiveBufferSize: Option[Int] =
channelOption(StandardSocketOptions.SO_RCVBUF).map(Int.unbox)
def withSocketReceiveBufferSize(socketReceiveBufferSize: Int): Self =
withChannelOption(StandardSocketOptions.SO_RCVBUF, Int.box(socketReceiveBufferSize))
def withDefaultSocketReceiveBufferSize: Self =
withDefaultChannelOption(StandardSocketOptions.SO_RCVBUF)

def socketKeepAlive: Option[Boolean] =
channelOption(StandardSocketOptions.SO_KEEPALIVE).map(Boolean.unbox)
def withSocketKeepAlive(socketKeepAlive: Boolean): Self =
withChannelOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.box(socketKeepAlive))
def withDefaultSocketKeepAlive: Self =
withDefaultChannelOption(StandardSocketOptions.SO_KEEPALIVE)

def socketReuseAddress: Option[Boolean] =
channelOption(StandardSocketOptions.SO_REUSEADDR).map(Boolean.unbox)
def withSocketReuseAddress(socketReuseAddress: Boolean): Self =
withChannelOption(StandardSocketOptions.SO_REUSEADDR, Boolean.box(socketReuseAddress))
def withDefaultSocketReuseAddress: Self =
withDefaultChannelOption(StandardSocketOptions.SO_REUSEADDR)

def tcpNoDelay: Option[Boolean] =
channelOption(StandardSocketOptions.TCP_NODELAY).map(Boolean.unbox)
def withTcpNoDelay(tcpNoDelay: Boolean): Self =
withChannelOption(StandardSocketOptions.TCP_NODELAY, Boolean.box(tcpNoDelay))
def withDefaultTcpNoDelay: Self =
withDefaultChannelOption(StandardSocketOptions.TCP_NODELAY)
}
Expand Up @@ -12,14 +12,13 @@ import java.nio.ByteBuffer
import java.security.{KeyStore, Security}
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLEngine, TrustManagerFactory}
import org.http4s.blaze.{BuildInfo => BlazeBuildInfo}
import org.http4s.blaze.channel
import org.http4s.blaze.channel.SocketConnection
import org.http4s.blaze.channel.{ChannelOptions, DefaultPoolSize, SocketConnection}
import org.http4s.blaze.channel.nio1.NIO1SocketServerGroup
import org.http4s.blaze.channel.nio2.NIO2SocketServerGroup
import org.http4s.blaze.http.http2.server.ALPNServerSelector
import org.http4s.blaze.pipeline.LeafBuilder
import org.http4s.blaze.pipeline.stages.SSLStage
import org.http4s.blazecore.tickWheelResource
import org.http4s.blazecore.{BlazeBackendBuilder, tickWheelResource}
import org.http4s.server.SSLKeyStoreSupport.StoreInfo
import org.log4s.getLogger
import scala.collection.immutable
Expand Down Expand Up @@ -73,9 +72,11 @@ class BlazeServerBuilder[F[_]](
maxHeadersLen: Int,
httpApp: HttpApp[F],
serviceErrorHandler: ServiceErrorHandler[F],
banner: immutable.Seq[String]
banner: immutable.Seq[String],
val channelOptions: ChannelOptions
)(implicit protected val F: ConcurrentEffect[F], timer: Timer[F])
extends ServerBuilder[F] {
extends ServerBuilder[F]
with BlazeBackendBuilder[Server[F]] {
type Self = BlazeServerBuilder[F]

private[this] val logger = getLogger
Expand All @@ -95,7 +96,8 @@ class BlazeServerBuilder[F[_]](
maxHeadersLen: Int = maxHeadersLen,
httpApp: HttpApp[F] = httpApp,
serviceErrorHandler: ServiceErrorHandler[F] = serviceErrorHandler,
banner: immutable.Seq[String] = banner
banner: immutable.Seq[String] = banner,
channelOptions: ChannelOptions = channelOptions
): Self =
new BlazeServerBuilder(
socketAddress,
Expand All @@ -112,7 +114,8 @@ class BlazeServerBuilder[F[_]](
maxHeadersLen,
httpApp,
serviceErrorHandler,
banner
banner,
channelOptions
)

/** Configure HTTP parser length limits
Expand Down Expand Up @@ -172,6 +175,9 @@ class BlazeServerBuilder[F[_]](
def withBanner(banner: immutable.Seq[String]): Self =
copy(banner = banner)

def withChannelOptions(channelOptions: ChannelOptions): BlazeServerBuilder[F] =
copy(channelOptions = channelOptions)

def resource: Resource[F, Server[F]] = tickWheelResource.flatMap { scheduler =>
Resource(F.delay {

Expand Down Expand Up @@ -246,9 +252,9 @@ class BlazeServerBuilder[F[_]](

val factory =
if (isNio2)
NIO2SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize)
NIO2SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize, channelOptions)
else
NIO1SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize)
NIO1SocketServerGroup.fixedGroup(connectorPoolSize, bufferSize, channelOptions)

val address = resolveAddress(socketAddress)

Expand Down Expand Up @@ -326,7 +332,7 @@ object BlazeServerBuilder {
responseHeaderTimeout = 1.minute,
idleTimeout = defaults.IdleTimeout,
isNio2 = false,
connectorPoolSize = channel.DefaultPoolSize,
connectorPoolSize = DefaultPoolSize,
bufferSize = 64 * 1024,
enableWebSockets = true,
sslBits = None,
Expand All @@ -335,7 +341,8 @@ object BlazeServerBuilder {
maxHeadersLen = 40 * 1024,
httpApp = defaultApp[F],
serviceErrorHandler = DefaultServiceErrorHandler[F],
banner = defaults.Banner
banner = defaults.Banner,
channelOptions = ChannelOptions(Vector.empty)
)

private def defaultApp[F[_]: Applicative]: HttpApp[F] =
Expand Down
Expand Up @@ -5,6 +5,7 @@ package blaze
import cats.effect.IO
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import org.http4s.blaze.channel.ChannelOptions
import org.http4s.dsl.io._
import scala.concurrent.duration._
import scala.io.Source
Expand Down Expand Up @@ -88,4 +89,55 @@ class BlazeServerSpec extends Http4sSpec {
}
}
}

"ChannelOptions" should {
"default to empty" in {
builder.channelOptions must_== ChannelOptions(Vector.empty)
}
"set socket send buffer size" in {
builder.withSocketSendBufferSize(8192).socketSendBufferSize must beSome(8192)
}
"set socket receive buffer size" in {
builder.withSocketReceiveBufferSize(8192).socketReceiveBufferSize must beSome(8192)
}
"set socket keepalive" in {
builder.withSocketKeepAlive(true).socketKeepAlive must beSome(true)
}
"set socket reuse address" in {
builder.withSocketReuseAddress(true).socketReuseAddress must beSome(true)
}
"set TCP nodelay" in {
builder.withTcpNoDelay(true).tcpNoDelay must beSome(true)
}
"unset socket send buffer size" in {
builder
.withSocketSendBufferSize(8192)
.withDefaultSocketSendBufferSize
.socketSendBufferSize must beNone
}
"unset socket receive buffer size" in {
builder
.withSocketReceiveBufferSize(8192)
.withDefaultSocketReceiveBufferSize
.socketReceiveBufferSize must beNone
}
"unset socket keepalive" in {
builder.withSocketKeepAlive(true).withDefaultSocketKeepAlive.socketKeepAlive must beNone
}
"unset socket reuse address" in {
builder
.withSocketReuseAddress(true)
.withDefaultSocketReuseAddress
.socketReuseAddress must beNone
}
"unset TCP nodelay" in {
builder.withTcpNoDelay(true).withDefaultTcpNoDelay.tcpNoDelay must beNone
}
"overwrite keys" in {
builder
.withSocketSendBufferSize(8192)
.withSocketSendBufferSize(4096)
.socketSendBufferSize must beSome(4096)
}
}
}