Currently, scalar information is lost when data moves between execution plans. While PhysicalExpr and ScalarUDFImpl work with ColumnarValue (enabling optimized implementations for scalars), the stream of RecordBatches between ExecutionPlans doesn't preserve scalar knowledge.
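For context, a minimal sketch of where that knowledge is dropped today (to_output_column is a hypothetical helper; import paths and the exact return type of ScalarValue::to_array_of_size vary by DataFusion version): a value can stay scalar all the way through expression evaluation, but assembling the output RecordBatch forces every column into a fully materialized Array, so the next ExecutionPlan only ever sees arrays.

use datafusion::arrow::array::ArrayRef;
use datafusion::logical_expr::ColumnarValue;

/// Hypothetical helper: turn an evaluated expression result into a column
/// that can be placed into an output RecordBatch.
fn to_output_column(value: ColumnarValue, num_rows: usize) -> ArrayRef {
    match value {
        // The "this whole column is a single value" fact is still known here...
        ColumnarValue::Scalar(scalar) => scalar
            // ...but must be expanded into num_rows identical values,
            // so downstream operators only ever see a plain Array.
            .to_array_of_size(num_rows)
            .expect("scalar could not be converted to an array"),
        ColumnarValue::Array(array) => array,
    }
}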
Proposed Solution
Allow ExecutionPlan to return a stream that preserves scalar information, i.e. a stream of ReturnedValue instead of plain RecordBatches:
enum ReturnedValue {
    Batch(RecordBatch),
    BatchWithScalars(RecordBatchWithScalars),
}

/// Same as ColumnarValue but with Arc-wrapped Scalar to avoid unnecessary copying
enum ColumnarValue {
    Array(ArrayRef),
    Scalar(Arc<ScalarValue>),
}

struct RecordBatchWithScalars {
    schema: SchemaRef,
    columns: Vec<ColumnarValue>,
    row_count: usize,
}

Benefits
Sort Operations
- Skip sorting: When a column contains a scalar value (the same value for the entire batch), sorting on that column can be skipped (see the sketch after this list)
- Efficient copying: When sorting by other columns, scalar columns can be copied more efficiently or only partially
- Produce scalars: Sort operations can output scalars for faster downstream operations
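A minimal sketch of the first point, written against the proposed types above (effective_sort_columns is a hypothetical helper, not existing DataFusion API): a sort operator receiving a RecordBatchWithScalars can drop scalar sort keys up front, because every row carries the same value in those columns.

/// Sketch only: sort_columns are indices into batch.columns, in the requested
/// sort order. Scalar columns contribute nothing to the ordering of rows
/// within the batch, so they can be dropped from the sort key.
fn effective_sort_columns(
    batch: &RecordBatchWithScalars,
    sort_columns: &[usize],
) -> Vec<usize> {
    sort_columns
        .iter()
        .copied()
        .filter(|&idx| !matches!(batch.columns[idx], ColumnarValue::Scalar(_)))
        .collect()
}
// If the result is empty, sorting this batch can be skipped entirely.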
Aggregation
- Avoid expensive operations: When grouping by a scalar column, the expensive per-row hashing and comparisons can be avoided, since every row in the batch falls into the same group (see the sketch below)
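A similar sketch for aggregation, under the same assumptions (proposed types only; single_group_fast_path is hypothetical): when every group-by column of a batch is a scalar, the whole batch falls into a single group and the per-row hash/compare path can be bypassed.

/// Sketch only: returns true when all group-by columns in this batch are
/// scalars, i.e. the entire batch belongs to exactly one group.
fn single_group_fast_path(
    batch: &RecordBatchWithScalars,
    group_by_columns: &[usize],
) -> bool {
    group_by_columns
        .iter()
        .all(|&idx| matches!(batch.columns[idx], ColumnarValue::Scalar(_)))
}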
Downsides
- Significant breaking change
- Increased code complexity
Potential Extensions
We could add another variant for row-based encoding:
ReturnedValue::EncodedRows(Rows)

This would allow operators that process data as rows to pass encoded rows directly to the next operator, avoiding unnecessary conversions between columnar and row formats. The receiving operator can then decide whether to (see the sketch after this list):
- Use row-based input directly (avoiding conversion overhead if it also processes data as rows)
- Convert to columnar format (same cost as current behavior)
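A sketch of what the receiving side could look like with the extra variant (process_rows, process_batch, and process_batch_with_scalars are hypothetical handlers, and Rows is the row-format type from arrow-row; none of this is existing API):

// Sketch only: a row-oriented operator keeps the encoded rows as-is, while
// columnar inputs keep today's behavior.
fn consume(value: ReturnedValue) {
    match value {
        // Already row-encoded: use directly, no row/columnar conversion.
        ReturnedValue::EncodedRows(rows) => process_rows(rows),
        // Columnar variants are handled as they are today.
        ReturnedValue::Batch(batch) => process_batch(batch),
        ReturnedValue::BatchWithScalars(batch) => process_batch_with_scalars(batch),
    }
}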