Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion datafusion/physical-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::{
};

use crate::PhysicalExpr;
use datafusion_common::Result;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;

/// Represents the column at a given index in a RecordBatch
Expand Down Expand Up @@ -74,21 +74,84 @@ impl PhysicalExpr for Column {

/// Get the data type of this expression, given the schema of the input
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.bounds_check(input_schema)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the bound check into the constructor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. We can only do the bounds check when the input schema is available, during evaluation.

Ok(input_schema.field(self.index).data_type().clone())
}

/// Decide whehter this expression is nullable, given the schema of the input
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
self.bounds_check(input_schema)?;
Ok(input_schema.field(self.index).is_nullable())
}

/// Evaluate the expression
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
self.bounds_check(batch.schema().as_ref())?;
Ok(ColumnarValue::Array(batch.column(self.index).clone()))
}
}

impl Column {
fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
if self.index < input_schema.fields.len() {
Ok(())
} else {
Err(DataFusionError::Internal(format!(
"PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
self.name,
self.index, input_schema.fields.len(), input_schema.fields().iter().map(|f| f.name().clone()).collect::<Vec<String>>())))
}
}
}

/// Create a column expression
pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Column::new_with_schema(name, schema)?))
}

#[cfg(test)]
mod test {
use crate::expressions::Column;
use crate::PhysicalExpr;
use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::sync::Arc;

#[test]
fn out_of_bounds_data_type() {
let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
let col = Column::new("id", 9);
let error = col.data_type(&schema).expect_err("error");
assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \
DataFusion's code and we would welcome that you file an bug report in our issue tracker",
&format!("{}", error))
}

#[test]
fn out_of_bounds_nullable() {
let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
let col = Column::new("id", 9);
let error = col.nullable(&schema).expect_err("error");
assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \
DataFusion's code and we would welcome that you file an bug report in our issue tracker",
&format!("{}", error))
}

#[test]
fn out_of_bounds_evaluate() -> Result<()> {
let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]);
let data: StringArray = vec!["data"].into();
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?;
let col = Column::new("id", 9);
let error = col.evaluate(&batch).expect_err("error");
assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \
DataFusion's code and we would welcome that you file an bug report in our issue tracker",
&format!("{}", error));
Ok(())
}
}