feat: multi-line json streaming #2729

Merged · 32 commits into main · Mar 6, 2024

Conversation

tychoish
Collaborator

@tychoish tychoish commented Mar 3, 2024

This cleans up and unifies a lot of the existing multi-line json code, and (I think, though I need to do more testing) should also make it possible to read ndjson with the same set of functions.

I don't think we should necessarily remove the dedicated ndjson support unless this has on-par performance, but I think it would be good if read_json didn't error for ndjson. If the performance is close to ndjson (say, less than a 50% slowdown), then CREATE EXTERNAL TABLE and the .json extension could go through these paths (while adding explicit ndjson or jsonl support).

Regardless, given that the code here is a bit more manageable (and could probably be refactored a bit more still), I think this should be merged even if it doesn't work with ndjson (or multi-line/multi-document) as long as there aren't any functional regressions.

@tychoish
Collaborator Author

tychoish commented Mar 3, 2024

status:

  • the code is in a form that makes sense
  • I have tests that catch this: they hang because the streaming library doesn't seem to do what I want with regard to end-of-file/EOF handling. This seems reasonable to hack in, and the maintainer seems to be responsive and amenable.
  • The alternative is to wrap the byte stream in a reader, or do something kind of blocking for the entire batch, and then use the StreamDeserializer directly. The library does roughly the right thing from an async/incremental perspective but only imagines unbounded streams. Will ponder more later.

Contributor

@universalmind303 universalmind303 left a comment


> but I think it would be good if read_json didn't error for ndjson.

I disagree; the handling and parsing of ndjson is fundamentally different from json and should be handled separately. We shouldn't give the user a suboptimal experience, but instead try to direct them to use the right tool for the job. If the functionality must exist within read_json, then we should have a flag for it like every other data processing tool that supports ndjson and json via the same function: read_json('some-file', lines=true), which just delegates to read_ndjson. The optimization techniques between the two also generally don't overlap.

Overloading read_json for multi-purpose use also introduces its own complexity. The rise in popularity of polars has shown that people don't mind explicit function calls like read_json and read_ndjson.

@tychoish
Collaborator Author

tychoish commented Mar 4, 2024

I understand what you're saying here, and I don't disagree, and I don't think that we should combine functions. However, if we want to have the following inputs work:

  • unseparated:
{"a": 1} {"a":2}{"a":3}
  • multi-line/multi-document:
{
  "a": 1
} 
{
  "a": 2
}
  • newline-separated json from non-file sources (e.g. kafka streams, or victoria metrics data streams, which are, or could be, newline-separated)
  • newline-separated json data from any REST API that isn't supported by datafusion's object store

Then we ought to make this work, and what's more, if this is going to be useful for any of these non-trivial use-cases, it's going to incidentally work for newline-separated json. Having poked a bit more at the arrow-json/datafusion implementations, the benefits we get from ndjson are pretty limited and are mostly around the way the reading infrastructure handles framing (and datafusion's repartitioning support).

While there's probably a framing/batching benefit you can get if you know something is newline-separated, particularly in a few edge cases (large documents, where the byte stream is long and we end up fetching a larger number of unaligned batches), smaller individual document sizes and fewer batches will perform very similarly.

In any case, I'm not proposing any great change to either of these functions right now, just removing a collection of failure modes; we can deal with the semantic implications later.

@universalmind303
Contributor

> however, if we want to have the following inputs work:

Neither of those examples is valid json or ndjson. I think those are edge cases we shouldn't even try to program for.

@tychoish
Collaborator Author

tychoish commented Mar 5, 2024

> Neither of those examples is valid json or ndjson. I think those are edge cases we shouldn't even try to program for.

And yet, they're all trivially supported by serde_json. (The json-stream library that I used here is a thin wrapper that buffers an input stream.) Indeed, even arrow-json supports this "concatenated json values with arbitrary whitespace" form.
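
For reference, here's a minimal standalone sketch (not code from this PR) of serde_json handling exactly that kind of concatenated, whitespace-separated input via its StreamDeserializer:

```rust
// Minimal sketch, not from this PR: serde_json's StreamDeserializer accepts
// concatenated JSON values separated by arbitrary whitespace.
use serde_json::{Deserializer, Value};

fn main() -> serde_json::Result<()> {
    let input = r#"{"a": 1} {"a":2}{"a":3}
{
  "a": 4
}"#;

    // into_iter() yields one value per top-level JSON document in the input.
    for value in Deserializer::from_str(input).into_iter::<Value>() {
        println!("{}", value?);
    }
    Ok(())
}
```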

The datafusion ndjson support seems to be at the level of the object store/data reading/framing, and doesn't really have anything to do with parsing. I don't know.

In addition to cleaning up the code, this patch really just corrects the previous (weird) behavior: we'd download a complete file, read the first object from the file, unwind it if it was an array, and then drop the rest of the data. That is neither efficient nor the semantics that anyone would reasonably expect.

The thing that this code does that the nd path doesn't (in addition to the difference in framing) is unwinding top-level arrays of json documents, which is definitely useful for reading data off of APIs.

dependabot bot and others added 5 commits March 5, 2024 08:55
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Currently this commit simply adds `#[serde(default)]` for fields that
are required in v1 (or sets a valid default).

We do need more datasets to test this on. There's some deprecated stuff
that might require more attention to support.

Signed-off-by: Vaibhav <vrongmeal@gmail.com>
@tychoish
Collaborator Author

tychoish commented Mar 5, 2024

Sorry for the messy merge, things should be cleaned up.

I ran the same testdata/userdata1.json file against both read_json and read_ndjson and they have comparable performance (json is very slightly faster for select count(*), and more consistently (by 25-30%) faster for select *). I don't think this is necessarily significant, but it's nice to see that they're both operating in the same order of magnitude.

I was reading the code and realized that there was a place where I was copying from the object store's output stream into a vector, so I removed that (committed and pushed). I'm pretty sure (now) that our stream handling and reading should avoid needing to copy the entirety of anything from the stream into vectors, which will help reduce memory utilization with larger datasets. Having said that, this had the impact of making the count slower, and I suspect there's some place in the pipeline where we could buffer more (between the object-store byte stream and the json stream?) that would get this back, but I don't think it's going to be material in the short term.

```
@@ -75,6 +75,7 @@ scylla = { version = "0.12.0" }
 glob = "0.3.1"
 indexmap = "2.2.5"
 async-sqlite = "0.2.2"
+json-stream = { git = "https://github.com/tychoish/json-stream", rev = "bd4990fab95f789740a75a8eea98d5dac1f0160a" }
```
Member


What's the rationale? Are there changes to this? Can those be upstreamed?

Collaborator Author


Yeah, this is just the content of a PR that's open: resyncgg/json-stream#5

The maintainer is responsive and said he'd look at it by the end of the week.

Comment on lines 22 to 24
// this is the same as a sendable recordbatch stream, but declared
// separtley so we can have isomorphic values using adapters.
stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
Member


I don't know what this means.

Collaborator Author


Ok, so if the type of this is SendableRecordBatchStream or another named type and I got a stream of the appropriate type of objects using .flat_map or flatten, I couldn't get it to compile.

I think the comments made things less clear.

Member


It might have been from not importing the futures::stream::Stream trait since those methods are defined on the trait.
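
For illustration only (a sketch, not code from this PR): once the extension trait is imported, the stream combinators resolve as expected.

```rust
// Illustrative sketch: combinators like flatten/flat_map are defined on the
// StreamExt extension trait, so they are only callable once it is in scope.
use futures::stream::{self, StreamExt};

async fn flatten_demo() -> Vec<i32> {
    let nested = stream::iter(vec![stream::iter(vec![1, 2]), stream::iter(vec![3])]);
    // Without the StreamExt import above, this call would fail to resolve.
    nested.flatten().collect::<Vec<i32>>().await
}
```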

struct JsonStreamHandler {
schema: Arc<Schema>,
buf: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>,
worker: tokio::task::JoinHandle<DFResult<()>>,
Member


Why is this needed? Why can't we pull and decode directly from the stream?

Collaborator Author


Huh, I just refactored it without this and it seemed to work. smh

@scsmithr
Member

scsmithr commented Mar 6, 2024

> however, if we want to have the following inputs work:

> Neither of those examples is valid json or ndjson. I think those are edge cases we shouldn't even try to program for.

I think being able to read streaming json makes sense, especially in the context of a web request with a large body. Not needing to collect the entire input before being able to deserialize is a nice property.

I think the real hold-up seems to be on handling multiple json objects in the same stream. I personally think being able to do that is a nice convenience even if it's technically neither valid ndjson nor json (e.g. what a timeseries stream would output). I also think this code could be the base for handling very large json arrays. Instead of reading the entirety into memory, we could just start streaming in the array body.

@tychoish
Collaborator Author

tychoish commented Mar 6, 2024

> I also think this code could be the base for handling very large json arrays. Instead of reading the entirety into memory, we could just start streaming in the array body.

😓 This is a good thought, though the deserializer only operates on single objects, so very large arrays go into memory, get unwound, and are streamed back to the client, but they live in memory for deserialization (we have to know that the end of an object exists in order to start processing it). So we'd probably have to write or find a JSON parser for this, but... I still like the idea.

scsmithr and others added 9 commits March 6, 2024 11:58
Related: GlareDB/cloud#2746

Previously we weren't allowing this; it just errored with something along the lines of:

```
Catalog error: Lease for database 'a9edc39a-6129-4b19-b8dc-02483df4fc6e' held by other process; other_process_id: d7796f4c-c3e4-4822-b59e-1b4b9f32a48d, current_process_id: d7796f4c-c3e4-4822-b59e-1b4b9f32a48d
```

Based on the error message, I believe I originally intended to allow a leaser to acquire a lease it's already acquired.

This doesn't fix the rate limit issue, but it does partially alleviate errors caused by failing to drop leases when dropping the lease fails due to a rate limit.
lance uses duckdb as a submodule, and it's a pretty massive repo, `555.9
MB` on a fresh clone. It currently takes absolutely forever to build
glaredb the first time _(or subsequent times if you nuke your cargo
cache)_.

`git clone https://github.com/duckdb/duckdb.git` alone takes almost 4
minutes locally.

this just forks lance and removes the submodule to avoid the problem altogether.
Co-authored-by: Sean Smith <scsmithr@gmail.com>
@scsmithr
Member

scsmithr commented Mar 6, 2024

> Having said that, this had the impact of making the count slower, and I suspect there's some place in the pipeline where we could buffer more (between the object-store byte stream and the json stream?) that would get this back, but I don't think it's going to be material in the short term.

Are there relative performance numbers for this?

Collaborator Author

@tychoish tychoish left a comment


I think I got all of your feedback, and was able to refactor away one of the wrapper types and clean things up a bit. Sorry for the last minute churn, but the code is a bunch simpler now.

Comment on lines 22 to 23
// this is the same as a sendable recordbatch stream, but declared
// separtley so we can have isomorphic values using adapters.
Collaborator Author


Suggested change
// this is the same as a sendable recordbatch stream, but declared
// separtley so we can have isomorphic values using adapters.

@tychoish
Collaborator Author

tychoish commented Mar 6, 2024

> Are there relative performance numbers for this?

So I just ran the following queries a bunch on my laptop from the cli/repl:

```
\timing
select count(*) from read_json('testdata/json/userdata1.json');
select count(*) from read_ndjson('testdata/json/userdata1.json');
select * from read_ndjson('testdata/json/userdata1.json');
select * from read_json('testdata/json/userdata1.json');
```

There was an intermediate version of this PR where there was a bigger delta between the count timings that I'm not quite sure of...

Having said that, this is really a bake-off between serde_json::StreamDeserializer and arrow_json::TapeDecoder (I might be forgetting the type name here), and (I think) doesn't have anything to do with any of this code or anything in this design/implementation. It makes sense that serde_json would be faster.

The ndjson path (probably? hopefully? maybe?) can do something more advanced with partitioning than just treating each file as its own partition, and I have done much less globbing work (local globbing didn't work until the middle of this PR's life)...
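
For context on the arrow-json side of that bake-off, here's a hedged sketch (not code from this PR, assuming a recent arrow-json Decoder API) of incrementally decoding whitespace-separated documents into a RecordBatch:

```rust
// Hedged sketch, not from this PR: arrow-json's Decoder accepts raw bytes
// incrementally and flushes completed rows as a RecordBatch.
use std::sync::Arc;

use arrow_json::reader::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;

    // Feed whitespace-separated JSON documents in arbitrary chunks.
    decoder.decode(br#"{"a": 1} {"a": 2}"#)?;
    decoder.decode(b"{\"a\": 3}\n")?;

    // flush() emits a RecordBatch for the rows decoded so far (if any).
    if let Some(batch) = decoder.flush()? {
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}
```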

@tychoish tychoish requested a review from scsmithr March 6, 2024 18:52
@tychoish tychoish enabled auto-merge (squash) March 6, 2024 20:41
@tychoish tychoish merged commit 167403e into main Mar 6, 2024
25 checks passed
@tychoish tychoish deleted the tycho/full-json-streaming branch March 6, 2024 20:51
@tychoish tychoish mentioned this pull request Mar 6, 2024