Skip to content

Commit 463c048

Browse files
committed
Add notion of sortedness to ExecutionPlan, use to avoid repartitioning when that would result in incorrect behavior
1 parent 2858e34 commit 463c048

30 files changed

+565
-77
lines changed

datafusion/src/physical_optimizer/repartition.rs

Lines changed: 309 additions & 52 deletions
Large diffs are not rendered by default.

datafusion/src/physical_plan/analyze.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::{
3030
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
3131
use futures::StreamExt;
3232

33+
use super::expressions::PhysicalSortExpr;
3334
use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
3435
use crate::execution::runtime_env::RuntimeEnv;
3536
use async_trait::async_trait;
@@ -82,6 +83,10 @@ impl ExecutionPlan for AnalyzeExec {
8283
Partitioning::UnknownPartitioning(1)
8384
}
8485

86+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
87+
None
88+
}
89+
8590
fn with_new_children(
8691
&self,
8792
mut children: Vec<Arc<dyn ExecutionPlan>>,

datafusion/src/physical_plan/coalesce_batches.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use async_trait::async_trait;
3838
use futures::stream::{Stream, StreamExt};
3939
use log::debug;
4040

41+
use super::expressions::PhysicalSortExpr;
4142
use super::metrics::{BaselineMetrics, MetricsSet};
4243
use super::{metrics::ExecutionPlanMetricsSet, Statistics};
4344

@@ -97,6 +98,10 @@ impl ExecutionPlan for CoalesceBatchesExec {
9798
self.input.output_partitioning()
9899
}
99100

101+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
102+
None
103+
}
104+
100105
fn with_new_children(
101106
&self,
102107
children: Vec<Arc<dyn ExecutionPlan>>,

datafusion/src/physical_plan/coalesce_partitions.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use arrow::record_batch::RecordBatch;
3131
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};
3232

3333
use super::common::AbortOnDropMany;
34+
use super::expressions::PhysicalSortExpr;
3435
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
3536
use super::{RecordBatchStream, Statistics};
3637
use crate::error::{DataFusionError, Result};
@@ -86,6 +87,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
8687
Partitioning::UnknownPartitioning(1)
8788
}
8889

90+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
91+
None
92+
}
93+
8994
fn with_new_children(
9095
&self,
9196
children: Vec<Arc<dyn ExecutionPlan>>,

datafusion/src/physical_plan/cross_join.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use arrow::record_batch::RecordBatch;
2727

2828
use futures::{Stream, TryStreamExt};
2929

30+
use super::expressions::PhysicalSortExpr;
3031
use super::{
3132
coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid,
3233
ColumnStatistics, Statistics,
@@ -137,6 +138,10 @@ impl ExecutionPlan for CrossJoinExec {
137138
self.right.output_partitioning()
138139
}
139140

141+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
142+
None
143+
}
144+
140145
async fn execute(
141146
&self,
142147
partition: usize,

datafusion/src/physical_plan/empty.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use arrow::array::NullArray;
2828
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2929
use arrow::record_batch::RecordBatch;
3030

31+
use super::expressions::PhysicalSortExpr;
3132
use super::{common, SendableRecordBatchStream, Statistics};
3233

3334
use crate::execution::runtime_env::RuntimeEnv;
@@ -98,6 +99,10 @@ impl ExecutionPlan for EmptyExec {
9899
Partitioning::UnknownPartitioning(1)
99100
}
100101

102+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
103+
None
104+
}
105+
101106
fn with_new_children(
102107
&self,
103108
children: Vec<Arc<dyn ExecutionPlan>>,

datafusion/src/physical_plan/explain.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::{
3030
};
3131
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
3232

33-
use super::SendableRecordBatchStream;
33+
use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
3434
use crate::execution::runtime_env::RuntimeEnv;
3535
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
3636
use async_trait::async_trait;
@@ -89,6 +89,10 @@ impl ExecutionPlan for ExplainExec {
8989
Partitioning::UnknownPartitioning(1)
9090
}
9191

92+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
93+
None
94+
}
95+
9296
fn with_new_children(
9397
&self,
9498
children: Vec<Arc<dyn ExecutionPlan>>,

datafusion/src/physical_plan/file_format/avro.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#[cfg(feature = "avro")]
2020
use crate::avro_to_arrow;
2121
use crate::error::{DataFusionError, Result};
22+
use crate::physical_plan::expressions::PhysicalSortExpr;
2223
use crate::physical_plan::{
2324
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
2425
};
@@ -74,6 +75,10 @@ impl ExecutionPlan for AvroExec {
7475
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
7576
}
7677

78+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
79+
None
80+
}
81+
7782
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
7883
Vec::new()
7984
}

datafusion/src/physical_plan/file_format/csv.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! Execution plan for reading CSV files
1919
2020
use crate::error::{DataFusionError, Result};
21+
use crate::physical_plan::expressions::PhysicalSortExpr;
2122
use crate::physical_plan::{
2223
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
2324
};
@@ -88,6 +89,10 @@ impl ExecutionPlan for CsvExec {
8889
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
8990
}
9091

92+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
93+
None
94+
}
95+
9196
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
9297
// this is a leaf node and has no children
9398
vec![]

datafusion/src/physical_plan/file_format/json.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use async_trait::async_trait;
2020

2121
use crate::error::{DataFusionError, Result};
2222
use crate::execution::runtime_env::RuntimeEnv;
23+
use crate::physical_plan::expressions::PhysicalSortExpr;
2324
use crate::physical_plan::{
2425
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
2526
};
@@ -65,6 +66,10 @@ impl ExecutionPlan for NdJsonExec {
6566
Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
6667
}
6768

69+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
70+
None
71+
}
72+
6873
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
6974
Vec::new()
7075
}

0 commit comments

Comments
 (0)