diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 3def89f78501..63f8c405308e 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -25,7 +25,7 @@ use arrow::{ }; use crate::PhysicalExpr; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; /// Represents the column at a given index in a RecordBatch @@ -74,21 +74,84 @@ impl PhysicalExpr for Column { /// Get the data type of this expression, given the schema of the input fn data_type(&self, input_schema: &Schema) -> Result { + self.bounds_check(input_schema)?; Ok(input_schema.field(self.index).data_type().clone()) } /// Decide whehter this expression is nullable, given the schema of the input fn nullable(&self, input_schema: &Schema) -> Result { + self.bounds_check(input_schema)?; Ok(input_schema.field(self.index).is_nullable()) } /// Evaluate the expression fn evaluate(&self, batch: &RecordBatch) -> Result { + self.bounds_check(batch.schema().as_ref())?; Ok(ColumnarValue::Array(batch.column(self.index).clone())) } } +impl Column { + fn bounds_check(&self, input_schema: &Schema) -> Result<()> { + if self.index < input_schema.fields.len() { + Ok(()) + } else { + Err(DataFusionError::Internal(format!( + "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}", + self.name, + self.index, input_schema.fields.len(), input_schema.fields().iter().map(|f| f.name().clone()).collect::>()))) + } + } +} + /// Create a column expression pub fn col(name: &str, schema: &Schema) -> Result> { Ok(Arc::new(Column::new_with_schema(name, schema)?)) } + +#[cfg(test)] +mod test { + use crate::expressions::Column; + use crate::PhysicalExpr; + use arrow::array::StringArray; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::record_batch::RecordBatch; + use datafusion_common::Result; + use std::sync::Arc; + + #[test] + fn out_of_bounds_data_type() { + let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]); + let col = Column::new("id", 9); + let error = col.data_type(&schema).expect_err("error"); + assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \ + but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \ + DataFusion's code and we would welcome that you file an bug report in our issue tracker", + &format!("{}", error)) + } + + #[test] + fn out_of_bounds_nullable() { + let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]); + let col = Column::new("id", 9); + let error = col.nullable(&schema).expect_err("error"); + assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \ + but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \ + DataFusion's code and we would welcome that you file an bug report in our issue tracker", + &format!("{}", error)) + } + + #[test] + fn out_of_bounds_evaluate() -> Result<()> { + let schema = Schema::new(vec![Field::new("foo", DataType::Utf8, true)]); + let data: StringArray = vec!["data"].into(); + let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(data)])?; + let col = Column::new("id", 9); + let error = col.evaluate(&batch).expect_err("error"); + assert_eq!("Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \ + but input schema only has 1 columns: [\"foo\"]. This was likely caused by a bug in \ + DataFusion's code and we would welcome that you file an bug report in our issue tracker", + &format!("{}", error)); + Ok(()) + } +}