Add support for the Python I/O layer for MicroPartition reading.
clarkzinzow committed Nov 7, 2023
1 parent 2d3579b commit 91f6829
Showing 8 changed files with 281 additions and 29 deletions.
16 changes: 9 additions & 7 deletions src/daft-csv/src/read.rs
@@ -1,8 +1,4 @@
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
sync::Arc,
};
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};

use arrow2::{
datatypes::Field,
@@ -283,8 +279,14 @@ where
.await?;
// Truncate fields to only contain projected columns.
if let Some(include_columns) = include_columns {
let include_columns: HashSet<&str> = include_columns.into_iter().collect();
fields.retain(|f| include_columns.contains(f.name.as_str()))
let field_map = fields
.into_iter()
.map(|field| (field.name.clone(), field))
.collect::<HashMap<String, Field>>();
fields = include_columns
.into_iter()
.map(|col| field_map[col].clone())
.collect::<Vec<_>>();
}
// Concatenate column chunks and convert into Daft Series.
// Note that this concatenation is done in parallel on the rayon threadpool.
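The `read.rs` hunk above swaps a `HashSet`-based `retain` for a `HashMap` lookup keyed by column name, so the projected fields come back in the order of `include_columns` rather than in file order, and a missing projected column now panics via indexing instead of being silently dropped. A minimal, self-contained sketch of that behavior, using a hypothetical stripped-down `Field` type rather than `arrow2::datatypes::Field`:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct Field {
    name: String,
}

/// Keep only the requested columns, in the order they were requested.
/// Mirrors the HashMap-based projection in the diff above; panics (via
/// indexing) if a requested column is missing, just like `field_map[col]`.
fn project_fields(fields: Vec<Field>, include_columns: &[&str]) -> Vec<Field> {
    let field_map: HashMap<String, Field> = fields
        .into_iter()
        .map(|field| (field.name.clone(), field))
        .collect();
    include_columns
        .iter()
        .map(|col| field_map[*col].clone())
        .collect()
}

fn main() {
    let fields = vec![
        Field { name: "a".into() },
        Field { name: "b".into() },
        Field { name: "c".into() },
    ];
    // The old `retain`-based code would have kept ["a", "c"] in file order;
    // the new lookup returns ["c", "a"] in projection order.
    let projected = project_fields(fields, &["c", "a"]);
    println!("{:?}", projected.iter().map(|f| &f.name).collect::<Vec<_>>());
}
```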
5 changes: 4 additions & 1 deletion src/daft-micropartition/src/lib.rs
@@ -9,13 +9,16 @@ mod ops;
#[cfg(feature = "python")]
pub mod python;
#[cfg(feature = "python")]
use pyo3::PyErr;
#[cfg(feature = "python")]
pub use python::register_modules;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("DaftCoreComputeError: {}", source))]
DaftCoreCompute { source: DaftError },

#[cfg(feature = "python")]
PyIO { source: PyErr },
#[snafu(display("Duplicate name found when evaluating expressions: {}", name))]
DuplicatedField { name: String },
#[snafu(display(
106 changes: 89 additions & 17 deletions src/daft-micropartition/src/micropartition.rs
@@ -19,6 +19,8 @@ use daft_table::Table;
use snafu::ResultExt;

use crate::DaftCoreComputeSnafu;
#[cfg(feature = "python")]
use crate::PyIOSnafu;

use daft_io::{IOConfig, IOStatsRef};
use daft_stats::TableMetadata;
@@ -84,20 +86,6 @@ fn materialize_scan_task(
// Schema to cast resultant tables into, ensuring that all Tables have the same schema.
// Note that we need to apply column pruning here if specified by the ScanTask
let cast_to_schema = cast_to_schema.unwrap_or_else(|| scan_task.schema.clone());
let cast_to_schema = match &column_names {
None => Ok(cast_to_schema),
Some(column_names) => Ok(Arc::new(
Schema::new(
cast_to_schema
.names()
.iter()
.filter(|name| column_names.contains(&name.as_str()))
.map(|name| cast_to_schema.get_field(name).unwrap().clone())
.collect(),
)
.context(DaftCoreComputeSnafu)?,
)),
}?;

let table_values = match scan_task.storage_config.as_ref() {
StorageConfig::Native(native_storage_config) => {
@@ -146,11 +134,22 @@ fn materialize_scan_task(
// Native CSV Reads
// ****************
FileFormatConfig::Csv(cfg @ CsvSourceConfig { .. }) => {
let col_names = if !cfg.has_headers {
Some(
cast_to_schema
.fields
.values()
.map(|f| f.name.as_str())
.collect::<Vec<_>>(),
)
} else {
None
};
urls.map(|url| {
daft_csv::read::read_csv(
url,
None,
None, // column_names.clone(), NOTE: `read_csv` seems to be buggy when provided with out-of-order column_names
col_names.clone(),
column_names.clone(),
scan_task.limit,
cfg.has_headers,
Some(cfg.delimiter.as_bytes()[0]),
@@ -178,9 +177,82 @@ fn materialize_scan_task(
}
#[cfg(feature = "python")]
StorageConfig::Python(_) => {
todo!("TODO: Implement Python I/O backend for MicroPartitions.")
use pyo3::Python;
match scan_task.file_format_config.as_ref() {
FileFormatConfig::Parquet(ParquetSourceConfig {
coerce_int96_timestamp_unit,
..
}) => Python::with_gil(|py| {
urls.map(|url| {
crate::python::read_parquet_into_py_table(
py,
url,
cast_to_schema.clone().into(),
(*coerce_int96_timestamp_unit).into(),
scan_task.storage_config.clone().into(),
scan_task.columns.as_ref().map(|cols| cols.as_ref().clone()),
scan_task.limit,
)
.map(|t| t.into())
.context(PyIOSnafu)
})
.collect::<crate::Result<Vec<_>>>()
})?,
FileFormatConfig::Csv(CsvSourceConfig {
has_headers,
delimiter,
double_quote,
..
}) => Python::with_gil(|py| {
urls.map(|url| {
crate::python::read_csv_into_py_table(
py,
url,
*has_headers,
delimiter,
*double_quote,
cast_to_schema.clone().into(),
scan_task.storage_config.clone().into(),
scan_task.columns.as_ref().map(|cols| cols.as_ref().clone()),
scan_task.limit,
)
.map(|t| t.into())
.context(PyIOSnafu)
})
.collect::<crate::Result<Vec<_>>>()
})?,
FileFormatConfig::Json(_) => Python::with_gil(|py| {
urls.map(|url| {
crate::python::read_json_into_py_table(
py,
url,
cast_to_schema.clone().into(),
scan_task.storage_config.clone().into(),
scan_task.columns.as_ref().map(|cols| cols.as_ref().clone()),
scan_task.limit,
)
.map(|t| t.into())
.context(PyIOSnafu)
})
.collect::<crate::Result<Vec<_>>>()
})?,
}
}
};
let cast_to_schema = match &column_names {
None => Ok(cast_to_schema),
Some(column_names) => Ok(Arc::new(
Schema::new(
cast_to_schema
.names()
.iter()
.filter(|name| column_names.contains(&name.as_str()))
.map(|name| cast_to_schema.get_field(name).unwrap().clone())
.collect(),
)
.context(DaftCoreComputeSnafu)?,
)),
}?;

let casted_table_values = table_values
.iter()
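In `micropartition.rs`, the column-pruned `cast_to_schema` is now computed after the tables are materialized rather than before the `match` on `storage_config`, so the native and Python read paths receive the unpruned target schema (column selection is passed separately) and the pruning is applied when casting the resulting tables. A rough sketch of that pruning step, with hypothetical stand-in `Schema`/`Field` types in place of `daft_core`'s:

```rust
#[derive(Clone, Debug)]
struct Field {
    name: String,
}

#[derive(Clone, Debug)]
struct Schema {
    fields: Vec<Field>,
}

impl Schema {
    fn names(&self) -> Vec<String> {
        self.fields.iter().map(|f| f.name.clone()).collect()
    }

    fn get_field(&self, name: &str) -> Option<&Field> {
        self.fields.iter().find(|f| f.name == name)
    }
}

/// Restrict a schema to the projected columns, if a projection was given.
/// Mirrors the `cast_to_schema` block that now runs after the reads complete.
fn prune_to_columns(schema: &Schema, column_names: Option<&[&str]>) -> Schema {
    match column_names {
        None => schema.clone(),
        Some(cols) => Schema {
            fields: schema
                .names()
                .iter()
                .filter(|name| cols.contains(&name.as_str()))
                .map(|name| schema.get_field(name).unwrap().clone())
                .collect(),
        },
    }
}

fn main() {
    let schema = Schema {
        fields: vec![Field { name: "a".into() }, Field { name: "b".into() }],
    };
    // Keeps only "b"; with `None` the schema would pass through unchanged.
    println!("{:?}", prune_to_columns(&schema, Some(["b"].as_slice())));
}
```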
120 changes: 119 additions & 1 deletion src/daft-micropartition/src/python.rs
Expand Up @@ -12,7 +12,7 @@ use daft_core::{
use daft_dsl::python::PyExpr;
use daft_io::{python::IOConfig, IOStatsContext};
use daft_parquet::read::ParquetSchemaInferenceOptions;
use daft_scan::{python::pylib::PyScanTask, ScanTask};
use daft_scan::{python::pylib::PyScanTask, storage_config::PyStorageConfig, ScanTask};
use daft_stats::TableStatistics;
use daft_table::python::PyTable;
use pyo3::{exceptions::PyValueError, prelude::*, types::PyBytes, Python};
@@ -330,6 +330,31 @@ impl PyMicroPartition {
})
}

#[staticmethod]
pub fn read_json(
py: Python,
uri: &str,
schema: PySchema,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<Self> {
let py_table = read_json_into_py_table(
py,
uri,
schema.clone(),
storage_config,
include_columns,
num_rows,
)?;
let mp = crate::micropartition::MicroPartition::new_loaded(
schema.into(),
Arc::new(vec![py_table.into()]),
None,
);
Ok(mp.into())
}

#[allow(clippy::too_many_arguments)]
#[staticmethod]
pub fn read_csv(
@@ -552,6 +577,99 @@ impl PyMicroPartition {
}
}

pub(crate) fn read_json_into_py_table(
py: Python,
uri: &str,
schema: PySchema,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<PyTable> {
let read_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableReadOptions"))?
.call1((num_rows, include_columns))?;
let py_schema = py
.import(pyo3::intern!(py, "daft.logical.schema"))?
.getattr(pyo3::intern!(py, "Schema"))?
.getattr(pyo3::intern!(py, "_from_pyschema"))?
.call1((schema,))?;
py.import(pyo3::intern!(py, "daft.table.table_io"))?
.getattr(pyo3::intern!(py, "read_json"))?
.call1((uri, py_schema, storage_config, read_options))?
.getattr(pyo3::intern!(py, "to_table"))?
.call0()?
.getattr(pyo3::intern!(py, "_table"))?
.extract()
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn read_csv_into_py_table(
py: Python,
uri: &str,
has_header: bool,
delimiter: &str,
double_quote: bool,
schema: PySchema,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<PyTable> {
let py_schema = py
.import(pyo3::intern!(py, "daft.logical.schema"))?
.getattr(pyo3::intern!(py, "Schema"))?
.getattr(pyo3::intern!(py, "_from_pyschema"))?
.call1((schema,))?;
let read_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableReadOptions"))?
.call1((num_rows, include_columns))?;
let header_idx = if has_header { Some(0) } else { None };
let parse_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableParseCSVOptions"))?
.call1((delimiter, header_idx, double_quote))?;
py.import(pyo3::intern!(py, "daft.table.table_io"))?
.getattr(pyo3::intern!(py, "read_csv"))?
.call1((uri, py_schema, storage_config, parse_options, read_options))?
.getattr(pyo3::intern!(py, "to_table"))?
.call0()?
.getattr(pyo3::intern!(py, "_table"))?
.extract()
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn read_parquet_into_py_table(
py: Python,
uri: &str,
schema: PySchema,
coerce_int96_timestamp_unit: PyTimeUnit,
storage_config: PyStorageConfig,
include_columns: Option<Vec<String>>,
num_rows: Option<usize>,
) -> PyResult<PyTable> {
let py_schema = py
.import(pyo3::intern!(py, "daft.logical.schema"))?
.getattr(pyo3::intern!(py, "Schema"))?
.getattr(pyo3::intern!(py, "_from_pyschema"))?
.call1((schema,))?;
let read_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableReadOptions"))?
.call1((num_rows, include_columns))?;
let parse_options = py
.import(pyo3::intern!(py, "daft.runners.partitioning"))?
.getattr(pyo3::intern!(py, "TableParseParquetOptions"))?
.call1((coerce_int96_timestamp_unit,))?;
py.import(pyo3::intern!(py, "daft.table.table_io"))?
.getattr(pyo3::intern!(py, "read_parquet"))?
.call1((uri, py_schema, storage_config, parse_options, read_options))?
.getattr(pyo3::intern!(py, "to_table"))?
.call0()?
.getattr(pyo3::intern!(py, "_table"))?
.extract()
}

impl From<MicroPartition> for PyMicroPartition {
fn from(value: MicroPartition) -> Self {
PyMicroPartition {
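The `read_*_into_py_table` helpers added in `python.rs` all follow the same pyo3 pattern: import a Python module, resolve an attribute with `intern!`, call it, and `extract` the result back into Rust. A minimal standalone illustration of that pattern against Python's built-in `json` module (not `daft.table.table_io`, which the helpers above actually target):

```rust
use pyo3::prelude::*;

/// Call `json.dumps([1, 2, 3])` from Rust and pull the result back as a String,
/// using the same import/getattr/call1/extract chain as the helpers above.
fn dumps_example(py: Python<'_>) -> PyResult<String> {
    py.import(pyo3::intern!(py, "json"))?
        .getattr(pyo3::intern!(py, "dumps"))?
        .call1((vec![1, 2, 3],))?
        .extract()
}

fn main() -> PyResult<()> {
    // Needed when embedding Python without pyo3's `auto-initialize` feature.
    pyo3::prepare_freethreaded_python();
    Python::with_gil(|py| {
        println!("{}", dumps_example(py)?); // prints "[1, 2, 3]"
        Ok(())
    })
}
```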
20 changes: 19 additions & 1 deletion src/daft-scan/src/glob.rs
Expand Up @@ -4,6 +4,8 @@ use common_error::DaftResult;
use daft_core::schema::SchemaRef;
use daft_io::{get_io_client, get_runtime, parse_url, IOClient, IOStatsContext};
use daft_parquet::read::ParquetSchemaInferenceOptions;
#[cfg(feature = "python")]
use {crate::PyIOSnafu, daft_core::schema::Schema, pyo3::Python, snafu::ResultExt};

use crate::{
file_format::{CsvSourceConfig, FileFormatConfig, JsonSourceConfig, ParquetSourceConfig},
@@ -120,7 +122,23 @@ impl GlobScanOperator {
FileFormatConfig::Json(JsonSourceConfig {}) => {
// NOTE: Native JSON reads not yet implemented, so we have to delegate to Python here or implement
// a daft_json crate that gives us native JSON schema inference
todo!("Implement schema inference from JSON in GlobScanOperator");
match storage_config.as_ref() {
StorageConfig::Native(_) => todo!(
"Implement native JSON schema inference in a daft_json crate."
),
#[cfg(feature = "python")]
StorageConfig::Python(_) => Python::with_gil(|py| {
crate::python::pylib::read_json_schema(
py,
first_filepath,
storage_config.clone().into(),
)
.and_then(|s| {
Ok(Schema::new(s.schema.fields.values().cloned().collect())?)
})
.context(PyIOSnafu)
})?,
}
}
};
Arc::new(inferred_schema)
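The `glob.rs` change keeps the `todo!()` for a native JSON path but adds a Python-backed fallback for schema inference. A rough sketch of that dispatch with hypothetical stand-in types (the real code calls `crate::python::pylib::read_json_schema` under the GIL and converts the returned PySchema into a native `Schema`):

```rust
#[derive(Debug)]
enum StorageConfig {
    Native,
    Python,
}

#[derive(Debug)]
struct Schema {
    field_names: Vec<String>,
}

/// Stand-in for the Python-side schema inference; the real helper reads the
/// file through the Python I/O layer and returns its inferred schema.
fn read_json_schema_via_python(path: &str) -> Result<Schema, String> {
    Ok(Schema {
        field_names: vec![format!("inferred from {path}")],
    })
}

fn infer_json_schema(storage: &StorageConfig, path: &str) -> Result<Schema, String> {
    match storage {
        // Mirrors the remaining todo!() for a native daft_json implementation.
        StorageConfig::Native => Err("native JSON schema inference not implemented".to_string()),
        StorageConfig::Python => read_json_schema_via_python(path),
    }
}

fn main() {
    println!("{:?}", infer_json_schema(&StorageConfig::Python, "data.json"));
}
```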