stream: reuse maxBytesPerChunk buffer for inflate / gunzip (#30239)
Before, for every round of `parse` a new buffer was allocated, which was then
copied again by `ByteString.fromArray`. This effectively more than doubled
the allocation rate while inflating. For bulk data, as is typical for
compressed streams, this can make a big difference in throughput.

The slight downside of keeping the buffer is that the stage now uses more memory
by default, even while idle. deflate/gzip's window is 64kb, which happens to also be
the default `maxBytesPerChunk` setting. The additional buffer is therefore expected to
less than double the existing memory footprint while cutting the allocation rate
by more than half, which seems like a good trade-off.
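The pattern described above can be sketched outside of Akka with plain `java.util.zip`. This is a minimal illustration, not the stage's actual code: `maxBytesPerChunk` and `inflateChunk` are hypothetical names standing in for the real stage internals, and the final `copyOfRange` plays the role of `ByteString.fromArray`, which is the copy that makes reusing the shared buffer safe:

```scala
import java.util.zip.{ Deflater, Inflater }

object BufferReuseSketch {
  // Hypothetical stand-in for the default maxBytesPerChunk setting (64 KiB).
  val maxBytesPerChunk = 64 * 1024

  // Allocated once per instance instead of once per parse call.
  private val buffer = new Array[Byte](maxBytesPerChunk)

  def inflateChunk(inflater: Inflater, compressed: Array[Byte]): Array[Byte] = {
    inflater.setInput(compressed)
    val read = inflater.inflate(buffer)
    // The copy happens here (as ByteString.fromArray would), so the shared
    // buffer can safely be overwritten by the next call.
    java.util.Arrays.copyOfRange(buffer, 0, read)
  }

  def main(args: Array[String]): Unit = {
    // Round-trip a small payload to show the reused buffer in action.
    val data = "hello, reused buffer".getBytes("UTF-8")
    val deflater = new Deflater()
    deflater.setInput(data)
    deflater.finish()
    val out = new Array[Byte](maxBytesPerChunk)
    val n = deflater.deflate(out)
    val inflated = inflateChunk(new Inflater(), java.util.Arrays.copyOf(out, n))
    println(new String(inflated, "UTF-8"))
  }
}
```

Because each call copies the inflated bytes out before returning, the single pre-allocated buffer never escapes, which is what makes keeping it around safe.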
jrudolph committed May 17, 2021
1 parent 2dde4b6 commit 18e7881
Showing 1 changed file with 8 additions and 1 deletion.
@@ -21,12 +21,19 @@ import akka.util.ByteString
   def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit
   def inflating: Inflate

+  /**
+   * Pre-allocated buffer to read from inflater. ByteString.fromArray below
+   * will always create a copy of the read data. Keeping this fixed
+   * buffer around avoids reallocating a buffer that may be too big in many
+   * cases for every call of `parse`.
+   */
+  private[this] val buffer = new Array[Byte](maxBytesPerChunk)
+
   abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] {
     override def canWorkWithPartialData = true
     override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = {
       inflater.setInput(reader.remainingData.toArray)

-      val buffer = new Array[Byte](maxBytesPerChunk)
       val read = inflater.inflate(buffer)

       reader.skip(reader.remainingSize - inflater.getRemaining)
