
[FEAT] [Scan Operator] Add Python I/O support (+ JSON) to MicroPartition reads #1578

Merged 2 commits into main on Nov 8, 2023

Conversation

clarkzinzow (Contributor):

This PR adds support for the Python I/O layer to MicroPartition reads, which in turn adds support for reading MicroPartitions from JSON files via the scan operator path.

As a drive-by, this PR also fixes a column-ordering bug that occurred when out-of-order column projections were provided to our native CSV reader.

This PR is stacked on top of #1559.

@github-actions bot added the `enhancement` (New feature or request) label on Nov 7, 2023
```rust
fields = include_columns
    .into_iter()
    .map(|col| field_map[col].clone())
    .collect::<Vec<_>>();
```
clarkzinzow (Contributor, Author):

@jaychia Here was the `include_columns` column-ordering bug; we were pruning columns while maintaining the original field ordering instead of using the new field ordering indicated by the order of `include_columns`.
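For concreteness, here is a minimal, self-contained sketch of the bug and the fix (toy `Field` type and free functions, not Daft's actual internals):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
}

// Old behavior: prunes to the requested columns but keeps the file's
// original field order, ignoring the order of `include_columns`.
fn prune_in_file_order(fields: &[Field], include_columns: &[&str]) -> Vec<Field> {
    fields
        .iter()
        .filter(|f| include_columns.contains(&f.name.as_str()))
        .cloned()
        .collect()
}

// Fixed behavior: build a name -> field map, then emit fields in the
// order the projection requested them (mirrors the diff above).
fn prune_in_projection_order(fields: &[Field], include_columns: &[&str]) -> Vec<Field> {
    let field_map: HashMap<&str, Field> = fields
        .iter()
        .map(|f| (f.name.as_str(), f.clone()))
        .collect();
    include_columns
        .iter()
        .map(|&col| field_map[col].clone())
        .collect()
}

fn main() {
    let fields: Vec<Field> = ["a", "b", "c"]
        .iter()
        .map(|n| Field { name: (*n).to_string() })
        .collect();
    // Out-of-order projection: ask for "c" before "a".
    let projection = ["c", "a"];
    // Buggy path yields file order: ["a", "c"].
    assert_eq!(prune_in_file_order(&fields, &projection)[0].name, "a");
    // Fixed path yields projection order: ["c", "a"].
    assert_eq!(prune_in_projection_order(&fields, &projection)[0].name, "c");
}
```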

```rust
    )
    .context(DaftCoreComputeSnafu)?,
)),
}?;
```
clarkzinzow (Contributor, Author), Nov 7, 2023:

@jaychia I moved the schema field pruning to after the read, since I believe that all of the I/O layers expect to be given the full (pre-projection) schema of the file, rather than the pruned schema. I at least know that this is the case for the CSV reader.

This also matches the behavior of the non-scan-operator + MicroPartition path.
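A minimal sketch of the flow described above, using toy column types rather than Daft's actual reader API: the read materializes columns against the full file schema, and the projection is applied only afterwards:

```rust
// Toy sketch: the reader always materializes against the full file schema;
// column pruning happens after the read, not inside it.
type Column = (String, Vec<i64>);

// Stand-in for an I/O layer read that returns every column in file order.
fn read_all_columns() -> Vec<Column> {
    vec![
        ("a".to_string(), vec![1, 2]),
        ("b".to_string(), vec![3, 4]),
        ("c".to_string(), vec![5, 6]),
    ]
}

// The projection is applied to the materialized table, in projection order.
fn prune(table: &[Column], include_columns: &[&str]) -> Vec<Column> {
    include_columns
        .iter()
        .filter_map(|name| table.iter().find(|(n, _)| n.as_str() == *name).cloned())
        .collect()
}

fn main() {
    let table = read_all_columns();
    let projected = prune(&table, &["c", "a"]);
    assert_eq!(projected[0].0, "c");
    assert_eq!(projected[1].0, "a");
}
```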

jaychia (Contributor), Nov 7, 2023:

> I believe that all of the I/O layers expect the schema to give the full (pre-projection) schema of the file

I'm not sure we can actually make that strong an assumption, since it might not hold true in two scenarios:

Scenario 1

User-provided schema hints:

daft.read_csv("files/**.csv", schema_hints={"foo": daft.DataType.int64()})

I think if a user ran the above, we would expect it to work on all of the CSV files, with the assumption that each CSV file could have any number of columns, but they should all at least have one column called "foo" that can be cast to int64.

Scenario 2

Asymmetric files / schema inference:

daft.read_csv(["files/1.csv", "files/2.csv"])

We perform schema inference on "files/1.csv", and then "project" that schema onto the read of "files/2.csv".

Solution

The current way we work around this in both our Python Parquet/CSV/JSON reads and in our native Parquet read is:

  1. The read performs its own schema inference (and takes in options to help with this)
  2. We only apply the "schema projection" after the read, using our `Table.cast_to_schema` functionality (see the sketch below)

For native CSV reads, we currently pass in a hardcoded `None` so that it is consistent with this behavior, but we should chat about whether that is the right course of action.
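As illustration, here is a toy version of that pattern (a sketch of the idea only, not Daft's actual `Table.cast_to_schema` implementation): the read infers its own schema and ordering, and the caller reconciles the materialized columns with the target schema afterwards, reordering and casting as needed:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
enum Column {
    Utf8(Vec<String>),
    Int64(Vec<i64>),
}

// Cast one column to the dtype requested by the target schema. Only the
// Utf8 -> Int64 cast needed for Scenario 1 is sketched here.
fn cast(col: &Column, dtype: &str) -> Column {
    match (col, dtype) {
        (Column::Utf8(vals), "int64") => {
            Column::Int64(vals.iter().map(|v| v.parse().unwrap()).collect())
        }
        _ => col.clone(),
    }
}

// Toy cast_to_schema: emit columns in target-schema order, casting each one.
// (A real implementation would also handle target columns missing from the
// read, e.g. by filling nulls; that is omitted here.)
fn cast_to_schema(
    table: &HashMap<String, Column>,
    target: &[(&str, &str)], // (column name, target dtype) pairs, in order
) -> Vec<(String, Column)> {
    target
        .iter()
        .map(|&(name, dtype)| (name.to_string(), cast(&table[name], dtype)))
        .collect()
}

fn main() {
    // Pretend the CSV read inferred every column as Utf8, in file order.
    let mut inferred = HashMap::new();
    inferred.insert("bar".to_string(), Column::Utf8(vec!["x".into()]));
    inferred.insert("foo".to_string(), Column::Utf8(vec!["42".into()]));

    // Target schema, e.g. from a user schema hint: foo as int64, then bar.
    let projected = cast_to_schema(&inferred, &[("foo", "int64"), ("bar", "utf8")]);
    println!("{projected:?}");
}
```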

jaychia (Contributor):

Offline discussion:

  1. We should let `read_csv` do its thing and not assume anything about schema ordering
  2. Schema/column ordering is enforced later on using `.cast_to_schema`
  3. `read_csv` doesn't take a schema as an argument, and will perform its own schema inference

codecov bot commented on Nov 7, 2023:

Codecov Report

Merging #1578 (ed3712f) into main (e176f2e) will not change coverage.
Report is 2 commits behind head on main.
The diff coverage is n/a.


```
@@           Coverage Diff           @@
##             main    #1578   +/-   ##
=======================================
  Coverage   84.85%   84.85%
=======================================
  Files          54       54
  Lines        5165     5165
=======================================
  Hits         4383     4383
  Misses        782      782
```

Resolved review thread on src/daft-micropartition/src/lib.rs.
```rust
None,
None, // column_names.clone(), NOTE: `read_csv` seems to be buggy when provided with out-of-order column_names
col_names.clone(),
column_names.clone(),
```
jaychia (Contributor):

Maybe we should assign `let include_columns = column_names.clone();` to help readability here.

Also, I wonder what the expected (and current!) behavior for column ordering is when we provide `schema`, `column_names`, and `include_columns`... Man, I hate CSVs.

I feel like it should be (see the sketch after this list):

  1. `include_columns` takes precedence over all other arguments for determining column ordering.
  2. The provided `schema` (currently `None` though) is the fallback for determining column ordering.
  3. Otherwise, the fallback is to use the ordering as presented in the CSV file itself.
  4. `col_names` does not affect ordering at all and only helps in the case where the CSV doesn't have headers.
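A sketch of that proposed precedence chain, with toy types and a hypothetical `column_ordering` function (note this proposal was superseded by the offline decision recorded below):

```rust
// Proposed precedence for column ordering (toy sketch):
//   1. `include_columns`, if provided, dictates ordering.
//   2. Otherwise a provided `schema` dictates ordering.
//   3. Otherwise, fall back to the file's own column order.
// (`col_names` never affects ordering; it only names headerless CSVs.)
fn column_ordering<'a>(
    include_columns: Option<&'a [String]>,
    schema: Option<&'a [String]>,
    file_order: &'a [String],
) -> &'a [String] {
    include_columns.or(schema).unwrap_or(file_order)
}

fn main() {
    let file_order = vec!["a".to_string(), "b".to_string()];
    let include = vec!["b".to_string(), "a".to_string()];
    assert_eq!(column_ordering(Some(&include[..]), None, &file_order), &include[..]);
    assert_eq!(column_ordering(None, None, &file_order), &file_order[..]);
}
```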

jaychia (Contributor):

Offline discussion: `read_csv` will not guarantee any ordering of the columns it returns; the caller needs to perform its own re-ordering using `cast_to_schema` if it wants any guarantees.

```rust
    )
    .context(DaftCoreComputeSnafu)?,
)),
}?;

let casted_table_values = table_values
```
jaychia (Contributor):

I think the current code would run `cast_to_schema` twice for the Python storage configs!

  1. Once during the `table_io.read_*` code
  2. Once again here, after we have the tables

And I also think that, based on the way our code is written right now, (1) will run it on the pre-pruned schema and (2) will run it on the pruned schema.

jaychia (Contributor):

Synced offline: this should be fine for now, since it doesn't affect correctness and will be short-lived (deprecated soon).

@clarkzinzow merged commit a59b946 into main on Nov 8, 2023 (37 checks passed)
@clarkzinzow deleted the clark/micropartition-py-io branch on November 8, 2023 at 01:36