
feat: streaming json document/array data #2494

Merged: 19 commits merged into main from tycho/json-support on Jan 30, 2024
Conversation

@tychoish (Collaborator):

As an alternate implementation of #2306 for #2218.

I'm sort of partial to this implementation, as it requires a bunch less DF boilerplate, but would love other opinions.

There's some more work that should be done, including:

  • testing
  • discussion about what the API should be. We should probably write a function that chooses between ndjson and json based on the first non-whitespace character of a file/stream (see the sketch at the end of this comment).

Other notes:

  • I don't think this implementation is suitable for all json data (though it will handle "one json document as a one-row table" and "an array of json documents" quite well), even if we could get it to handle ndjson, without some work to sort out the schema inference:
  • my intention was that we always read the entire dataset, get the superset of the schema, and then return the data. We could do schema inference.
  • as a result I sort of arbitrarily added some chunking and batch sizing. I definitely picked these randomly, and if there are better options, let me know.
  • while it does support globbing, it still reads everything into memory always.
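
For reference, a minimal sketch of that first-non-whitespace-character heuristic (illustrative only; a leading { is ambiguous between a single document and ndjson, so a real implementation would still need an option or a deeper peek):

fn sniff_json_shape(input: &str) -> Option<&'static str> {
    // Look at the first non-whitespace character of the file/stream.
    match input.trim_start().chars().next()? {
        '[' => Some("array of documents"),        // unambiguously "json"
        '{' => Some("single document or ndjson"), // needs a tie-breaker
        _ => None,                                // scalar or malformed input
    }
}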

@scsmithr (Member) left a comment:

Test and ship!

crates/datasources/src/json/stream.rs

pub struct JsonPartitionStream {
    schema: Arc<Schema>,
    stream: Mutex<Option<Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>>>,
Member:

Typically prefer parking_lot over std mutex.

Collaborator (author):

I tried to use parking_lot, and the .take() trick that you used here (I stole it from the bson thing you did) wouldn't compile. Not super sure why.
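
For context, a minimal sketch of the .take() pattern being discussed, using the std mutex as in the PR (abbreviated, hypothetical names; not the exact code):

use std::pin::Pin;
use std::sync::Mutex;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use futures::Stream;

type SendableBatchStream =
    Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>;

struct PartitionStreamSketch {
    stream: Mutex<Option<SendableBatchStream>>,
}

impl PartitionStreamSketch {
    // The stream can only be consumed once: take it out of the Option,
    // leaving None behind so a second execution surfaces as an error
    // instead of silently re-running.
    fn take_stream(&self) -> Option<SendableBatchStream> {
        self.stream.lock().expect("mutex poisoned").take()
    }
}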

@tychoish (Collaborator, author):

> Test and ship!

patience.

I also kinda want to have external tables for this too?

Also, how do we solve the ndjson/json disambiguation? Have an option with a default?

crates/datasources/src/json/table.rs
pub fn new(schema: Arc<Schema>, chunk: Vec<Map<String, Value>>) -> Self {
    let stream_schema = schema.clone();
    let stream = futures::stream::iter(chunk)
        .chunks(25)
Contributor:

why 25?

Collaborator (author):

it's less than 100; other numbers could be good too.

Collaborator (author):

Do you have a suggestion?

Contributor:

I guess I don't understand why we chunk it, then chunk it again?

Collaborator (author):

these streaming tables work by collecting a number of streams (from files, though in this case we will have read them all into memory, because we need/want to get the schema and do that losslessly). Then the stream produces record batches.

The first chunking gives us the partitions; the second gives us the size of each record batch.

I (following from your earlier comment) made the record batch size 1,000 and the stream size 10,000.
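
A rough sketch of those two levels of chunking, using the constants from this comment (the decoding of each inner chunk into a RecordBatch is elided):

use futures::stream::{self, StreamExt};
use futures::Stream;
use serde_json::{Map, Value};

const ROWS_PER_STREAM: usize = 10_000; // outer chunk: one partition stream each
const ROWS_PER_BATCH: usize = 1_000; // inner chunk: one RecordBatch each

// Outer chunking: each Vec returned here backs one partition stream.
fn partition_docs(docs: Vec<Map<String, Value>>) -> Vec<Vec<Map<String, Value>>> {
    docs.chunks(ROWS_PER_STREAM).map(|c| c.to_vec()).collect()
}

// Inner chunking: each Vec yielded by the stream becomes one RecordBatch.
fn batches_for_partition(
    chunk: Vec<Map<String, Value>>,
) -> impl Stream<Item = Vec<Map<String, Value>>> {
    stream::iter(chunk).chunks(ROWS_PER_BATCH)
}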


let mut field_set = indexmap::IndexMap::<String, DataType>::new();
for obj in &data {
    for key in obj.keys() {
Contributor:

Doesn't this iterate over all rows?

We are already iterating over it when it's passed into arrow's decoder.

It seems reasonable that we could deserialize the json in a single pass instead of two.

Collaborator (author):

the loop on line 37 iterates over all files/urls (in the case of a glob) and in most cases will only run once.

push_unwind_... doesn't iterate over the document; it just builds the list of documents, erroring if we get something that isn't usable (a scalar). This is either a single document or an array of documents, and it unwinds the array when needed. We could extract the schema in this function, but:

  • I think the code would get more complicated, and this is pretty easy to understand.
  • in the future where you might specify a schema to the table provider, the code in its current form is easier to modify in that direction.
  • it would be easy to end up with code where you were iterating less but doing more allocations to temporary data structures to keep track as you go (particularly if you have multiple files), and I feel like two loops are likely to be faster than one loop with a lot of allocation pressure (there's a rough sketch of the two-pass approach after this list).
  • my baseline assumption is that it's always going to be relatively small amounts of data (a few megs tops).
  • in the error case you end up building a schema that you're never going to use, and errors might end up being pretty common just because of malformed REST APIs.
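
A minimal sketch of that two-pass idea (superset schema first, then decode), with an illustrative type mapping rather than the PR's actual one:

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use indexmap::IndexMap;
use serde_json::{Map, Value};

fn type_for_value_sketch(v: &Value) -> DataType {
    match v {
        Value::Null => DataType::Null,
        Value::Bool(_) => DataType::Boolean,
        Value::Number(n) if n.is_f64() => DataType::Float64,
        Value::Number(_) => DataType::Int64,
        // Strings, arrays, and objects are simplified to Utf8 for the sketch.
        _ => DataType::Utf8,
    }
}

// First pass: union of all keys across all documents, so later documents
// never lose fields that earlier documents were missing.
fn superset_schema(data: &[Map<String, Value>]) -> Schema {
    let mut field_set = IndexMap::<String, DataType>::new();
    for obj in data {
        for (key, value) in obj {
            // Keep the first type seen for each key (the real merging rules
            // would be more careful than this).
            field_set
                .entry(key.clone())
                .or_insert_with(|| type_for_value_sketch(value));
        }
    }
    Schema::new(
        field_set
            .into_iter()
            .map(|(name, dt)| Field::new(name.as_str(), dt, true))
            .collect::<Vec<_>>(),
    )
}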

Contributor:

> my baseline assumption is that it's always going to be relatively small amounts of data (a few megs tops).

I don't think we should be making any assumptions about the size of the data, especially considering the globbing support. I could see someone trying to read in a directory of hundreds or thousands of small/medium json files.

Collaborator (author):

There are a couple of assumptions here:

  • the primary use case will be pulling information off of HTTP endpoints; it's just convenient to use the object store stuff. We want to say "point glaredb at it and you get a SQL table." Files with arbitrary data are fine too, but that's sort of just something that comes for free.
  • there are a lot of things that are kind of inefficient here because I want the experience in the above use case to not lose data (e.g. drop something from later documents because it's not in the schema). When we let folks pass schema clauses into functions and external tables, we can update these functions, also provide schema inference, and then change the features.

Unless we want to shelve this for a while while we figure out how to handle the schema, either all the data has to be in memory or we have to parse all the files twice. I opted for the former based on the assumptions about the use case. I'm not opposed to addressing the "many small json files" or "data size larger than (I dunno) a gig" use cases, but I'm also fine if we punt on those, as there are workarounds (use polars, or convert to ndjson), and it would not be difficult to add schema inference, an explicit schema, and/or lazy file reading to this implementation.

Contributor:

I mostly agree with these assumptions, but I do think there is another path forward (particularly for the multi-file case). I don't think the assumption that all the data has to be in memory is correct.

At any given time, we only need to hold a single file in memory. All of our file-based readers expect the same schema across multiple files/globs, so we can apply that same logic here and only use the first file to extract the schema. From there, we can defer fetching & parsing of the remaining files into the streams.

This would be especially useful for (eventual) limit pushdowns. If you only want limit 1000 out of thousands of files and each file has 1000 rows, you could stop after the first file, saving n-1 cycles (both IO and CPU).

Collaborator (author):

The streaming table handling basically implicitly pushes down limits, because the streams are iterated lazily.

I implemented the "all of the first file, nothing more" schema inference. I think this will be ok for now.

The globbing definitely doesn't work (or at least I couldn't provoke it to work); it's probably a regression, but the bson code is subject to the same flaw, and we're not doing anything except using the library functions, which at least for now renders the multi-file use case somewhat moot.

We can dig into this next week.
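
For reference, a rough sketch of the "schema from the first file only" approach under discussion; fetch_and_parse and infer_schema are hypothetical helpers, not functions from this PR:

use futures::stream::{self, StreamExt};
use serde_json::{Map, Value};

type Docs = Vec<Map<String, Value>>;

async fn fetch_and_parse(_url: &str) -> Docs {
    unimplemented!("hypothetical: read one file/URL into JSON documents")
}

fn infer_schema(_docs: &Docs) -> Vec<String> {
    unimplemented!("hypothetical: superset of keys from one file's documents")
}

async fn plan(urls: Vec<String>) {
    // Only the first file is read eagerly, just to get a schema.
    let first = fetch_and_parse(&urls[0]).await;
    let _schema = infer_schema(&first);

    // The remaining files stay as lazily-evaluated work inside the stream;
    // each one is fetched only when the consumer actually polls for it, so
    // an early LIMIT never touches them.
    let _remaining = stream::iter(urls.into_iter().skip(1))
        .then(|url| async move { fetch_and_parse(&url).await });
}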

crates/datasources/src/json/table.rs
pub struct JsonScan;

impl ConstBuiltinFunction for JsonScan {
    const NAME: &'static str = "read_json";
Contributor:

I'm fine with just read_json and read_ndjson. This is explicit & doesn't leave any ambiguity.

Collaborator (author):

the duckdb json interface is a bit cleaner in terms of format autodetection, which this really doesn't have, and that might be confusing.

If you take a newline-delimited json file or glob, you'll get one document per file with this code.

Contributor:

So I think the autodetection is nice, but I think making users aware of the implications of using ndjson vs json is more important.

We can do a lot more optimizations with ndjson than we can with json, and the code paths are very different. We had some pretty lengthy discussions over at polars when adding ndjson functionality, and ended up making them two distinct functions for this reason. IMO, they are nearly as distinct from one another as json and csv, and should be treated as such.

Collaborator (author):

It makes sense here for this function, and I'm willing to go with this naming, but we don't have a good answer for CREATE EXTERNAL TABLE, which calls ndjson ... json.

} else {
    DataType::List(Arc::new(Field::new(
        "",
        type_for_value(v.first().unwrap()),
Contributor:

This'll definitely cause some issues later on. I made this same mistake in the polars json parser, and we ended up getting a bunch of issues from parsing errors due to schema mismatching. We ended up needing to infer the supertype for the values in the array.

For example, an array of [null, "some string", "some other string"] will result in List<Null> instead of List<Utf8>.

Similarly, an array of [0, 1.0, 2.2] will get parsed as List<Int64> instead of List<Float64>.

Contributor:

At a minimum, we should error out if the values in the array are of different types.

Collaborator (author):

yeah, I thought of this and decided not to care initially. There are a lot of edge cases that we'll continue to not handle very well ([null, "foo", 1], plus arrays of objects, ...).

I can do something that replaces nulls with something that isn't a null (that'd cover a lot of cases); sort of everything else gets much hairier.
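
A hedged sketch of what that supertype coercion could look like; the rules shown only cover the examples from this thread and are not the PR's code:

use datafusion::arrow::datatypes::DataType;

fn supertype(a: DataType, b: DataType) -> DataType {
    use DataType::*;
    match (a, b) {
        // Nulls defer to whatever else appears in the array.
        (Null, other) | (other, Null) => other,
        // Mixed integer/float arrays widen to Float64.
        (Int64, Float64) | (Float64, Int64) => Float64,
        (x, y) if x == y => x,
        // Anything else falls back to strings for the sketch; a real
        // implementation might error here instead, as suggested above.
        _ => Utf8,
    }
}

fn list_element_type(element_types: impl IntoIterator<Item = DataType>) -> DataType {
    // [Null, Utf8, Utf8] -> Utf8; [Int64, Float64, Float64] -> Float64.
    element_types.into_iter().fold(DataType::Null, supertype)
}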

@universalmind303 (Contributor):

can we please add some SLTs? I don't think we should use pytests as the default way of writing tests. The SLTs ensure that the functionality works across multiple protocols. Additionally, they are subjectively easier to read and maintain than the pytests.

@tychoish (Collaborator, author):

> can we please add some SLTs? I don't think we should use pytests as the default way of writing tests. The SLTs ensure that the functionality works across multiple protocols.

I don't think you need full coverage across data sources and protocols:

  • you need to test that different query shapes and operations work over different protocols, but there are no new query shapes here.
  • we're not consistent with "testing across different protocols" already: none of the service integration tests currently run on postgres or flightsql; same with snowflake.

Ideally there should be a small core of tests that run across all protocols, with the remainder of the tests executed in whatever way is fast. At the moment there's no real downside to running a bunch of local tests/files with the same tests over different protocols, but if the tests took a long time, I wouldn't hesitate to drop things like the minio flight tests or the sqlserver flight test (actually, we should do this one, because we wait 6 minutes per build for it).

> Additionally, they are subjectively easier to read and maintain than the pytests.

I understand your preference here, but the SLTs frankly have terrible ergonomics for anything related to the filesystem, the fixtures are unintuitive, and the assertions are awkward. Though not applicable in this particular case (yet), SLTs don't provide any reasonable way to test COPY TO. We end up testing, effectively, GlareDB with itself, and we end up with a lot of blind spots as a result.

The maintenance thing is tricky: the fixtures get set up, in a lot of cases, pretty far from the actual assertions, and there's no language server for SLTs, so you just have to grep wildly and hope that you've found the right thing. This is complicated by a lot of copy-and-pasted test cases. There's also no fixture cleanup within a "suite". This is often fine because people have made sure tests don't collide (but people make mistakes), but moving tests around isn't always safe, and the execution-order and statefulness assumptions in a lot of the integration-type tests are a weakness both of the assertions and of the tests themselves.

There are lots of tests that shouldn't be tested with the pytests, and the SLTs are great for testing the semantics of SQL (which makes sense) but pretty poorly suited to integration testing and multi-statement operations. In any case, the product is better for having multiple testing modalities and tools.

@tychoish (Collaborator, author):

testing modality aside, I think the real question is how we square the fact that ndjson is json when creating an external table or for file-extension sniffing, and this would confuse that.

@tychoish dismissed universalmind303's stale review on January 30, 2024 23:25: "testing implementation details"

@tychoish merged commit df419aa into main on Jan 30, 2024; 22 checks passed

@tychoish deleted the tycho/json-support branch on January 30, 2024 23:26