Skip to content
Permalink
Browse files

Integrate socket options into Ember

  • Loading branch information
viktor.lund
viktor.lund committed Sep 3, 2019
1 parent 0f8ae71 commit a42df478a98e965e797e6815548c95711fe3e1c2
@@ -14,6 +14,7 @@ import org.http4s.headers.Connection
import org.http4s.Response
import org.http4s.client._
import fs2.io.tcp.SocketGroup
import fs2.io.tcp.SocketOptionMapping
import scala.concurrent.duration.Duration

final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
@@ -25,7 +26,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
private val logger: Logger[F],
val chunkSize: Int,
val maxResponseHeaderSize: Int,
val timeout: Duration
val timeout: Duration,
val additionalSocketOptions: List[SocketOptionMapping[_]]
) { self =>

private def copy(
@@ -37,7 +39,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
logger: Logger[F] = self.logger,
chunkSize: Int = self.chunkSize,
maxResponseHeaderSize: Int = self.maxResponseHeaderSize,
timeout: Duration = self.timeout
timeout: Duration = self.timeout,
additionalSocketOptions: List[SocketOptionMapping[_]] = self.additionalSocketOptions
): EmberClientBuilder[F] = new EmberClientBuilder[F](
sslContextOpt = sslContextOpt,
sgR = sgR,
@@ -47,7 +50,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
logger = logger,
chunkSize = chunkSize,
maxResponseHeaderSize = maxResponseHeaderSize,
timeout = timeout
timeout = timeout,
additionalSocketOptions = additionalSocketOptions
)

def withSslContext(sslExecutionContext: ExecutionContext, sslContext: SSLContext) =
@@ -65,6 +69,7 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
def withMaxResponseHeaderSize(maxResponseHeaderSize: Int) =
copy(maxResponseHeaderSize = maxResponseHeaderSize)
def withTimeout(timeout: Duration) = copy(timeout = timeout)
def withAdditionalSocketOptions(additionalSocketOptions: List[SocketOptionMapping[_]]) = copy(additionalSocketOptions = additionalSocketOptions)

def build: Resource[F, Client[F]] =
for {
@@ -76,7 +81,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
.requestKeyToSocketWithKey[F](
requestKey,
sslContextOpt,
sg
sg,
additionalSocketOptions
)
.allocated <* logger.trace(s"Created Connection - RequestKey: ${requestKey}")
}, {
@@ -149,7 +155,8 @@ object EmberClientBuilder {
Slf4jLogger.getLogger[F],
Defaults.chunkSize,
Defaults.maxResponseHeaderSize,
Defaults.timeout
Defaults.timeout,
Defaults.additionalSocketOptions
)

private object Defaults {
@@ -169,6 +176,7 @@ object EmberClientBuilder {
}
val maxTotal = 100
val idleTimeInPool = 30.seconds // 30 Seconds in Nanos
val additionalSocketOptions = List.empty[SocketOptionMapping[_]]
}

private def tryDefaultSslContext: Option[SSLContext] =
@@ -25,24 +25,27 @@ private[client] object ClientHelpers {
def requestToSocketWithKey[F[_]: Concurrent: Timer: ContextShift](
request: Request[F],
sslContext: Option[(ExecutionContext, SSLContext)],
sg: SocketGroup
sg: SocketGroup,
additionalSocketOptions: List[SocketOptionMapping[_]]
): Resource[F, RequestKeySocket[F]] = {
val requestKey = RequestKey.fromRequest(request)
requestKeyToSocketWithKey[F](
requestKey,
sslContext,
sg
sg,
additionalSocketOptions
)
}

def requestKeyToSocketWithKey[F[_]: Concurrent: Timer: ContextShift](
requestKey: RequestKey,
sslContext: Option[(ExecutionContext, SSLContext)],
sg: SocketGroup
sg: SocketGroup,
additionalSocketOptions: List[SocketOptionMapping[_]]
): Resource[F, RequestKeySocket[F]] =
for {
address <- Resource.liftF(getAddress(requestKey))
initSocket <- sg.client[F](address)
initSocket <- sg.client[F](address, additionalSocketOptions = additionalSocketOptions)
socket <- Resource.liftF {
if (requestKey.scheme === Uri.Scheme.https)
sslContext.fold[F[Socket[F]]](
@@ -5,6 +5,7 @@ import cats.implicits._
import cats.effect._
import fs2.concurrent._
import fs2.io.tcp.SocketGroup
import fs2.io.tcp.SocketOptionMapping
import org.http4s._
import org.http4s.server.Server
import scala.concurrent.duration._
@@ -22,6 +23,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
val receiveBufferSize: Int,
val maxHeaderSize: Int,
val requestHeaderReceiveTimeout: Duration,
val additionalSocketOptions: List[SocketOptionMapping[_]],
private val logger: Logger[F]
) { self =>

@@ -36,6 +38,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
receiveBufferSize: Int = self.receiveBufferSize,
maxHeaderSize: Int = self.maxHeaderSize,
requestHeaderReceiveTimeout: Duration = self.requestHeaderReceiveTimeout,
additionalSocketOptions: List[SocketOptionMapping[_]] = self.additionalSocketOptions,
logger: Logger[F] = self.logger
): EmberServerBuilder[F] = new EmberServerBuilder[F](
host = host,
@@ -48,6 +51,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
receiveBufferSize = receiveBufferSize,
maxHeaderSize = maxHeaderSize,
requestHeaderReceiveTimeout = requestHeaderReceiveTimeout,
additionalSocketOptions = additionalSocketOptions,
logger = logger
)

@@ -86,6 +90,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
receiveBufferSize,
maxHeaderSize,
requestHeaderReceiveTimeout,
additionalSocketOptions,
logger
)
.compile
@@ -116,6 +121,7 @@ object EmberServerBuilder {
receiveBufferSize = Defaults.receiveBufferSize,
maxHeaderSize = Defaults.maxHeaderSize,
requestHeaderReceiveTimeout = Defaults.requestHeaderReceiveTimeout,
additionalSocketOptions = Defaults.additionalSocketOptions,
logger = Slf4jLogger.getLogger[F]
)

@@ -137,5 +143,6 @@ object EmberServerBuilder {
val receiveBufferSize: Int = 256 * 1024
val maxHeaderSize: Int = 10 * 1024
val requestHeaderReceiveTimeout: Duration = 5.seconds
val additionalSocketOptions = List.empty[SocketOptionMapping[_]]
}
}
@@ -28,6 +28,7 @@ private[server] object ServerHelpers {
receiveBufferSize: Int = 256 * 1024,
maxHeaderSize: Int = 10 * 1024,
requestHeaderReceiveTimeout: Duration = 5.seconds,
additionalSocketOptions: List[SocketOptionMapping[_]] = List.empty,
logger: Logger[F]
)(implicit C: Clock[F]): Stream[F, Nothing] = {

@@ -38,7 +39,8 @@ private[server] object ServerHelpers {
def socketReadRequest(
socket: Socket[F],
requestHeaderReceiveTimeout: Duration,
receiveBufferSize: Int): F[Request[F]] = {
receiveBufferSize: Int
): F[Request[F]] = {
val (initial, readDuration) = requestHeaderReceiveTimeout match {
case fin: FiniteDuration => (true, fin)
case _ => (false, 0.millis)
@@ -61,7 +63,7 @@ private[server] object ServerHelpers {
.eval(termSignal)
.flatMap(
terminationSignal =>
sg.server[F](bindAddress)
sg.server[F](bindAddress, additionalSocketOptions = additionalSocketOptions)
.map(connect =>
Stream.eval(
connect.use { socket =>

0 comments on commit a42df47

Please sign in to comment.
You can’t perform that action at this time.