Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bufferMaxSize parameter of CachingChunkWriter made configurable #2366

Merged
merged 9 commits into from Jan 24, 2019
Expand Up @@ -26,6 +26,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
val maxResponseLineSize: Int,
val maxHeaderLength: Int,
val maxChunkSize: Int,
val chunkBufferMaxSize: Int,
val parserMode: ParserMode,
val bufferSize: Int,
val executionContext: ExecutionContext,
Expand All @@ -49,6 +50,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
maxResponseLineSize: Int = maxResponseLineSize,
maxHeaderLength: Int = maxHeaderLength,
maxChunkSize: Int = maxChunkSize,
chunkBufferMaxSize: Int = chunkBufferMaxSize,
parserMode: ParserMode = parserMode,
bufferSize: Int = bufferSize,
executionContext: ExecutionContext = executionContext,
Expand All @@ -68,6 +70,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
maxResponseLineSize = maxResponseLineSize,
maxHeaderLength = maxHeaderLength,
maxChunkSize = maxChunkSize,
chunkBufferMaxSize = chunkBufferMaxSize,
parserMode = parserMode,
bufferSize = bufferSize,
executionContext = executionContext,
Expand Down Expand Up @@ -120,6 +123,9 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
def withMaxChunkSize(maxChunkSize: Int): BlazeClientBuilder[F] =
copy(maxChunkSize = maxChunkSize)

def withChunkBufferMaxSize(chunkBufferMaxSize: Int): BlazeClientBuilder[F] =
copy(chunkBufferMaxSize = chunkBufferMaxSize)

def withParserMode(parserMode: ParserMode): BlazeClientBuilder[F] =
copy(parserMode = parserMode)

