Fix Streamed Json Encoding #3891
Conversation
Could or should the chunk size be turned into a parameter of the functions, even in a later PR?
Part of the untyped contract of encoders is that they flush per chunk. It's the only way the user can control when partial results become available. Instead of an arbitrary limit, could we intersperse each chunk?
That's what the most recent commit does. Each incoming chunk gets written, interspersed with commas, and then rechunked. The first chunk gets special treatment because its logic differs from the comma interspersion. Sorry, the original PR description no longer matches what the code implements. As currently written, it outputs one chunk per incoming chunk.
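The per-chunk interspersion described above can be sketched in plain Scala (no fs2; names here are illustrative, not the PR's actual code). Each incoming chunk maps to exactly one output chunk, so downstream can flush at every chunk boundary. The sketch assumes every incoming chunk is non-empty:

```scala
// Illustrative sketch of per-chunk comma interspersion, without fs2.
// Each inner Vector stands in for one incoming stream chunk; each output
// String stands in for one rechunked output chunk.
def interspersePerChunk(chunks: List[Vector[String]]): List[String] =
  chunks match {
    case Nil => Nil
    case first :: rest =>
      // First chunk: leading `[`, first element as-is, the rest comma-prefixed.
      val head = "[" + first.head + first.tail.map("," + _).mkString
      // Every element of later chunks is comma-prefixed.
      head :: rest.map(_.map("," + _).mkString)
  }
```

Concatenating the output chunks and appending the closing brace then yields the full array, e.g. `interspersePerChunk(List(Vector("1", "2"), Vector("3")))` gives `List("[1,2", ",3")`.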
Oh, yeah, this looks different than what I looked at. This is good, and should obviate the need for configuration (which was a good idea by @diesalbla in the old impl).
```scala
}
Chunk
  .vector(bldr.result())
  .flatMap(identity) // I know there must be a more efficient way to do this
```
Maybe `Chunk.concatBytes(bldr.result())`?
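For context, the suggestion amounts to a single pre-sized copy instead of flattening chunk by chunk. A plain-Scala analogue of that single-copy strategy (a sketch only; fs2's actual `Chunk.concatBytes` implementation differs):

```scala
// Single-copy concatenation: size the output array once, then copy each
// piece into place. This is the shape of work Chunk.concatBytes does,
// versus element-by-element flattening via flatMap(identity).
def concatByteArrays(pieces: Vector[Array[Byte]]): Array[Byte] = {
  val out = new Array[Byte](pieces.iterator.map(_.length).sum)
  var offset = 0
  pieces.foreach { p =>
    System.arraycopy(p, 0, out, offset, p.length)
    offset += p.length
  }
  out
}
```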
```scala
case None => Pull.done
case Some((hd, tl)) =>
  Pull.output(
    Chunk.concatBytes(Vector(CirceInstances.openBrace, fromJsonToChunk(printer)(hd)))
  ) >> // Output First Json As Chunk with leading `[`
    tl.repeatPull {
      _.uncons.flatMap {
        case None => Pull.pure(None)
        case Some((hd, tl)) =>
          val interspersed = {
            val bldr = Vector.newBuilder[Chunk[Byte]]
            bldr.sizeHint(hd.size * 2)
            hd.foreach { o =>
              bldr += CirceInstances.comma
              bldr += fromJsonToChunk(printer)(o)
            }
            Chunk.concatBytes(bldr.result())
          }
          Pull.output(interspersed) >> Pull.pure(Some(tl))
      }
    }.pull
      .echo
}.stream ++ Stream.chunk(closeBrace)
```
@ChristopherDavenport @rossabaker In the case of encoding an empty list using streams, I notice the `closeBrace` is always printed at the end, but the `openBrace` is only printed if there are elements. If not, it will use `Pull.done` and the encoding will only be `]`, right?
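A tiny model of the brace placement illustrates the concern (plain Scala with a hypothetical `encodeModel` name; not the PR's code): the `[` is only written in the non-empty branch, while the `]` is appended unconditionally.

```scala
// Model of the encoder's brace placement: `[` is emitted inside the
// non-empty branch (with the first element), while `]` is appended
// unconditionally, mirroring `++ Stream.chunk(closeBrace)`.
def encodeModel(elems: List[String]): String = {
  val body = elems match {
    case Nil => "" // corresponds to Pull.done: no openBrace is ever written
    case head :: tail => "[" + head + tail.map("," + _).mkString
  }
  body + "]"
}
```

Under this model, an empty input does indeed encode to just `]`.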
```scala
private final val openBrace: Chunk[Byte] =
```
Out of curiosity, what purpose does `final` serve, since the val lives in an object?
If I remember right, a final val in an object can be inlined only if it doesn't have a type annotation. I don't know how it matters here, if at all.
Fixes #3890
Questions: Is this a good chunk size? Should we make it configurable in other versions? How does this relate to the benchmarks for the cutoff?
This should yield roughly a 2000x reduction in buffer flushes.