Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 71 additions & 35 deletions datafusion/substrait/src/logical_plan/consumer/rel/read_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,57 +121,53 @@ pub async fn from_read_rel(
}));
}

// Check for produce_one_row pattern in both old (values) and new (expressions) formats.
// A VirtualTable with exactly one row containing only empty/default fields represents
// an EmptyRelation with produce_one_row=true. This pattern is used for queries without
Comment on lines +125 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Oracle the "one row (dummy) table" is called the "dual table", by checking the "In other database systems" section it seems to be quite a well-known name for that concept.

I am not suggesting to necessarily adopt this name here, but maybe a reference to it in the comments could be good for people with a DB background, but not necessarily familiar with Datafusion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a reference to the dual table concept - figured it might help folks coming FROM DUAL backgrounds 😉 Thanks for the nudge!

// a FROM clause (e.g., "SELECT 1 AS one") where a single phantom row is needed to
// provide a context for evaluating scalar expressions. This is conceptually similar to
// the SQL "DUAL" table (see: https://en.wikipedia.org/wiki/DUAL_table) which some
// databases provide as a single-row source for selecting constant expressions when no
// real table is present.
let is_produce_one_row = (vt.values.len() == 1
&& vt.expressions.is_empty()
&& substrait_schema.fields().is_empty()
&& vt.values[0].fields.is_empty())
|| (vt.expressions.len() == 1
&& vt.values.is_empty()
&& substrait_schema.fields().is_empty()
&& vt.expressions[0].fields.is_empty());

if is_produce_one_row {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: true,
schema: DFSchemaRef::new(substrait_schema),
}));
}