Expand Down Expand Up @@ -166,6 +172,7 @@ sealed abstract class BlazeClientBuilder[F[_]] private (
maxResponseLineSize = maxResponseLineSize,
maxHeaderLength = maxHeaderLength,
maxChunkSize = maxChunkSize,
chunkBufferMaxSize = chunkBufferMaxSize,
parserMode = parserMode,
userAgent = userAgent,
channelOptions = channelOptions
Expand Down Expand Up @@ -200,6 +207,7 @@ object BlazeClientBuilder {
maxResponseLineSize = 4096,
maxHeaderLength = 40960,
maxChunkSize = Int.MaxValue,
chunkBufferMaxSize = 1024 * 1024,
parserMode = ParserMode.Strict,
bufferSize = 8192,
executionContext = executionContext,
Expand Down
Expand Up @@ -31,6 +31,7 @@ import scala.concurrent.duration._
* @param maxResponseLineSize maximum length of the request line
* @param maxHeaderLength maximum length of headers
* @param maxChunkSize maximum size of chunked content chunks
* @param chunkBufferMaxSize Size of the buffer that is used when Content-Length header is not specified.
* @param lenientParser a lenient parser will accept illegal chars but replaces them with � (0xFFFD)
* @param bufferSize internal buffer size of the blaze client
* @param executionContext custom executionContext to run async computations.
Expand All @@ -53,6 +54,7 @@ final case class BlazeClientConfig( // HTTP properties
maxResponseLineSize: Int,
maxHeaderLength: Int,
maxChunkSize: Int,
chunkBufferMaxSize: Int,
lenientParser: Boolean,
// pipeline management
bufferSize: Int,
Expand Down Expand Up @@ -80,6 +82,7 @@ object BlazeClientConfig {
maxResponseLineSize = 4 * 1024,
maxHeaderLength = 40 * 1024,
maxChunkSize = Integer.MAX_VALUE,
chunkBufferMaxSize = 1024 * 1024,
lenientParser = false,
bufferSize = bits.DefaultBufferSize,
executionContext = ExecutionContext.global,
Expand Down
Expand Up @@ -26,6 +26,7 @@ object Http1Client {
maxResponseLineSize = config.maxResponseLineSize,
maxHeaderLength = config.maxHeaderLength,
maxChunkSize = config.maxChunkSize,
chunkBufferMaxSize = config.chunkBufferMaxSize,
parserMode = if (config.lenientParser) ParserMode.Lenient else ParserMode.Strict,
userAgent = config.userAgent,
channelOptions = ChannelOptions(Vector.empty)
Expand Down
Expand Up @@ -27,6 +27,7 @@ private final class Http1Connection[F[_]](
maxResponseLineSize: Int,
maxHeaderLength: Int,
maxChunkSize: Int,
override val chunkBufferMaxSize: Int,
parserMode: ParserMode,
userAgent: Option[`User-Agent`]
)(implicit protected val F: ConcurrentEffect[F])
Expand Down
Expand Up @@ -28,6 +28,7 @@ final private class Http1Support[F[_]](
maxResponseLineSize: Int,
maxHeaderLength: Int,
maxChunkSize: Int,
chunkBufferMaxSize: Int,
parserMode: ParserMode,
userAgent: Option[`User-Agent`],
channelOptions: ChannelOptions
Expand Down Expand Up @@ -65,6 +66,7 @@ final private class Http1Support[F[_]](
maxResponseLineSize = maxResponseLineSize,
maxHeaderLength = maxHeaderLength,
maxChunkSize = maxChunkSize,
chunkBufferMaxSize = chunkBufferMaxSize,
parserMode = parserMode,
userAgent = userAgent
)
Expand Down
Expand Up @@ -21,14 +21,16 @@ class BlazeClientSpec extends Http4sSpec {
def mkClient(
maxConnectionsPerRequestKey: Int,
responseHeaderTimeout: Duration = 1.minute,
requestTimeout: Duration = 1.minute
requestTimeout: Duration = 1.minute,
chunkBufferMaxSize: Int = 1024
) =
BlazeClientBuilder[IO](testExecutionContext)
.withSslContext(bits.TrustingSslContext)
.withCheckEndpointAuthentication(false)
.withResponseHeaderTimeout(responseHeaderTimeout)
.withRequestTimeout(requestTimeout)
.withMaxConnectionsPerRequestKey(Function.const(maxConnectionsPerRequestKey))
.withChunkBufferMaxSize(chunkBufferMaxSize)
.resource

private def testServlet = new HttpServlet {
Expand Down
Expand Up @@ -35,6 +35,7 @@ class ClientTimeoutSpec extends Http4sSpec {
maxResponseLineSize = 4 * 1024,
maxHeaderLength = 40 * 1024,
maxChunkSize = Int.MaxValue,
chunkBufferMaxSize = 1024 * 1024,
parserMode = ParserMode.Strict,
userAgent = None
)
Expand Down
Expand Up @@ -35,6 +35,7 @@ class Http1ClientStageSpec extends Http4sSpec {
maxResponseLineSize = 4096,
maxHeaderLength = 40960,
maxChunkSize = Int.MaxValue,
chunkBufferMaxSize = 1024,
parserMode = ParserMode.Strict,
userAgent = userAgent
)
Expand Down
Expand Up @@ -26,6 +26,8 @@ trait Http1Stage[F[_]] { self: TailStage[ByteBuffer] =>

protected implicit def F: Effect[F]

protected def chunkBufferMaxSize: Int

protected def doParseContent(buffer: ByteBuffer): Option[ByteBuffer]

protected def contentComplete(): Boolean
Expand Down Expand Up @@ -121,7 +123,7 @@ trait Http1Stage[F[_]] { self: TailStage[ByteBuffer] =>

case None => // use a cached chunk encoder for HTTP/1.1 without length of transfer encoding
logger.trace("Using Caching Chunk Encoder")
new CachingChunkWriter(this, trailer)
new CachingChunkWriter(this, trailer, chunkBufferMaxSize)
}
}

Expand Down
Expand Up @@ -14,9 +14,7 @@ import scala.concurrent._
private[http4s] class CachingChunkWriter[F[_]](
pipe: TailStage[ByteBuffer],
trailer: F[Headers],
bufferMaxSize: Int = 10 * 1024)(
implicit protected val F: Effect[F],
protected val ec: ExecutionContext)
bufferMaxSize: Int)(implicit protected val F: Effect[F], protected val ec: ExecutionContext)
extends Http1Writer[F] {
import ChunkWriter._

Expand Down
Expand Up @@ -92,11 +92,11 @@ class Http1WriterSpec extends Http4sSpec {
}

"CachingChunkWriter" should {
runNonChunkedTests(tail => new CachingChunkWriter[IO](tail, IO.pure(Headers())))
runNonChunkedTests(tail => new CachingChunkWriter[IO](tail, IO.pure(Headers()), 1024 * 1024))
}

"CachingStaticWriter" should {
runNonChunkedTests(tail => new CachingChunkWriter[IO](tail, IO.pure(Headers())))
runNonChunkedTests(tail => new CachingChunkWriter[IO](tail, IO.pure(Headers()), 1024 * 1024))
}

"FlushingChunkWriter" should {
Expand Down
Expand Up @@ -52,6 +52,7 @@ import scodec.bits.ByteVector
* If exceeded returns a 400 Bad Request.
* @param maxHeadersLen: Maximum data that composes the headers.
* If exceeded returns a 400 Bad Request.
* @param chunkBufferMaxSize Size of the buffer that is used when Content-Length header is not specified.
* @param serviceMounts: The services that are mounted on this server to serve.
* These services get assembled into a Router with the longer prefix winning.
* @param serviceErrorHandler: The last resort to recover and generate a response
Expand All @@ -72,6 +73,7 @@ class BlazeServerBuilder[F[_]](
isHttp2Enabled: Boolean,
maxRequestLineLen: Int,
maxHeadersLen: Int,
chunkBufferMaxSize: Int,
httpApp: HttpApp[F],
serviceErrorHandler: ServiceErrorHandler[F],
banner: immutable.Seq[String],
Expand All @@ -96,6 +98,7 @@ class BlazeServerBuilder[F[_]](
http2Support: Boolean = isHttp2Enabled,
maxRequestLineLen: Int = maxRequestLineLen,
maxHeadersLen: Int = maxHeadersLen,
chunkBufferMaxSize: Int = chunkBufferMaxSize,
httpApp: HttpApp[F] = httpApp,
serviceErrorHandler: ServiceErrorHandler[F] = serviceErrorHandler,
banner: immutable.Seq[String] = banner,
Expand All @@ -114,6 +117,7 @@ class BlazeServerBuilder[F[_]](
http2Support,
maxRequestLineLen,
maxHeadersLen,
chunkBufferMaxSize,
httpApp,
serviceErrorHandler,
banner,
Expand Down Expand Up @@ -188,6 +192,9 @@ class BlazeServerBuilder[F[_]](
def withMaxHeadersLength(maxHeadersLength: Int): BlazeServerBuilder[F] =
copy(maxHeadersLen = maxHeadersLength)

def withChunkBufferMaxSize(chunkBufferMaxSize: Int): BlazeServerBuilder[F] =
copy(chunkBufferMaxSize = chunkBufferMaxSize)

def resource: Resource[F, Server[F]] = tickWheelResource.flatMap { scheduler =>
Resource(F.delay {

Expand Down Expand Up @@ -241,6 +248,7 @@ class BlazeServerBuilder[F[_]](
enableWebSockets,
maxRequestLineLen,
maxHeadersLen,
chunkBufferMaxSize,
serviceErrorHandler,
responseHeaderTimeout,
idleTimeout,
Expand All @@ -253,6 +261,7 @@ class BlazeServerBuilder[F[_]](
httpApp,
maxRequestLineLen,
maxHeadersLen,
chunkBufferMaxSize,
requestAttributes(secure = true, engine.some),
executionContext,
serviceErrorHandler,
Expand Down Expand Up @@ -381,6 +390,7 @@ object BlazeServerBuilder {
isHttp2Enabled = false,
maxRequestLineLen = 4 * 1024,
maxHeadersLen = 40 * 1024,
chunkBufferMaxSize = 1024 * 1024,
httpApp = defaultApp[F],
serviceErrorHandler = DefaultServiceErrorHandler[F],
banner = defaults.Banner,
Expand Down
Expand Up @@ -31,6 +31,7 @@ private[blaze] object Http1ServerStage {
enableWebSockets: Boolean,
maxRequestLineLen: Int,
maxHeadersLen: Int,
chunkBufferMaxSize: Int,
serviceErrorHandler: ServiceErrorHandler[F],
responseHeaderTimeout: Duration,
idleTimeout: Duration,
Expand All @@ -44,6 +45,7 @@ private[blaze] object Http1ServerStage {
executionContext,
maxRequestLineLen,
maxHeadersLen,
chunkBufferMaxSize,
serviceErrorHandler,
responseHeaderTimeout,
idleTimeout,
Expand All @@ -55,6 +57,7 @@ private[blaze] object Http1ServerStage {
executionContext,
maxRequestLineLen,
maxHeadersLen,
chunkBufferMaxSize,
serviceErrorHandler,
responseHeaderTimeout,
idleTimeout,
Expand All @@ -67,6 +70,7 @@ private[blaze] class Http1ServerStage[F[_]](
implicit protected val executionContext: ExecutionContext,
maxRequestLineLen: Int,
maxHeadersLen: Int,
override val chunkBufferMaxSize: Int,
serviceErrorHandler: ServiceErrorHandler[F],
responseHeaderTimeout: Duration,
idleTimeout: Duration,
Expand Down
Expand Up @@ -19,6 +19,7 @@ private[blaze] object ProtocolSelector {
httpApp: HttpApp[F],
maxRequestLineLen: Int,
maxHeadersLen: Int,
chunkBufferMaxSize: Int,
requestAttributes: () => AttributeMap,
executionContext: ExecutionContext,
serviceErrorHandler: ServiceErrorHandler[F],
Expand Down Expand Up @@ -63,6 +64,7 @@ private[blaze] object ProtocolSelector {
enableWebSockets = false,
maxRequestLineLen,
maxHeadersLen,
chunkBufferMaxSize,
serviceErrorHandler,
responseHeaderTimeout,
idleTimeout,
Expand Down
Expand Up @@ -56,6 +56,7 @@ class Http1ServerStageSpec extends Http4sSpec with AfterAll {
enableWebSockets = true,
maxReqLine,
maxHeaders,
10 * 1024,
DefaultServiceErrorHandler,
30.seconds,
30.seconds,
Expand Down