Skip to content
Merged
9 changes: 9 additions & 0 deletions ballista/rust/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::utils::WrappedStream;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
Expand Down Expand Up @@ -82,6 +83,14 @@ impl ExecutionPlan for DistributedQueryExec {
Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down
9 changes: 9 additions & 0 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
Expand Down Expand Up @@ -85,6 +86,14 @@ impl ExecutionPlan for ShuffleReaderExec {
Partitioning::UnknownPartitioning(self.partition.len())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down
9 changes: 9 additions & 0 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

use datafusion::physical_plan::expressions::PhysicalSortExpr;
use parking_lot::Mutex;
use std::fs::File;
use std::iter::Iterator;
Expand Down Expand Up @@ -334,6 +335,14 @@ impl ExecutionPlan for ShuffleWriterExec {
self.plan.output_partitioning()
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.plan.clone()]
}
Expand Down
9 changes: 9 additions & 0 deletions ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::serde::scheduler::PartitionLocation;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
Expand Down Expand Up @@ -85,6 +86,14 @@ impl ExecutionPlan for UnresolvedShuffleExec {
Partitioning::UnknownPartitioning(self.output_partition_count)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down
5 changes: 5 additions & 0 deletions ballista/rust/executor/src/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datafusion::arrow::{
};
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
Expand Down Expand Up @@ -62,6 +63,10 @@ impl ExecutionPlan for CollectExec {
Partitioning::UnknownPartitioning(1)
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.plan.clone()]
}
Expand Down
Loading