From 0c5a6101861a1fc4e1d66bebe0014d2ea168b581 Mon Sep 17 00:00:00 2001 From: Sean Smith Date: Tue, 29 Aug 2023 14:42:37 -0500 Subject: [PATCH] feat: Allow creating a ValuesExec from record batches --- datafusion/core/src/physical_plan/values.rs | 69 ++++++++++++++++++++- 1 file changed, 68 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs index ff5e71969027..539a88a9d50e 100644 --- a/datafusion/core/src/physical_plan/values.rs +++ b/datafusion/core/src/physical_plan/values.rs @@ -88,8 +88,35 @@ impl ValuesExec { Ok(Self { schema, data }) } + /// Create a new plan using the provided schema and batches. + /// + /// Errors if any of the batches don't match the provided schema, or if no + /// batches are provided. + pub fn try_new_from_batches( + schema: SchemaRef, + batches: Vec<RecordBatch>, + ) -> Result<Self> { + if batches.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + for batch in &batches { + let batch_schema = batch.schema(); + if batch_schema != schema { + return plan_err!( + "Batch has invalid schema. 
Expected: {schema}, got: {batch_schema}" + ); + } + } + + Ok(ValuesExec { + schema, + data: batches, + }) + } + /// provides the data - fn data(&self) -> Vec<RecordBatch> { + pub fn data(&self) -> Vec<RecordBatch> { self.data.clone() } } @@ -168,7 +195,10 @@ impl ExecutionPlan for ValuesExec { #[cfg(test)] mod tests { use super::*; + + use crate::test::create_vec_batches; use crate::test_util; + use arrow_schema::{DataType, Field, Schema}; #[tokio::test] async fn values_empty_case() -> Result<()> { @@ -177,4 +207,41 @@ mod tests { assert!(empty.is_err()); Ok(()) } + + #[test] + fn new_exec_with_batches() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let batches = create_vec_batches(&schema, 10); + let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap(); + } + + #[test] + fn new_exec_with_batches_empty() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err(); + } + + #[test] + fn new_exec_with_batches_invalid_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + let batches = create_vec_batches(&schema, 10); + + let invalid_schema = Arc::new(Schema::new(vec![ + Field::new("col0", DataType::UInt32, false), + Field::new("col1", DataType::Utf8, false), + ])); + let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); + } }