DataFusion 13.0.0 #61

Merged · 9 commits · Oct 13, 2022

662 changes: 437 additions & 225 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -34,9 +34,9 @@ default = ["mimalloc"]
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
rand = "0.7"
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
-datafusion = { version = "^12.0.0", features = ["pyarrow", "avro"] }
-datafusion-expr = { version = "^12.0.0" }
-datafusion-common = { version = "^12.0.0", features = ["pyarrow"] }
+datafusion = { version = "^13.0.0", features = ["pyarrow", "avro"] }
+datafusion-expr = { version = "^13.0.0" }
+datafusion-common = { version = "^13.0.0", features = ["pyarrow"] }
uuid = { version = "0.8", features = ["v4"] }
mimalloc = { version = "*", optional = true, default-features = false }
async-trait = "0.1"
@@ -51,4 +51,4 @@ name = "datafusion._internal"

[profile.release]
lto = true
-codegen-units = 1
\ No newline at end of file
+codegen-units = 1
7 changes: 7 additions & 0 deletions README.md
@@ -163,6 +163,13 @@ python -m pip install -U pip
python -m pip install -r requirements-310.txt
```

+The tests rely on test data in git submodules.
+
+```bash
+git submodule init
+git submodule update
+```
+
Whenever rust code changes (your changes or via `git pull`):

```bash
41 changes: 25 additions & 16 deletions src/context.rs
@@ -24,6 +24,7 @@ use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;

use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
@@ -99,9 +100,12 @@ impl PySessionContext {
Ok(PyDataFrame::new(df))
}

-fn create_dataframe(&mut self, partitions: Vec<Vec<RecordBatch>>) -> PyResult<PyDataFrame> {
-let table = MemTable::try_new(partitions[0][0].schema(), partitions)
-.map_err(DataFusionError::from)?;
+fn create_dataframe(
+&mut self,
+partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
+) -> PyResult<PyDataFrame> {
+let schema = partitions.0[0][0].schema();
+let table = MemTable::try_new(schema, partitions.0).map_err(DataFusionError::from)?;

// generate a random (unique) name for this table
// table name cannot start with numeric digit
@@ -136,10 +140,10 @@
fn register_record_batches(
&mut self,
name: &str,
-partitions: Vec<Vec<RecordBatch>>,
+partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
) -> PyResult<()> {
-let schema = partitions[0][0].schema();
-let table = MemTable::try_new(schema, partitions)?;
+let schema = partitions.0[0][0].schema();
+let table = MemTable::try_new(schema, partitions.0)?;
self.ctx
.register_table(name, Arc::new(table))
.map_err(DataFusionError::from)?;
@@ -182,7 +186,7 @@ impl PySessionContext {
&mut self,
name: &str,
path: PathBuf,
-schema: Option<Schema>,
+schema: Option<PyArrowType<Schema>>,
has_header: bool,
delimiter: &str,
schema_infer_max_records: usize,
@@ -204,7 +208,7 @@
.delimiter(delimiter[0])
.schema_infer_max_records(schema_infer_max_records)
.file_extension(file_extension);
-options.schema = schema.as_ref();
+options.schema = schema.as_ref().map(|x| &x.0);

let result = self.ctx.register_csv(name, path, options);
wait_for_future(py, result).map_err(DataFusionError::from)?;
@@ -277,7 +281,7 @@ impl PySessionContext {
fn read_csv(
&self,
path: PathBuf,
-schema: Option<Schema>,
+schema: Option<PyArrowType<Schema>>,
has_header: bool,
delimiter: &str,
schema_infer_max_records: usize,
@@ -302,12 +306,17 @@
.schema_infer_max_records(schema_infer_max_records)
.file_extension(file_extension)
.table_partition_cols(table_partition_cols);
-options.schema = schema.as_ref();

-let result = self.ctx.read_csv(path, options);
-let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
-
-Ok(df)
+if let Some(py_schema) = schema {
+options.schema = Some(&py_schema.0);
+let result = self.ctx.read_csv(path, options);
+let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+Ok(df)
+} else {
+let result = self.ctx.read_csv(path, options);
+let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
+Ok(df)
+}
}

#[allow(clippy::too_many_arguments)]
@@ -346,14 +355,14 @@ impl PySessionContext {
fn read_avro(
&self,
path: &str,
-schema: Option<Schema>,
+schema: Option<PyArrowType<Schema>>,
table_partition_cols: Vec<String>,
file_extension: &str,
py: Python,
) -> PyResult<PyDataFrame> {
let mut options = AvroReadOptions::default().table_partition_cols(table_partition_cols);
options.file_extension = file_extension;
-options.schema = schema.map(Arc::new);
+options.schema = schema.map(|s| Arc::new(s.0));

let result = self.ctx.read_avro(path, options);
let df = PyDataFrame::new(wait_for_future(py, result).map_err(DataFusionError::from)?);
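
The thread running through src/context.rs (and the rest of this PR) is that bare Arrow types in pyo3 signatures (`Schema`, `Vec<Vec<RecordBatch>>`) become `PyArrowType<T>`, a newtype wrapper from arrow-rs's `pyarrow` module; call sites then unwrap the inner value with `.0`. The wrapper exists because pyo3's conversion traits cannot be implemented directly for foreign types such as `Vec<Vec<RecordBatch>>` (Rust's orphan rule), but they can be implemented for a local newtype around them. Below is a minimal sketch of that technique using hypothetical `Meters`/`Wrapper` types, not the real arrow-rs definition:

```rust
use pyo3::prelude::*;

// Hypothetical stand-ins: `Meters` plays the role of an Arrow type defined
// in another crate; `Wrapper` plays the role of PyArrowType.
struct Meters(f64);
struct Wrapper(pub Meters);

// The orphan rule forbids `impl FromPyObject for Meters` outside the crate
// that owns `Meters`, but a local newtype can carry the impl.
impl<'source> FromPyObject<'source> for Wrapper {
    fn extract(ob: &'source PyAny) -> PyResult<Self> {
        Ok(Wrapper(Meters(ob.extract::<f64>()?)))
    }
}

// pyo3 converts the argument automatically; the body unwraps with `.0`,
// just as `create_dataframe` above does with `partitions.0`.
#[pyfunction]
fn double(value: Wrapper) -> f64 {
    let Meters(m) = value.0;
    m * 2.0
}
```
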
10 changes: 5 additions & 5 deletions src/dataframe.rs
@@ -18,7 +18,7 @@
use crate::utils::wait_for_future;
use crate::{errors::DataFusionError, expression::PyExpr};
use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
use datafusion::arrow::util::pretty;
use datafusion::dataframe::DataFrame;
use datafusion::prelude::*;
@@ -65,8 +65,8 @@ impl PyDataFrame {
}

/// Returns the schema from the logical plan
-fn schema(&self) -> Schema {
-self.df.schema().into()
+fn schema(&self) -> PyArrowType<Schema> {
+PyArrowType(self.df.schema().into())
}

#[args(args = "*")]
@@ -144,7 +144,7 @@
fn show(&self, py: Python, num: usize) -> PyResult<()> {
let df = self.df.limit(0, Some(num))?;
let batches = wait_for_future(py, df.collect())?;
-Ok(pretty::print_batches(&batches)?)
+pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
}

/// Filter out duplicate rows
@@ -186,7 +186,7 @@
fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
let df = self.df.explain(verbose, analyze)?;
let batches = wait_for_future(py, df.collect())?;
-Ok(pretty::print_batches(&batches)?)
+pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string()))
}

/// Repartition a `DataFrame` based on a logical partitioning scheme.
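
`show` and `explain` previously relied on `?` to convert the `ArrowError` returned by `pretty::print_batches` into a `PyErr`; that implicit conversion is apparently no longer available after the upgrade, so the error is mapped to `PyArrowException` by hand. A sketch of the same mapping pulled out into a helper (the helper name is hypothetical; the PR simply inlines the `map_err` at both call sites):

```rust
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::pyarrow::PyArrowException;
use pyo3::PyErr;

// Hypothetical helper: turn an ArrowError into pyarrow's exception type,
// since `?` can no longer perform the conversion implicitly.
fn to_py_err(err: ArrowError) -> PyErr {
    PyArrowException::new_err(err.to_string())
}
```

With such a helper, both call sites would read `pretty::print_batches(&batches).map_err(to_py_err)`.
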
10 changes: 9 additions & 1 deletion src/dataset.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
use async_trait::async_trait;

use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::datasource::datasource::TableProviderFilterPushDown;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
@@ -74,7 +75,14 @@ impl TableProvider for Dataset {
Python::with_gil(|py| {
let dataset = self.dataset.as_ref(py);
// This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
-Arc::new(dataset.getattr("schema").unwrap().extract().unwrap())
+Arc::new(
+dataset
+.getattr("schema")
+.unwrap()
+.extract::<PyArrowType<_>>()
+.unwrap()
+.0,
+)
})
}

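
Here the old code could call a bare `extract()` and let inference pick `Schema` from the surrounding `Arc::new`; since `extract()` now has to produce the wrapper, the turbofish `extract::<PyArrowType<_>>()` plus a trailing `.0` appear instead. The same getter in isolation, as a sketch with the original `unwrap`s replaced by `?` (hypothetical free function; it assumes `dataset` has already been validated as a `pyarrow.dataset.Dataset`):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::arrow::pyarrow::PyArrowType;
use pyo3::prelude::*;

// Read the `schema` attribute of a pyarrow Dataset, convert it into a Rust
// Arrow Schema through the newtype, and peel the wrapper off with `.0`.
fn dataset_schema(dataset: &PyAny) -> PyResult<SchemaRef> {
    let schema = dataset.getattr("schema")?.extract::<PyArrowType<Schema>>()?;
    Ok(Arc::new(schema.0))
}
```
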
12 changes: 9 additions & 3 deletions src/dataset_exec.rs
@@ -28,6 +28,7 @@ use futures::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
use datafusion::execution::context::TaskContext;
@@ -54,7 +55,7 @@ impl Iterator for PyArrowBatchesAdapter {
Some(
batches
.next()?
-.and_then(|batch| batch.extract())
+.and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
.map_err(|err| ArrowError::ExternalError(Box::new(err))),
)
})
@@ -109,7 +110,12 @@ impl DatasetExec {

let scanner = dataset.call_method("scanner", (), Some(kwargs))?;

-let schema = Arc::new(scanner.getattr("projected_schema")?.extract()?);
+let schema = Arc::new(
+scanner
+.getattr("projected_schema")?
+.extract::<PyArrowType<_>>()?
+.0,
+);

let builtins = Python::import(py, "builtins")?;
let pylist = builtins.getattr("list")?;
@@ -211,7 +217,7 @@ impl ExecutionPlan for DatasetExec {
let schema: SchemaRef = Arc::new(
scanner
.getattr("projected_schema")
-.and_then(|schema| schema.extract())
+.and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
.map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
);
let record_batches: &PyIterator = scanner
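
The same rewrite applies inside closures: where `and_then(|batch| batch.extract())` once produced a `RecordBatch` directly, the closure now has to extract the wrapper, unwrap it, and re-wrap the result in `Ok`, hence `Ok(batch.extract::<PyArrowType<_>>()?.0)`. The conversion step in isolation, as a sketch (hypothetical function name):

```rust
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use pyo3::prelude::*;

// Convert one item yielded by a Python iterator over pyarrow batches into
// a Rust RecordBatch: `?` propagates the PyErr, `.0` peels the newtype.
fn item_to_batch(item: &PyAny) -> PyResult<RecordBatch> {
    Ok(item.extract::<PyArrowType<RecordBatch>>()?.0)
}
```
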
5 changes: 3 additions & 2 deletions src/expression.rs
@@ -19,6 +19,7 @@ use pyo3::{basic::CompareOp, prelude::*};
use std::convert::{From, Into};

use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion_expr::{col, lit, Expr};

use datafusion::scalar::ScalarValue;
@@ -125,12 +126,12 @@ impl PyExpr {
self.expr.clone().is_null().into()
}

-pub fn cast(&self, to: DataType) -> PyExpr {
+pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
// self.expr.cast_to() requires DFSchema to validate that the cast
// is supported, omit that for now
let expr = Expr::Cast {
expr: Box::new(self.expr.clone()),
-data_type: to,
+data_type: to.0,
};
expr.into()
}
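
`cast` keeps its Python-facing behaviour (callers still pass a `pyarrow.DataType`), but the argument now arrives wrapped and is unwrapped into the `Expr::Cast` variant. As a free-standing sketch (in the PR this is a `#[pymethods]` method on `PyExpr`; note the deliberate absence of `cast_to` schema validation, per the comment in the diff):

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion_expr::Expr;

// Build the cast expression the way `PyExpr::cast` does: no DFSchema
// validation, just the unwrapped target type placed in the variant.
fn cast_to(expr: Expr, to: PyArrowType<DataType>) -> Expr {
    Expr::Cast {
        expr: Box::new(expr),
        data_type: to.0,
    }
}
```
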
17 changes: 9 additions & 8 deletions src/udaf.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;

use pyo3::{prelude::*, types::PyTuple};

-use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::array::{Array, ArrayRef};
use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
use datafusion_expr::{
@@ -82,6 +82,7 @@ impl Accumulator for RustAccumulator {

// 1. cast states to Pyarrow array
let state = state
+.data()
.to_pyarrow(py)
.map_err(|e| DataFusionError::Execution(format!("{}", e)))?;

@@ -120,18 +121,18 @@ impl PyAggregateUDF {
fn new(
name: &str,
accumulator: PyObject,
-input_type: DataType,
-return_type: DataType,
-state_type: Vec<DataType>,
+input_type: PyArrowType<DataType>,
+return_type: PyArrowType<DataType>,
+state_type: PyArrowType<Vec<DataType>>,
volatility: &str,
) -> PyResult<Self> {
let function = create_udaf(
name,
-input_type,
-Arc::new(return_type),
+input_type.0,
+Arc::new(return_type.0),
parse_volatility(volatility)?,
to_rust_accumulator(accumulator),
-Arc::new(state_type),
+Arc::new(state_type.0),
);
Ok(Self { function })
}
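
The one-line `.data()` addition is the export half of the array story: the pyarrow conversion now appears to live on `ArrayData` rather than on `ArrayRef`, so an array is lowered to its underlying `ArrayData` before crossing into Python, and the `Array` trait has to be imported for `.data()` to resolve. As a sketch (hypothetical helper):

```rust
use datafusion::arrow::array::{Array, ArrayRef};
use datafusion::arrow::pyarrow::PyArrowConvert;
use pyo3::prelude::*;

// Export direction: lower the dynamically typed array to its ArrayData,
// the type the conversion is implemented on, and hand that to Python.
fn array_to_py(py: Python, array: &ArrayRef) -> PyResult<PyObject> {
    array.data().to_pyarrow(py)
}
```
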
25 changes: 12 additions & 13 deletions src/udf.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;

use pyo3::{prelude::*, types::PyTuple};

-use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::array::{make_array, Array, ArrayData, ArrayRef};
use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::physical_plan::udf::ScalarUDF;
@@ -46,15 +46,14 @@ fn to_rust_function(func: PyObject) -> ScalarFunctionImplementation {
let py_args = PyTuple::new(py, py_args);

// 2. call function
-let value = func.as_ref(py).call(py_args, None);
-let value = match value {
-Ok(n) => Ok(n),
-Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
-}?;
+let value = func
+.as_ref(py)
+.call(py_args, None)
+.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

// 3. cast to arrow::array::Array
-let array = ArrayRef::from_pyarrow(value).unwrap();
-Ok(array)
+let array_data = ArrayData::from_pyarrow(value).unwrap();
+Ok(make_array(array_data))
})
},
)
@@ -73,14 +72,14 @@ impl PyScalarUDF {
fn new(
name: &str,
func: PyObject,
-input_types: Vec<DataType>,
-return_type: DataType,
+input_types: PyArrowType<Vec<DataType>>,
+return_type: PyArrowType<DataType>,
volatility: &str,
) -> PyResult<Self> {
let function = create_udf(
name,
-input_types,
-Arc::new(return_type),
+input_types.0,
+Arc::new(return_type.0),
parse_volatility(volatility)?,
to_rust_function(func),
);
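
And this is the import half: `from_pyarrow` is no longer called on `ArrayRef` but on `ArrayData`, with `make_array` rebuilding the dynamically typed `ArrayRef` on top of the result. A sketch that also swaps the original `unwrap` for error propagation (hypothetical helper name):

```rust
use datafusion::arrow::array::{make_array, ArrayData, ArrayRef};
use datafusion::arrow::pyarrow::PyArrowConvert;
use pyo3::prelude::*;

// Import direction: a pyarrow array crosses the boundary as ArrayData;
// `make_array` wraps it back up as an ArrayRef for DataFusion to consume.
fn array_from_py(value: &PyAny) -> PyResult<ArrayRef> {
    let data = ArrayData::from_pyarrow(value)?;
    Ok(make_array(data))
}
```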