Skip to content

Conversation

@tustvold
Copy link
Contributor

@tustvold tustvold commented Jul 17, 2022

Which issue does this PR close?

Closes #2935
Closes #2930

Rationale for this change

See ticket

What changes are included in this PR?

It aligns the chunks received from object storage to record boundaries, and then feeds these through the decoders

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Jul 17, 2022
@codecov-commenter
Copy link

Codecov Report

Merging #2936 (ce4f59e) into master (b5537e7) will increase coverage by 0.02%.
The diff coverage is 80.18%.

@@            Coverage Diff             @@
##           master    #2936      +/-   ##
==========================================
+ Coverage   85.30%   85.33%   +0.02%     
==========================================
  Files         273      274       +1     
  Lines       49269    49450     +181     
==========================================
+ Hits        42029    42198     +169     
- Misses       7240     7252      +12     
Impacted Files Coverage Δ
datafusion/core/src/datasource/listing/helpers.rs 94.96% <ø> (ø)
...tafusion/core/src/physical_plan/file_format/csv.rs 91.75% <0.00%> (-0.48%) ⬇️
...afusion/core/src/physical_plan/file_format/json.rs 89.07% <0.00%> (-2.00%) ⬇️
...tafusion/core/src/physical_plan/file_format/mod.rs 97.36% <ø> (ø)
.../src/physical_plan/file_format/delimited_stream.rs 90.42% <90.42%> (ø)
datafusion/common/src/pyarrow.rs 0.00% <0.00%> (ø)
datafusion/proto/src/bytes/mod.rs 82.75% <0.00%> (ø)
...usion/core/src/avro_to_arrow/arrow_array_reader.rs 0.00% <0.00%> (ø)
datafusion/core/tests/sql/aggregates.rs 99.28% <0.00%> (+0.01%) ⬆️
datafusion/expr/src/aggregate_function.rs 92.25% <0.00%> (+0.02%) ⬆️
... and 4 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update b5537e7...ce4f59e. Read the comment docs.

@alamb alamb changed the title Add streaming JSON and CSV (#2935) Add streaming JSON and CSV reading (#2935) Jul 18, 2022
@alamb alamb changed the title Add streaming JSON and CSV reading (#2935) Add streaming JSON and CSV reading, `NewlineDelimitedStream' (#2935) Jul 18, 2022
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very cool @tustvold -- thank you!

I wonder if we can write an end to end tests for this (aka a query from a CSV file or something) 🤔

cc @sitano

use std::collections::VecDeque;

/// The ASCII encoding of `"`
const QUOTE: u8 = 34;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the b constant syntax would make this easier to validate (and the same below)

Suggested change
const QUOTE: u8 = 34;
const QUOTE: u8 = b'"';


let is_escape = &mut self.is_escape;
let is_quote = &mut self.is_quote;
let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documenting the escaping rules that LineDelimiter assumes might be good (like \ style escapes with " quotes)

Some(idx) => {
self.remainder.extend_from_slice(&val[0..idx]);
self.complete
.push_back(Bytes::from(std::mem::take(&mut self.remainder)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If remainder was a bytes this would probably be cleaner (though the clause below to handle no records in the chunk would be more complicated); However, I suspect the "next Bytes actually has more than one record ending is the more common case

}

/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
/// yielded [`Bytes`] contains a whole number of new line delimited records
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

use futures::stream::TryStreamExt;

#[test]
fn test_delimiter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the case where some push doesn't have a delimiter covered? I couldn't find it

assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
assert!(delimiter.next().is_none());

delimiter.push("");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend making a new test here as a way to make the tests more self documenting.

Unless it is important that data can be pushed into a LineDelimiter after finish() is called 🤔

Suggested change
delimiter.push("");
#[test]
fn test_delimiter_escaped() {
delimiter.push("");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is important to test that more data can be added to a LineDelimiter after data has been pulled from it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 that would be good to document (the intent of the test).

I think the main suggestion of break up the test into smaller self contained blocks with descriptive names still holds even if this particular cut-off point would not be ideal.

The total test size will be larger, but I think each test will be easier to understand what it is testing.

maybe worth a thought

/// Complete chunks of [`Bytes`]
complete: VecDeque<Bytes>,
/// Remainder bytes that form the next record
remainder: Vec<u8>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if you could use something like

Suggested change
remainder: Vec<u8>,
remainder: Bytes,

As bytes has a slice method https://docs.rs/bytes/1.1.0/bytes/struct.Bytes.html#method.slice 🤔

Which might reduce some copies 🤷

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the copy is unavoidable as the nature of the remainder, is you need to take data from two separate Bytes. It should only be a single "line" though, and so should be relatively minor from a performance standpoint

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

@tustvold tustvold marked this pull request as draft July 18, 2022 15:48
@tustvold
Copy link
Contributor Author

Marking as draft whilst I work on some tests

@tustvold tustvold marked this pull request as ready for review July 18, 2022 16:46
assert_eq!(size, expected);
remaining -= expected;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend also assert_eq!(remaining, 0) at the end of the test to ensure nothing is lost

match store.get(&file.location).await? {
GetResult::File(file, _) => {
Ok(futures::stream::iter(config.open(file)).boxed())
Ok(futures::stream::iter(config.open(file, true)).boxed())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is first-chunk a bug fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup 😄 Tests FTW

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳 🦜

ctx.runtime_env().register_object_store(
"file",
"",
Arc::new(ChunkedStore::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice 👌

@tustvold tustvold merged commit 944ef3d into apache:master Jul 18, 2022
@ursabot
Copy link

ursabot commented Jul 18, 2022

Benchmark runs are scheduled for baseline = b772c6d and contender = 944ef3d. 944ef3d is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Streaming CSV/JSON Object Store Read Support CSV Limit Pushdown to Object Storage

4 participants