Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion datafusion/core/src/physical_plan/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,35 @@ impl ValuesExec {
Ok(Self { schema, data })
}

/// Create a new plan using the provided schema and batches.
///
/// Errors if any of the batches don't match the provided schema, or if no
/// batches are provided.
pub fn try_new_from_batches(
schema: SchemaRef,
batches: Vec<RecordBatch>,
) -> Result<Self> {
if batches.is_empty() {
return plan_err!("Values list cannot be empty");
}

for batch in &batches {
let batch_schema = batch.schema();
if batch_schema != schema {
return plan_err!(
"Batch has invalid schema. Expected: {schema}, got: {batch_schema}"
);
}
}

Ok(ValuesExec {
schema,
data: batches,
})
}

/// provides the data
fn data(&self) -> Vec<RecordBatch> {
pub fn data(&self) -> Vec<RecordBatch> {
self.data.clone()
}
}
Expand Down Expand Up @@ -168,7 +195,10 @@ impl ExecutionPlan for ValuesExec {
#[cfg(test)]
mod tests {
use super::*;

use crate::test::create_vec_batches;
use crate::test_util;
use arrow_schema::{DataType, Field, Schema};

#[tokio::test]
async fn values_empty_case() -> Result<()> {
Expand All @@ -177,4 +207,41 @@ mod tests {
assert!(empty.is_err());
Ok(())
}

#[test]
fn new_exec_with_batches() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let batches = create_vec_batches(&schema, 10);
let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap();
}

#[test]
fn new_exec_with_batches_empty() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err();
}

#[test]
fn new_exec_with_batches_invalid_schema() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col0",
DataType::UInt32,
false,
)]));
let batches = create_vec_batches(&schema, 10);

let invalid_schema = Arc::new(Schema::new(vec![
Field::new("col0", DataType::UInt32, false),
Field::new("col1", DataType::Utf8, false),
]));
let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err();
}
}