Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate socket options into Ember #2836

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -14,6 +14,7 @@ import org.http4s.headers.Connection
import org.http4s.Response
import org.http4s.client._
import fs2.io.tcp.SocketGroup
import fs2.io.tcp.SocketOptionMapping
import scala.concurrent.duration.Duration

final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
Expand All @@ -25,7 +26,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
private val logger: Logger[F],
val chunkSize: Int,
val maxResponseHeaderSize: Int,
val timeout: Duration
val timeout: Duration,
val additionalSocketOptions: List[SocketOptionMapping[_]]
) { self =>

private def copy(
Expand All @@ -37,7 +39,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
logger: Logger[F] = self.logger,
chunkSize: Int = self.chunkSize,
maxResponseHeaderSize: Int = self.maxResponseHeaderSize,
timeout: Duration = self.timeout
timeout: Duration = self.timeout,
additionalSocketOptions: List[SocketOptionMapping[_]] = self.additionalSocketOptions
): EmberClientBuilder[F] = new EmberClientBuilder[F](
sslContextOpt = sslContextOpt,
sgR = sgR,
Expand All @@ -47,7 +50,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
logger = logger,
chunkSize = chunkSize,
maxResponseHeaderSize = maxResponseHeaderSize,
timeout = timeout
timeout = timeout,
additionalSocketOptions = additionalSocketOptions
)

def withSslContext(sslExecutionContext: ExecutionContext, sslContext: SSLContext) =
Expand All @@ -65,6 +69,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
def withMaxResponseHeaderSize(maxResponseHeaderSize: Int) =
copy(maxResponseHeaderSize = maxResponseHeaderSize)
def withTimeout(timeout: Duration) = copy(timeout = timeout)
def withAdditionalSocketOptions(additionalSocketOptions: List[SocketOptionMapping[_]]) =
copy(additionalSocketOptions = additionalSocketOptions)

def build: Resource[F, Client[F]] =
for {
Expand All @@ -76,7 +82,8 @@ final class EmberClientBuilder[F[_]: Concurrent: Timer: ContextShift] private (
.requestKeyToSocketWithKey[F](
requestKey,
sslContextOpt,
sg
sg,
additionalSocketOptions
)
.allocated <* logger.trace(s"Created Connection - RequestKey: ${requestKey}")
}, {
Expand Down Expand Up @@ -149,7 +156,8 @@ object EmberClientBuilder {
Slf4jLogger.getLogger[F],
Defaults.chunkSize,
Defaults.maxResponseHeaderSize,
Defaults.timeout
Defaults.timeout,
Defaults.additionalSocketOptions
)

private object Defaults {
Expand All @@ -169,6 +177,7 @@ object EmberClientBuilder {
}
val maxTotal = 100
val idleTimeInPool = 30.seconds // 30 Seconds in Nanos
val additionalSocketOptions = List.empty[SocketOptionMapping[_]]
}

private def tryDefaultSslContext: Option[SSLContext] =
Expand Down
Expand Up @@ -25,24 +25,27 @@ private[client] object ClientHelpers {
def requestToSocketWithKey[F[_]: Concurrent: Timer: ContextShift](
request: Request[F],
sslContext: Option[(ExecutionContext, SSLContext)],
sg: SocketGroup
sg: SocketGroup,
additionalSocketOptions: List[SocketOptionMapping[_]]
): Resource[F, RequestKeySocket[F]] = {
val requestKey = RequestKey.fromRequest(request)
requestKeyToSocketWithKey[F](
requestKey,
sslContext,
sg
sg,
additionalSocketOptions
)
}

def requestKeyToSocketWithKey[F[_]: Concurrent: Timer: ContextShift](
requestKey: RequestKey,
sslContext: Option[(ExecutionContext, SSLContext)],
sg: SocketGroup
sg: SocketGroup,
additionalSocketOptions: List[SocketOptionMapping[_]]
): Resource[F, RequestKeySocket[F]] =
for {
address <- Resource.liftF(getAddress(requestKey))
initSocket <- sg.client[F](address)
initSocket <- sg.client[F](address, additionalSocketOptions = additionalSocketOptions)
socket <- Resource.liftF {
if (requestKey.scheme === Uri.Scheme.https)
sslContext.fold[F[Socket[F]]](
Expand Down
Expand Up @@ -5,6 +5,7 @@ import cats.implicits._
import cats.effect._
import fs2.concurrent._
import fs2.io.tcp.SocketGroup
import fs2.io.tcp.SocketOptionMapping
import org.http4s._
import org.http4s.server.Server
import scala.concurrent.duration._
Expand All @@ -22,6 +23,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
val receiveBufferSize: Int,
val maxHeaderSize: Int,
val requestHeaderReceiveTimeout: Duration,
val additionalSocketOptions: List[SocketOptionMapping[_]],
private val logger: Logger[F]
) { self =>

Expand All @@ -36,6 +38,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
receiveBufferSize: Int = self.receiveBufferSize,
maxHeaderSize: Int = self.maxHeaderSize,
requestHeaderReceiveTimeout: Duration = self.requestHeaderReceiveTimeout,
additionalSocketOptions: List[SocketOptionMapping[_]] = self.additionalSocketOptions,
logger: Logger[F] = self.logger
): EmberServerBuilder[F] = new EmberServerBuilder[F](
host = host,
Expand All @@ -48,6 +51,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
receiveBufferSize = receiveBufferSize,
maxHeaderSize = maxHeaderSize,
requestHeaderReceiveTimeout = requestHeaderReceiveTimeout,
additionalSocketOptions = additionalSocketOptions,
logger = logger
)

Expand Down Expand Up @@ -86,6 +90,7 @@ final class EmberServerBuilder[F[_]: Concurrent: Timer: ContextShift] private (
receiveBufferSize,
maxHeaderSize,
requestHeaderReceiveTimeout,
additionalSocketOptions,
logger
)
.compile
Expand Down Expand Up @@ -116,6 +121,7 @@ object EmberServerBuilder {
receiveBufferSize = Defaults.receiveBufferSize,
maxHeaderSize = Defaults.maxHeaderSize,
requestHeaderReceiveTimeout = Defaults.requestHeaderReceiveTimeout,
additionalSocketOptions = Defaults.additionalSocketOptions,
logger = Slf4jLogger.getLogger[F]
)

Expand All @@ -137,5 +143,6 @@ object EmberServerBuilder {
val receiveBufferSize: Int = 256 * 1024
val maxHeaderSize: Int = 10 * 1024
val requestHeaderReceiveTimeout: Duration = 5.seconds
val additionalSocketOptions = List.empty[SocketOptionMapping[_]]
}
}
Expand Up @@ -28,6 +28,7 @@ private[server] object ServerHelpers {
receiveBufferSize: Int = 256 * 1024,
maxHeaderSize: Int = 10 * 1024,
requestHeaderReceiveTimeout: Duration = 5.seconds,
additionalSocketOptions: List[SocketOptionMapping[_]] = List.empty,
logger: Logger[F]
)(implicit C: Clock[F]): Stream[F, Nothing] = {

Expand All @@ -38,7 +39,8 @@ private[server] object ServerHelpers {
def socketReadRequest(
socket: Socket[F],
requestHeaderReceiveTimeout: Duration,
receiveBufferSize: Int): F[Request[F]] = {
receiveBufferSize: Int
): F[Request[F]] = {
val (initial, readDuration) = requestHeaderReceiveTimeout match {
case fin: FiniteDuration => (true, fin)
case _ => (false, 0.millis)
Expand All @@ -61,7 +63,7 @@ private[server] object ServerHelpers {
.eval(termSignal)
.flatMap(
terminationSignal =>
sg.server[F](bindAddress)
sg.server[F](bindAddress, additionalSocketOptions = additionalSocketOptions)
.map(connect =>
Stream.eval(
connect.use { socket =>
Expand Down