[FEAT] [Scan Operator] Add Python I/O support (+ JSON) to MicroPartition reads #1578

Merged 2 commits on Nov 8, 2023.
16 changes: 9 additions & 7 deletions src/daft-csv/src/read.rs
@@ -1,8 +1,4 @@
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
sync::Arc,
};
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};

use arrow2::{
datatypes::Field,
@@ -283,8 +279,14 @@ where
.await?;
// Truncate fields to only contain projected columns.
if let Some(include_columns) = include_columns {
let include_columns: HashSet<&str> = include_columns.into_iter().collect();
fields.retain(|f| include_columns.contains(f.name.as_str()))
let field_map = fields
.into_iter()
.map(|field| (field.name.clone(), field))
.collect::<HashMap<String, Field>>();
fields = include_columns
.into_iter()
.map(|col| field_map[col].clone())
.collect::<Vec<_>>();
}
clarkzinzow (Contributor Author) commented:
@jaychia Here was the include_columns column-ordering bug: we were pruning columns while keeping the original field ordering, instead of using the new field ordering indicated by the order of include_columns.
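For illustration, here is a minimal, self-contained sketch of the two behaviors (it uses a simplified Field stand-in rather than the real arrow2 type): retain preserves the file's original column order, while building a name-to-field map and iterating over include_columns yields the projection order.

```rust
use std::collections::HashMap;

// Simplified stand-in for arrow2::datatypes::Field, for illustration only.
#[derive(Clone)]
struct Field {
    name: String,
}

fn main() {
    // Fields as they appear in the CSV file: a, b, c.
    let fields: Vec<Field> = ["a", "b", "c"]
        .iter()
        .map(|n| Field { name: n.to_string() })
        .collect();

    // The projection asks for the columns in a different order: c, a.
    let include_columns = vec!["c", "a"];

    // Old behavior: retain() prunes but keeps the file's ordering -> [a, c].
    let mut retained = fields.clone();
    retained.retain(|f| include_columns.contains(&f.name.as_str()));
    let retained_names: Vec<&str> = retained.iter().map(|f| f.name.as_str()).collect();
    assert_eq!(retained_names, vec!["a", "c"]);

    // New behavior: index fields by name, then emit them in include_columns order -> [c, a].
    let field_map: HashMap<String, Field> = fields
        .into_iter()
        .map(|f| (f.name.clone(), f))
        .collect();
    let projected: Vec<Field> = include_columns
        .iter()
        .map(|col| field_map[*col].clone())
        .collect();
    let projected_names: Vec<&str> = projected.iter().map(|f| f.name.as_str()).collect();
    assert_eq!(projected_names, vec!["c", "a"]);
}
```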

// Concatenate column chunks and convert into Daft Series.
// Note that this concatenation is done in parallel on the rayon threadpool.
7 changes: 7 additions & 0 deletions src/daft-micropartition/src/lib.rs
@@ -9,15 +9,22 @@ mod ops;
#[cfg(feature = "python")]
pub mod python;
#[cfg(feature = "python")]
use pyo3::PyErr;
#[cfg(feature = "python")]
pub use python::register_modules;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("DaftCoreComputeError: {}", source))]
DaftCoreCompute { source: DaftError },

#[cfg(feature = "python")]
#[snafu(display("PyIOError: {}", source))]
PyIO { source: PyErr },
clarkzinzow marked this conversation as resolved.

#[snafu(display("Duplicate name found when evaluating expressions: {}", name))]
DuplicatedField { name: String },

#[snafu(display(
"Field: {} not found in Parquet File: Available Fields: {:?}",
field,
106 changes: 89 additions & 17 deletions src/daft-micropartition/src/micropartition.rs
@@ -19,6 +19,8 @@ use daft_table::Table;
use snafu::ResultExt;

use crate::DaftCoreComputeSnafu;
#[cfg(feature = "python")]
use crate::PyIOSnafu;

use daft_io::{IOConfig, IOStatsRef};
use daft_stats::TableMetadata;
@@ -84,20 +86,6 @@ fn materialize_scan_task(
// Schema to cast resultant tables into, ensuring that all Tables have the same schema.
// Note that we need to apply column pruning here if specified by the ScanTask
let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.schema.clone());
let cast_to_schema = match &column_names {
None => Ok(cast_to_schema),
Some(column_names) => Ok(Arc::new(
Schema::new(
cast_to_schema
.names()
.iter()
.filter(|name| column_names.contains(&name.as_str()))
.map(|name| cast_to_schema.get_field(name).unwrap().clone())
.collect(),
)
.context(DaftCoreComputeSnafu)?,
)),
}?;

let table_values = match scan_task.storage_config.as_ref() {
StorageConfig::Native(native_storage_config) => {
@@ -146,11 +134,22 @@ fn materialize_scan_task(
// Native CSV Reads
// ****************
FileFormatConfig::Csv(cfg @ CsvSourceConfig { .. }) => {
let col_names = if !cfg.has_headers {
Some(
cast_to_schema
.fields
.values()
.map(|f| f.name.as_str())
.collect::<Vec<_>>(),
)
} else {
None
};
urls.map(|url| {
daft_csv::read::read_csv(
url,
None,
None, // column_names.clone(), NOTE: `read_csv` seems to be buggy when provided with out-of-order column_names
col_names.clone(),
column_names.clone(),
A Contributor commented:
Maybe we should assign let include_columns = column_names.clone(); to help readability here.

Also, I wonder what the expected (and current!) behavior for column ordering is when we provide schema, column_names, and include_columns... Man, I hate CSVs.

I feel like it should be:

  1. include_columns takes precedence over all other arguments for determining column ordering.
  2. The provided schema (currently None, though) is the fallback for determining column ordering.
  3. Otherwise, the fallback is to use the ordering as presented in the CSV file itself.
  4. col_names does not affect ordering at all and only helps when the CSV doesn't have headers.

A Contributor commented:
Offline discussion: read_csv will not guarantee any ordering of the returned columns; the caller needs to perform its own re-ordering (using cast_to_schema) if it wants any guarantees.
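To make that contract concrete, here is a minimal, self-contained sketch of the caller-side re-ordering step; the Column alias and reorder_to_schema helper below are illustrative stand-ins, not Daft's actual Table/cast_to_schema API.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for a materialized column: (name, values).
type Column = (String, Vec<i64>);

/// Re-order `columns` to match `target_order`, mirroring the re-ordering a
/// cast-to-schema step performs after the read. Returns None if a column is missing.
fn reorder_to_schema(columns: Vec<Column>, target_order: &[&str]) -> Option<Vec<Column>> {
    let mut by_name: HashMap<String, Column> = columns
        .into_iter()
        .map(|col| (col.0.clone(), col))
        .collect();
    target_order.iter().map(|name| by_name.remove(*name)).collect()
}

fn main() {
    // Suppose the reader handed columns back in file order: a, b, c.
    let from_reader: Vec<Column> = vec![
        ("a".to_string(), vec![1, 2]),
        ("b".to_string(), vec![3, 4]),
        ("c".to_string(), vec![5, 6]),
    ];

    // The caller's target schema wants only c and a, in that order.
    let reordered = reorder_to_schema(from_reader, &["c", "a"]).expect("missing column");
    let names: Vec<&str> = reordered.iter().map(|col| col.0.as_str()).collect();
    assert_eq!(names, vec!["c", "a"]);
}
```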

scan_task.limit,
cfg.has_headers,
Some(cfg.delimiter.as_bytes()[0]),
@@ -178,9 +177,82 @@ fn materialize_scan_task(
}
#[cfg(feature = "python")]
StorageConfig::Python(_) => {
todo!("TODO: Implement Python I/O backend for MicroPartitions.")
use pyo3::Python;
match scan_task.file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
..
}) => Python::with_gil(|py| {
urls.map(|url| {
crate::python::read_parquet_into_py_table(
py,
url,
cast_to_schema.clone().into(),
(*coerce_int96_timestamp_unit).into(),
scan_task.storage_config.clone().into(),
scan_task.columns.as_ref().map(|cols| cols.as_ref().clone()),
scan_task.limit,
)
.map(|t| t.into())
.context(PyIOSnafu)
})
.collect::<crate::Result<Vec<_>>>()
})?,
FileFormatConfig::Csv(CsvSourceConfig {
has_headers,
delimiter,
double_quote,
..
}) => Python::with_gil(|py| {
urls.map(|url| {
crate::python::read_csv_into_py_table(
py,
url,
*has_headers,
delimiter,
*double_quote,
cast_to_schema.clone().into(),
scan_task.storage_config.clone().into(),
scan_task.columns.as_ref().map(|cols| cols.as_ref().clone()),
scan_task.limit,
)
.map(|t| t.into())
.context(PyIOSnafu)
})
.collect::<crate::Result<Vec<_>>>()
})?,
FileFormatConfig::Json(_) => Python::with_gil(|py| {
urls.map(|url| {
crate::python::read_json_into_py_table(
py,
url,
cast_to_schema.clone().into(),
scan_task.storage_config.clone().into(),
scan_task.columns.as_ref().map(|cols| cols.as_ref().clone()),
scan_task.limit,
)
.map(|t| t.into())
.context(PyIOSnafu)
})
.collect::<crate::Result<Vec<_>>>()
})?,
}
}
};
let cast_to_schema = match &column_names {
None => Ok(cast_to_schema),
Some(column_names) => Ok(Arc::new(
Schema::new(
cast_to_schema
.names()
.iter()
.filter(|name| column_names.contains(&name.as_str()))
.map(|name| cast_to_schema.get_field(name).unwrap().clone())
.collect(),
)
.context(DaftCoreComputeSnafu)?,
)),
}?;
clarkzinzow (Contributor Author) commented on Nov 7, 2023:
@jaychia I moved the field pruning on the schema to after the read, since I believe that all of the I/O layers expect the schema to be the full (pre-projection) schema of the file rather than the pruned schema. I at least know that this is the case for the CSV reader.

This also matches the behavior of the non-scan-operator + MicroPartition path.

jaychia (Contributor) commented on Nov 7, 2023:
> I believe that all of the I/O layers expect the schema to give the full (pre-projection) schema of the file

I'm not sure we can make that strong of an assumption, actually, since it might not hold in two scenarios:

Scenario 1

User-provided schema hints:

daft.read_csv("files/**.csv", schema_hints={"foo": daft.DataType.int64()})

I think if a user ran the above, we would expect it to work on all of the CSV files, with the assumption that each CSV file could have any number of columns but should have at least one column called "foo" that can be cast to int64.

Scenario 2

Asymmetric files/schema inference:

daft.read_csv(["files/1.csv", "files/2.csv"])

We perform schema inference on "files/1.csv", and then "project" that schema onto the read of "files/2.csv".

Solution

The way we currently work around this in both our Python Parquet/CSV/JSON reads and in our native Parquet read is:

  1. The read performs its own schema inference (and takes in options to help with this).
  2. We only apply the "schema projection" after the read, using our Table.cast_to_schema functionality.

For native CSV reads, we currently pass in a hardcoded None so that this behavior stays consistent, but we should chat about whether that is the right course of action.

A Contributor commented:
Offline discussion:

  1. We should let read_csv do its thing and not assume anything about schema ordering.
  2. Schema/column ordering is enforced later on using .cast_to_schema.
  3. read_csv doesn't take a schema as an argument and will perform its own schema inference.
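A rough sketch of that agreed flow, with illustrative stand-in types (MiniSchema, project_to_requested) rather than Daft's real Schema/Table API: the reader infers the file's full schema on its own, and the requested projection/cast is applied only after the read.

```rust
/// Illustrative stand-in for a schema: ordered (column name, dtype) pairs.
type MiniSchema = Vec<(String, String)>;

/// Step 1: the reader infers the file's full, pre-projection schema on its own.
fn infer_schema_from_file() -> MiniSchema {
    vec![
        ("foo".to_string(), "utf8".to_string()),
        ("bar".to_string(), "int64".to_string()),
    ]
}

/// Step 2: only after the read do we project/cast down to the requested schema,
/// keeping the requested column order and requested dtypes.
fn project_to_requested(inferred: &MiniSchema, requested: &MiniSchema) -> MiniSchema {
    requested
        .iter()
        .filter(|(name, _)| inferred.iter().any(|(n, _)| n == name))
        .cloned()
        .collect()
}

fn main() {
    let inferred = infer_schema_from_file();
    // e.g. a schema hint that only wants "foo", read as int64.
    let requested: MiniSchema = vec![("foo".to_string(), "int64".to_string())];
    let projected = project_to_requested(&inferred, &requested);
    assert_eq!(projected, requested);
}
```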


let casted_table_values = table_values
A Contributor commented:
I think the current code would run cast_to_schema twice for the Python storage configs!

  1. Once during the table_io.read_* code.
  2. Once again here, after we have the tables.

Also, I think that, the way our code is written right now, (1) will run it on the pre-pruned schema and (2) will run it on the pruned schema.

A Contributor commented:
Synced offline: this should be fine for now, since it doesn't affect correctness and will be short-lived (deprecated soon).

.iter()
120 changes: 119 additions & 1 deletion src/daft-micropartition/src/python.rs
Expand Up @@ -12,7 +12,7 @@ use daft_core::{
use daft_dsl::python::PyExpr;
use daft_io::{python::IOConfig, IOStatsContext};
use daft_parquet::read::ParquetSchemaInferenceOptions;
use daft_scan::{python::pylib::PyScanTask, ScanTask};
use daft_scan::{python::pylib::PyScanTask, storage_config::PyStorageConfig, ScanTask};
use daft_stats::TableStatistics;
use daft_table::python::PyTable;
use pyo3::{exceptions::PyValueError, prelude::*, types::PyBytes, Python};
@@ -332,6 +332,31 @@ impl PyMicroPartition {
})
}

#[staticmethod]
pub fn read_json(
py: Python,
uri: &str,
schema: PySchema,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<Self> {
let py_table = read_json_into_py_table(
py,
uri,
schema.clone(),
storage_config,
include_columns,
num_rows,
)?;
let mp = crate::micropartition::MicroPartition::new_loaded(
schema.into(),
Arc::new(vec![py_table.into()]),
None,
);
Ok(mp.into())
}

#[allow(clippy::too_many_arguments)]
#[staticmethod]
pub fn read_csv(
@@ -554,6 +579,99 @@ impl PyMicroPartition {
}
}

pub(crate) fn read_json_into_py_table(
py: Python,
uri: &str,
schema: PySchema,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<PyTable> {
let read_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableReadOptions"))?
.call1((num_rows, include_columns))?;
let py_schema = py
.import(pyo3::intern!(py, "daft.logical.schema"))?
.getattr(pyo3::intern!(py, "Schema"))?
.getattr(pyo3::intern!(py, "_from_pyschema"))?
.call1((schema,))?;
py.import(pyo3::intern!(py, "daft.table.table_io"))?
.getattr(pyo3::intern!(py, "read_json"))?
.call1((uri, py_schema, storage_config, read_options))?
.getattr(pyo3::intern!(py, "to_table"))?
.call0()?
.getattr(pyo3::intern!(py, "_table"))?
.extract()
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn read_csv_into_py_table(
py: Python,
uri: &str,
has_header: bool,
delimiter: &str,
double_quote: bool,
schema: PySchema,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<PyTable> {
let py_schema = py
.import(pyo3::intern!(py, "daft.logical.schema"))?
.getattr(pyo3::intern!(py, "Schema"))?
.getattr(pyo3::intern!(py, "_from_pyschema"))?
.call1((schema,))?;
let read_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableReadOptions"))?
.call1((num_rows, include_columns))?;
let header_idx = if has_header { Some(0) } else { None };
let parse_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableParseCSVOptions"))?
.call1((delimiter, header_idx, double_quote))?;
py.import(pyo3::intern!(py, "daft.table.table_io"))?
.getattr(pyo3::intern!(py, "read_csv"))?
.call1((uri, py_schema, storage_config, parse_options, read_options))?
.getattr(pyo3::intern!(py, "to_table"))?
.call0()?
.getattr(pyo3::intern!(py, "_table"))?
.extract()
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn read_parquet_into_py_table(
py: Python,
uri: &str,
schema: PySchema,
coerce_int96_timestamp_unit: PyTimeUnit,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<PyTable> {
let py_schema = py
.import(pyo3::intern!(py, "daft.logical.schema"))?
.getattr(pyo3::intern!(py, "Schema"))?
.getattr(pyo3::intern!(py, "_from_pyschema"))?
.call1((schema,))?;
let read_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableReadOptions"))?
.call1((num_rows, include_columns))?;
let parse_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableParseParquetOptions"))?
.call1((coerce_int96_timestamp_unit,))?;
py.import(pyo3::intern!(py, "daft.table.table_io"))?
.getattr(pyo3::intern!(py, "read_parquet"))?
.call1((uri, py_schema, storage_config, parse_options, read_options))?
.getattr(pyo3::intern!(py, "to_table"))?
.call0()?
.getattr(pyo3::intern!(py, "_table"))?
.extract()
}

impl From<MicroPartition> for PyMicroPartition {
fn from(value: MicroPartition) -> Self {
PyMicroPartition {
20 changes: 19 additions & 1 deletion src/daft-scan/src/glob.rs
@@ -4,6 +4,8 @@ use common_error::DaftResult;
use daft_core::schema::SchemaRef;
use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext};
use daft_parquet::read::ParquetSchemaInferenceOptions;
#[cfg(feature = "python")]
use {crate::PyIOSnafu, daft_core::schema::Schema, pyo3::Python, snafu::ResultExt};

use crate::{
file_format::{CsvSourceConfig, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig},
@@ -120,7 +122,23 @@ impl GlobScanOperator {
FileFormatConfig::Json(JsonSourceConfig {}) => {
// NOTE: Native JSON reads not yet implemented, so we have to delegate to Python here or implement
// a daft_json crate that gives us native JSON schema inference
todo!("Implement schema inference from JSON in GlobScanOperator");
match storage_config.as_ref() {
StorageConfig::Native(_) => todo!(
"Implement native JSON schema inference in a daft_json crate."
),
#[cfg(feature = "python")]
StorageConfig::Python(_) => Python::with_gil(|py| {
crate::python::pylib::read_json_schema(
py,
first_filepath,
storage_config.clone().into(),
)
.and_then(|s| {
Ok(Schema::new(s.schema.fields.values().cloned().collect())?)
})
.context(PyIOSnafu)
})?,
}
}
};
Arc::new(inferred_schema)