let values = if !vt.expressions.is_empty() {
let mut exprs = vec![];
for row in &vt.expressions {
let mut name_idx = 0;
let mut row_exprs = vec![];
for expression in &row.fields {
name_idx += 1;
let expr = consumer
.consume_expression(expression, &DFSchema::empty())
.consume_expression(expression, &substrait_schema)
.await?;
row_exprs.push(expr);
}
if name_idx != named_struct.names.len() {
// For expressions, validate against top-level schema fields, not nested names
if row_exprs.len() != substrait_schema.fields().len() {
return substrait_err!(
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
named_struct.names.len()
"Field count mismatch: expected {} fields but found {} in virtual table row",
substrait_schema.fields().len(),
row_exprs.len()
);
}
exprs.push(row_exprs);
}
exprs
} else {
vt
.values
.iter()
.map(|row| {
let mut name_idx = 0;
let lits = row
.fields
.iter()
.map(|lit| {
name_idx += 1; // top-level names are provided through schema
Ok(Expr::Literal(from_substrait_literal(
consumer,
lit,
&named_struct.names,
&mut name_idx,
)?, None))
})
.collect::<datafusion::common::Result<_>>()?;
if name_idx != named_struct.names.len() {
return substrait_err!(
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
named_struct.names.len()
);
}
Ok(lits)
})
.collect::<datafusion::common::Result<_>>()?
convert_literal_rows(consumer, vt, named_struct)?
};

Ok(LogicalPlan::Values(Values {
Expand Down Expand Up @@ -226,6 +222,46 @@ pub async fn from_read_rel(
}
}

/// Converts Substrait literal rows from a VirtualTable into DataFusion expressions.
///
/// This function processes the deprecated `values` field of VirtualTable, converting
/// each literal value into a `Expr::Literal` while tracking and validating the name
/// indices against the provided named struct schema.
fn convert_literal_rows(
consumer: &impl SubstraitConsumer,
vt: &substrait::proto::read_rel::VirtualTable,
named_struct: &substrait::proto::NamedStruct,
) -> datafusion::common::Result<Vec<Vec<Expr>>> {
#[allow(deprecated)]
vt.values
.iter()
.map(|row| {
let mut name_idx = 0;
let lits = row
.fields
.iter()
.map(|lit| {
name_idx += 1; // top-level names are provided through schema
Ok(Expr::Literal(from_substrait_literal(
consumer,
lit,
&named_struct.names,
&mut name_idx,
)?, None))
})
.collect::<datafusion::common::Result<_>>()?;
if name_idx != named_struct.names.len() {
return substrait_err!(
"Names list must match exactly to nested schema, but found {} uses for {} names",
name_idx,
named_struct.names.len()
);
}
Ok(lits)
})
.collect::<datafusion::common::Result<_>>()
}

pub fn apply_masking(
schema: DFSchema,
mask_expression: &::core::option::Option<MaskExpression>,
Expand Down
58 changes: 47 additions & 11 deletions datafusion/substrait/src/logical_plan/producer/rel/read_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
use crate::logical_plan::producer::{
to_substrait_literal, to_substrait_named_struct, SubstraitProducer,
};
use datafusion::common::{not_impl_err, substrait_datafusion_err, DFSchema, ToDFSchema};
use datafusion::common::{substrait_datafusion_err, DFSchema, ToDFSchema};
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{EmptyRelation, Expr, TableScan, Values};
use datafusion::scalar::ScalarValue;
use std::sync::Arc;
use substrait::proto::expression::literal::Struct;
use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
Expand Down Expand Up @@ -83,26 +84,61 @@ pub fn from_table_scan(
}))
}

/// Encodes an EmptyRelation as a Substrait VirtualTable.
///
/// EmptyRelation represents a relation with no input data. When `produce_one_row` is true,
/// it generates a single row with all fields set to their default values (typically NULL).
/// This is used for queries without a FROM clause, such as "SELECT 1 AS one" or
/// "SELECT current_timestamp()".
///
/// When `produce_one_row` is false, it represents a truly empty relation with no rows,
/// used in optimizations or as a placeholder.
pub fn from_empty_relation(
producer: &mut impl SubstraitProducer,
e: &EmptyRelation,
) -> datafusion::common::Result<Box<Rel>> {
if e.produce_one_row {
return not_impl_err!("Producing a row from empty relation is unsupported");
}
#[allow(deprecated)]
let base_schema = to_substrait_named_struct(producer, &e.schema)?;

let read_type = if e.produce_one_row {
// Create one row with default scalar values for each field in the schema.
// For example, an Int32 field gets Int32(NULL), a Utf8 field gets Utf8(NULL), etc.
// This represents the "phantom row" that provides a context for evaluating
// scalar expressions in queries without a FROM clause.
let fields = e
.schema
.fields()
.iter()
.map(|f| {
let scalar = ScalarValue::try_from(f.data_type())?;
to_substrait_literal(producer, &scalar)
})
.collect::<datafusion::common::Result<_>>()?;

ReadType::VirtualTable(VirtualTable {
// Use deprecated 'values' field instead of 'expressions' because the consumer's
// nested expression support (RexType::Nested) is not yet implemented.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// nested expression support (RexType::Nested) is not yet implemented.
// nested expression support (RelType::Nested) is not yet implemented.

// The 'values' field uses literal::Struct which the consumer can properly
// deserialize with field name preservation.
#[allow(deprecated)]
values: vec![Struct { fields }],
expressions: vec![],
})
} else {
ReadType::VirtualTable(VirtualTable {
#[allow(deprecated)]
values: vec![],
expressions: vec![],
})
};
Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
base_schema: Some(to_substrait_named_struct(producer, &e.schema)?),
base_schema: Some(base_schema),
filter: None,
best_effort_filter: None,
projection: None,
advanced_extension: None,
read_type: Some(ReadType::VirtualTable(VirtualTable {
values: vec![],
expressions: vec![],
})),
read_type: Some(read_type),
}))),
}))
}
Expand Down Expand Up @@ -134,7 +170,6 @@ pub fn from_values(
Ok(Struct { fields })
})
.collect::<datafusion::common::Result<_>>()?;
#[allow(deprecated)]
Ok(Box::new(Rel {
rel_type: Some(RelType::Read(Box::new(ReadRel {
common: None,
Expand All @@ -144,6 +179,7 @@ pub fn from_values(
projection: None,
advanced_extension: None,
read_type: Some(ReadType::VirtualTable(VirtualTable {
#[allow(deprecated)]
values,
expressions: vec![],
})),
Expand Down
44 changes: 42 additions & 2 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use datafusion::execution::registry::SerializerRegistry;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::logical_expr::{
Extension, InvariantLevel, LogicalPlan, PartitionEvaluator, Repartition,
UserDefinedLogicalNode, Values, Volatility,
EmptyRelation, Extension, InvariantLevel, LogicalPlan, PartitionEvaluator,
Repartition, UserDefinedLogicalNode, Values, Volatility,
};
use datafusion::optimizer::simplify_expressions::expr_simplifier::THRESHOLD_INLINE_INLIST;
use datafusion::prelude::*;
Expand Down Expand Up @@ -185,6 +185,46 @@ async fn simple_select() -> Result<()> {
roundtrip("SELECT a, b FROM data").await
}

#[tokio::test]
async fn roundtrip_literal_without_from() -> Result<()> {
roundtrip("SELECT 1 AS one").await
}

#[tokio::test]
async fn roundtrip_empty_relation_with_schema() -> Result<()> {
// Test produce_one_row=true with multiple typed columns
roundtrip("SELECT 1::int as a, 'hello'::text as b, 3.14::double as c").await
}

#[tokio::test]
async fn roundtrip_empty_relation_no_rows() -> Result<()> {
// Test produce_one_row=false
let ctx = create_context().await?;
let plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: DFSchemaRef::new(DFSchema::empty()),
});
roundtrip_logical_plan_with_ctx(plan, ctx).await?;
Ok(())
}

#[tokio::test]
async fn roundtrip_subquery_with_empty_relation() -> Result<()> {
// Test EmptyRelation in the context of scalar subqueries.
// The optimizer may simplify the subquery away, but we're testing that
// the EmptyRelation round-trips correctly when it appears in the plan.
let ctx = create_context().await?;
let df = ctx.sql("SELECT (SELECT 1) as nested").await?;
let plan = df.into_optimized_plan()?;

// Just verify the round-trip succeeds and produces valid results
let proto = to_substrait_plan(&plan, &ctx.state())?;
let plan2 = from_substrait_plan(&ctx.state(), &proto).await?;
let df2 = DataFrame::new(ctx.state(), plan2);
df2.show().await?;
Ok(())
}

#[tokio::test]
async fn wildcard_select() -> Result<()> {
let plan = generate_plan_from_sql("SELECT * FROM data", true, false).await?;
Expand Down