diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs index 48e93c04bb03..35062695e8ac 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs @@ -121,57 +121,53 @@ pub async fn from_read_rel( })); } + // Check for produce_one_row pattern in both old (values) and new (expressions) formats. + // A VirtualTable with exactly one row containing only empty/default fields represents + // an EmptyRelation with produce_one_row=true. This pattern is used for queries without + // a FROM clause (e.g., "SELECT 1 AS one") where a single phantom row is needed to + // provide a context for evaluating scalar expressions. This is conceptually similar to + // the SQL "DUAL" table (see: https://en.wikipedia.org/wiki/DUAL_table) which some + // databases provide as a single-row source for selecting constant expressions when no + // real table is present. + let is_produce_one_row = (vt.values.len() == 1 + && vt.expressions.is_empty() + && substrait_schema.fields().is_empty() + && vt.values[0].fields.is_empty()) + || (vt.expressions.len() == 1 + && vt.values.is_empty() + && substrait_schema.fields().is_empty() + && vt.expressions[0].fields.is_empty()); + + if is_produce_one_row { + return Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: true, + schema: DFSchemaRef::new(substrait_schema), + })); + } + let values = if !vt.expressions.is_empty() { let mut exprs = vec![]; for row in &vt.expressions { - let mut name_idx = 0; let mut row_exprs = vec![]; for expression in &row.fields { - name_idx += 1; let expr = consumer - .consume_expression(expression, &DFSchema::empty()) + .consume_expression(expression, &substrait_schema) .await?; row_exprs.push(expr); } - if name_idx != named_struct.names.len() { + // For expressions, validate against top-level schema fields, not nested names + if row_exprs.len() != substrait_schema.fields().len() { return substrait_err!( - "Names list must match exactly to nested schema, but found {} uses for {} names", - name_idx, - named_struct.names.len() + "Field count mismatch: expected {} fields but found {} in virtual table row", + substrait_schema.fields().len(), + row_exprs.len() ); } exprs.push(row_exprs); } exprs } else { - vt - .values - .iter() - .map(|row| { - let mut name_idx = 0; - let lits = row - .fields - .iter() - .map(|lit| { - name_idx += 1; // top-level names are provided through schema - Ok(Expr::Literal(from_substrait_literal( - consumer, - lit, - &named_struct.names, - &mut name_idx, - )?, None)) - }) - .collect::>()?; - if name_idx != named_struct.names.len() { - return substrait_err!( - "Names list must match exactly to nested schema, but found {} uses for {} names", - name_idx, - named_struct.names.len() - ); - } - Ok(lits) - }) - .collect::>()? + convert_literal_rows(consumer, vt, named_struct)? }; Ok(LogicalPlan::Values(Values { @@ -226,6 +222,46 @@ pub async fn from_read_rel( } } +/// Converts Substrait literal rows from a VirtualTable into DataFusion expressions. +/// +/// This function processes the deprecated `values` field of VirtualTable, converting +/// each literal value into a `Expr::Literal` while tracking and validating the name +/// indices against the provided named struct schema. +fn convert_literal_rows( + consumer: &impl SubstraitConsumer, + vt: &substrait::proto::read_rel::VirtualTable, + named_struct: &substrait::proto::NamedStruct, +) -> datafusion::common::Result>> { + #[allow(deprecated)] + vt.values + .iter() + .map(|row| { + let mut name_idx = 0; + let lits = row + .fields + .iter() + .map(|lit| { + name_idx += 1; // top-level names are provided through schema + Ok(Expr::Literal(from_substrait_literal( + consumer, + lit, + &named_struct.names, + &mut name_idx, + )?, None)) + }) + .collect::>()?; + if name_idx != named_struct.names.len() { + return substrait_err!( + "Names list must match exactly to nested schema, but found {} uses for {} names", + name_idx, + named_struct.names.len() + ); + } + Ok(lits) + }) + .collect::>() +} + pub fn apply_masking( schema: DFSchema, mask_expression: &::core::option::Option, diff --git a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs index 4b2e3782108b..60506a15ec11 100644 --- a/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs +++ b/datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs @@ -18,9 +18,10 @@ use crate::logical_plan::producer::{ to_substrait_literal, to_substrait_named_struct, SubstraitProducer, }; -use datafusion::common::{not_impl_err, substrait_datafusion_err, DFSchema, ToDFSchema}; +use datafusion::common::{substrait_datafusion_err, DFSchema, ToDFSchema}; use datafusion::logical_expr::utils::conjunction; use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values}; +use datafusion::scalar::ScalarValue; use std::sync::Arc; use substrait::proto::expression::literal::Struct; use substrait::proto::expression::mask_expression::{StructItem, StructSelect}; @@ -83,26 +84,61 @@ pub fn from_table_scan( })) } +/// Encodes an EmptyRelation as a Substrait VirtualTable. +/// +/// EmptyRelation represents a relation with no input data. When `produce_one_row` is true, +/// it generates a single row with all fields set to their default values (typically NULL). +/// This is used for queries without a FROM clause, such as "SELECT 1 AS one" or +/// "SELECT current_timestamp()". +/// +/// When `produce_one_row` is false, it represents a truly empty relation with no rows, +/// used in optimizations or as a placeholder. pub fn from_empty_relation( producer: &mut impl SubstraitProducer, e: &EmptyRelation, ) -> datafusion::common::Result> { - if e.produce_one_row { - return not_impl_err!("Producing a row from empty relation is unsupported"); - } - #[allow(deprecated)] + let base_schema = to_substrait_named_struct(producer, &e.schema)?; + + let read_type = if e.produce_one_row { + // Create one row with default scalar values for each field in the schema. + // For example, an Int32 field gets Int32(NULL), a Utf8 field gets Utf8(NULL), etc. + // This represents the "phantom row" that provides a context for evaluating + // scalar expressions in queries without a FROM clause. + let fields = e + .schema + .fields() + .iter() + .map(|f| { + let scalar = ScalarValue::try_from(f.data_type())?; + to_substrait_literal(producer, &scalar) + }) + .collect::>()?; + + ReadType::VirtualTable(VirtualTable { + // Use deprecated 'values' field instead of 'expressions' because the consumer's + // nested expression support (RelType::Nested) is not yet implemented. + // The 'values' field uses literal::Struct which the consumer can properly + // deserialize with field name preservation. + #[allow(deprecated)] + values: vec![Struct { fields }], + expressions: vec![], + }) + } else { + ReadType::VirtualTable(VirtualTable { + #[allow(deprecated)] + values: vec![], + expressions: vec![], + }) + }; Ok(Box::new(Rel { rel_type: Some(RelType::Read(Box::new(ReadRel { common: None, - base_schema: Some(to_substrait_named_struct(producer, &e.schema)?), + base_schema: Some(base_schema), filter: None, best_effort_filter: None, projection: None, advanced_extension: None, - read_type: Some(ReadType::VirtualTable(VirtualTable { - values: vec![], - expressions: vec![], - })), + read_type: Some(read_type), }))), })) } @@ -134,7 +170,6 @@ pub fn from_values( Ok(Struct { fields }) }) .collect::>()?; - #[allow(deprecated)] Ok(Box::new(Rel { rel_type: Some(RelType::Read(Box::new(ReadRel { common: None, @@ -144,6 +179,7 @@ pub fn from_values( projection: None, advanced_extension: None, read_type: Some(ReadType::VirtualTable(VirtualTable { + #[allow(deprecated)] values, expressions: vec![], })), diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index f14d4cbf1fcc..d931dd58d8ef 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -32,8 +32,8 @@ use datafusion::execution::registry::SerializerRegistry; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::{ - Extension, InvariantLevel, LogicalPlan, PartitionEvaluator, Repartition, - UserDefinedLogicalNode, Values, Volatility, + EmptyRelation, Extension, InvariantLevel, LogicalPlan, PartitionEvaluator, + Repartition, UserDefinedLogicalNode, Values, Volatility, }; use datafusion::optimizer::simplify_expressions::expr_simplifier::THRESHOLD_INLINE_INLIST; use datafusion::prelude::*; @@ -185,6 +185,46 @@ async fn simple_select() -> Result<()> { roundtrip("SELECT a, b FROM data").await } +#[tokio::test] +async fn roundtrip_literal_without_from() -> Result<()> { + roundtrip("SELECT 1 AS one").await +} + +#[tokio::test] +async fn roundtrip_empty_relation_with_schema() -> Result<()> { + // Test produce_one_row=true with multiple typed columns + roundtrip("SELECT 1::int as a, 'hello'::text as b, 3.14::double as c").await +} + +#[tokio::test] +async fn roundtrip_empty_relation_no_rows() -> Result<()> { + // Test produce_one_row=false + let ctx = create_context().await?; + let plan = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: DFSchemaRef::new(DFSchema::empty()), + }); + roundtrip_logical_plan_with_ctx(plan, ctx).await?; + Ok(()) +} + +#[tokio::test] +async fn roundtrip_subquery_with_empty_relation() -> Result<()> { + // Test EmptyRelation in the context of scalar subqueries. + // The optimizer may simplify the subquery away, but we're testing that + // the EmptyRelation round-trips correctly when it appears in the plan. + let ctx = create_context().await?; + let df = ctx.sql("SELECT (SELECT 1) as nested").await?; + let plan = df.into_optimized_plan()?; + + // Just verify the round-trip succeeds and produces valid results + let proto = to_substrait_plan(&plan, &ctx.state())?; + let plan2 = from_substrait_plan(&ctx.state(), &proto).await?; + let df2 = DataFrame::new(ctx.state(), plan2); + df2.show().await?; + Ok(()) +} + #[tokio::test] async fn wildcard_select() -> Result<()> { let plan = generate_plan_from_sql("SELECT * FROM data", true, false).await?;