4 changes: 2 additions & 2 deletions datafusion-examples/examples/file_stream_provider.rs
@@ -39,7 +39,7 @@ mod non_windows {
use datafusion::datasource::TableProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{exec_err, Result};
use datafusion_expr::Expr;
use datafusion_expr::SortExpr;

// Number of lines written to FIFO
const TEST_BATCH_SIZE: usize = 5;
@@ -49,7 +49,7 @@ mod non_windows {
fn fifo_table(
schema: SchemaRef,
path: impl Into<PathBuf>,
sort: Vec<Vec<Expr>>,
sort: Vec<Vec<SortExpr>>,
) -> Arc<dyn TableProvider> {
let source = FileStreamProvider::new_file(schema, path.into())
.with_batch_size(TEST_BATCH_SIZE)
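For reference, a minimal sketch (not part of the diff) of how a caller would now build the `sort` argument for `fifo_table` under the new signature; the column name `a1` is illustrative:

```rust
use datafusion_expr::{col, SortExpr};

fn example_sort_order() -> Vec<Vec<SortExpr>> {
    // One known ordering: `a1` ascending, nulls last — the same shape
    // `e.sort(true, false)` produces elsewhere in this PR.
    vec![vec![col("a1").sort(true, false)]]
}
```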
15 changes: 12 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -52,7 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
};
use datafusion_expr::{case, is_null, lit};
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
};
@@ -577,7 +577,7 @@ impl DataFrame {
self,
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<Expr>>,
sort_expr: Option<Vec<SortExpr>>,
) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan)
.distinct_on(on_expr, select_expr, sort_expr)?
@@ -776,6 +776,15 @@ impl DataFrame {
})
}

/// Apply a sort by the provided expressions, using the default direction (ascending, nulls last)
pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame> {
self.sort(
expr.into_iter()
.map(|e| e.sort(true, false))
.collect::<Vec<SortExpr>>(),
)
}

/// Sort the DataFrame by the specified sorting expressions.
///
/// Note that any expression can be turned into
@@ -797,7 +806,7 @@
/// # Ok(())
/// # }
/// ```
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame> {
pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?;
Ok(DataFrame {
session_state: self.session_state,
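A hedged usage sketch of the updated `DataFrame` API (not from the PR): `sort` now takes `SortExpr`s built via `Expr::sort`, while the new `sort_by` covers the common ascending, nulls-last case. Table and column names are placeholders.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn example(ctx: &SessionContext) -> Result<()> {
    let df = ctx.table("t").await?;
    // Explicit direction: `b` descending, nulls first.
    let _explicit = df.clone().sort(vec![col("b").sort(false, true)])?;
    // Default direction via the new helper: `a` ascending, nulls last.
    let _default = df.sort_by(vec![col("a")])?;
    Ok(())
}
```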
14 changes: 7 additions & 7 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -31,7 +31,6 @@ use crate::datasource::{
};
use crate::error::Result;
use crate::execution::context::{SessionConfig, SessionState};
use crate::logical_expr::Expr;

use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::config::TableOptions;
@@ -41,6 +40,7 @@ use datafusion_common::{
};

use async_trait::async_trait;
use datafusion_expr::SortExpr;

/// Options that control the reading of CSV files.
///
@@ -84,7 +84,7 @@ pub struct CsvReadOptions<'a> {
/// File compression type
pub file_compression_type: FileCompressionType,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for CsvReadOptions<'a> {
@@ -199,7 +199,7 @@ impl<'a> CsvReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
@@ -231,7 +231,7 @@ pub struct ParquetReadOptions<'a> {
/// based on data in file.
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for ParquetReadOptions<'a> {
@@ -278,7 +278,7 @@ impl<'a> ParquetReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
@@ -397,7 +397,7 @@ pub struct NdJsonReadOptions<'a> {
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl<'a> Default for NdJsonReadOptions<'a> {
@@ -452,7 +452,7 @@ impl<'a> NdJsonReadOptions<'a> {
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
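For illustration only, a hedged sketch of declaring a known sort order when registering a CSV file with these options; the path and column are placeholders, and the same builder exists on the Parquet and NDJSON options above.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn example(ctx: &SessionContext) -> Result<()> {
    let options = CsvReadOptions::new()
        .has_header(true)
        // The file is already sorted by `ts` ascending, nulls last.
        .file_sort_order(vec![vec![col("ts").sort(true, false)]]);
    ctx.register_csv("events", "events.csv", options).await?;
    Ok(())
}
```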
3 changes: 1 addition & 2 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -102,11 +102,10 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
}

// TODO other expressions are not handled yet:
// - AGGREGATE, WINDOW and SORT should not end up in filter conditions, except maybe in some edge cases
// - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
// - Can `Wildcard` be considered as a `Literal`?
// - ScalarVariable could be `applicable`, but that would require access to the context
Expr::AggregateFunction { .. }
| Expr::Sort { .. }
| Expr::WindowFunction { .. }
| Expr::Wildcard { .. }
| Expr::Unnest { .. }
14 changes: 4 additions & 10 deletions datafusion/core/src/datasource/listing/table.rs
@@ -33,8 +33,8 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::TableType;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};

use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
@@ -222,7 +222,7 @@ pub struct ListingOptions {
/// ordering (encapsulated by a `Vec<Expr>`). If there aren't
/// multiple equivalent orderings, the outer `Vec` will have a
/// single element.
pub file_sort_order: Vec<Vec<Expr>>,
pub file_sort_order: Vec<Vec<SortExpr>>,
}

impl ListingOptions {
@@ -385,7 +385,7 @@ impl ListingOptions {
///
/// assert_eq!(listing_options.file_sort_order, file_sort_order);
/// ```
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
self.file_sort_order = file_sort_order;
self
}
@@ -909,8 +909,7 @@ impl TableProvider for ListingTable {
keep_partition_by_columns,
};

let unsorted: Vec<Vec<Expr>> = vec![];
let order_requirements = if self.options().file_sort_order != unsorted {
let order_requirements = if !self.options().file_sort_order.is_empty() {
// Multiple sort orders in outer vec are equivalent, so we pass only the first one
let ordering = self
.try_create_output_ordering()?
@@ -1160,11 +1159,6 @@ mod tests {
// (file_sort_order, expected_result)
let cases = vec![
(vec![], Ok(vec![])),
// not a sort expr
(
vec![vec![col("string_col")]],
Err("Expected Expr::Sort in output_ordering, but got string_col"),
),
// sort expr, but non column
(
vec![vec![
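A hedged sketch of the same setting on `ListingOptions` (not part of the diff; format, extension, and column are illustrative):

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::col;

fn example_listing_options() -> ListingOptions {
    ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        // Each file is assumed to be sorted by `id` ascending, nulls last.
        .with_file_sort_order(vec![vec![col("id").sort(true, false)]])
}
```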
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/memory.rs
@@ -43,6 +43,7 @@ use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_expr::SortExpr;
use futures::StreamExt;
use log::debug;
use parking_lot::Mutex;
@@ -64,7 +65,7 @@ pub struct MemTable {
column_defaults: HashMap<String, Expr>,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
/// inserting data into this table removes the order
pub sort_order: Arc<Mutex<Vec<Vec<Expr>>>>,
pub sort_order: Arc<Mutex<Vec<Vec<SortExpr>>>>,
}

impl MemTable {
@@ -118,7 +119,7 @@ impl MemTable {
///
/// Note that multiple sort orders are supported, if some are known to be
/// equivalent,
pub fn with_sort_order(self, mut sort_order: Vec<Vec<Expr>>) -> Self {
pub fn with_sort_order(self, mut sort_order: Vec<Vec<SortExpr>>) -> Self {
std::mem::swap(self.sort_order.lock().as_mut(), &mut sort_order);
self
}
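A hedged sketch for `MemTable` — an empty single-partition table whose declared order uses the new type; the schema and order are illustrative, not from the PR.

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::col;

fn example_mem_table() -> Result<MemTable> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // One empty partition; the sort order only declares that `a` is ascending, nulls last.
    let table = MemTable::try_new(schema, vec![vec![]])?
        .with_sort_order(vec![vec![col("a").sort(true, false)]]);
    Ok(table)
}
```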
41 changes: 21 additions & 20 deletions datafusion/core/src/datasource/mod.rs
@@ -50,38 +50,39 @@ pub use statistics::get_statistics_with_limit;

use arrow_schema::{Schema, SortOptions};
use datafusion_common::{plan_err, Result};
use datafusion_expr::Expr;
use datafusion_expr::{Expr, SortExpr};
use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};

fn create_ordering(
schema: &Schema,
sort_order: &[Vec<Expr>],
sort_order: &[Vec<SortExpr>],
) -> Result<Vec<LexOrdering>> {
let mut all_sort_orders = vec![];

for exprs in sort_order {
// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
-        for expr in exprs {
-            match expr {
-                Expr::Sort(sort) => match sort.expr.as_ref() {
-                    Expr::Column(col) => match expressions::col(&col.name, schema) {
-                        Ok(expr) => {
-                            sort_exprs.push(PhysicalSortExpr {
-                                expr,
-                                options: SortOptions {
-                                    descending: !sort.asc,
-                                    nulls_first: sort.nulls_first,
-                                },
-                            });
-                        }
-                        // Cannot find expression in the projected_schema, stop iterating
-                        // since rest of the orderings are violated
-                        Err(_) => break,
-                    },
-                    expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
-                },
-                expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
-            }
-        }
+        for sort in exprs {
+            match sort.expr.as_ref() {
+                Expr::Column(col) => match expressions::col(&col.name, schema) {
+                    Ok(expr) => {
+                        sort_exprs.push(PhysicalSortExpr {
+                            expr,
+                            options: SortOptions {
+                                descending: !sort.asc,
+                                nulls_first: sort.nulls_first,
+                            },
+                        });
+                    }
+                    // Cannot find expression in the projected_schema, stop iterating
+                    // since rest of the orderings are violated
+                    Err(_) => break,
+                },
+                expr => {
+                    return plan_err!(
+                        "Expected single column references in output_ordering, got {expr}"
+                    )
+                }
+            }
+        }
if !sort_exprs.is_empty() {
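To make the accepted shape concrete, a hedged sketch (not from the diff): `create_ordering` only understands sort keys that are bare column references, so a computed key now fails with the "Expected single column references in output_ordering" error rather than the old "Expected Expr::Sort" message.

```rust
use datafusion_expr::{col, lit, SortExpr};

fn example_orders() -> (Vec<Vec<SortExpr>>, Vec<Vec<SortExpr>>) {
    // Accepted: every sort key is a plain column reference.
    let accepted = vec![vec![col("a").sort(true, false), col("b").sort(false, true)]];
    // Rejected: the key is a computed expression, so planning returns the
    // "Expected single column references in output_ordering" error above.
    let rejected = vec![vec![(col("a") + lit(1)).sort(true, false)]];
    (accepted, rejected)
}
```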
@@ -979,7 +979,7 @@ mod tests {
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<datafusion_expr::Expr>,
sort: Vec<datafusion_expr::SortExpr>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}

6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/stream.rs
@@ -33,7 +33,7 @@ use arrow_schema::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
@@ -248,7 +248,7 @@ impl StreamProvider for FileStreamProvider {
#[derive(Debug)]
pub struct StreamConfig {
source: Arc<dyn StreamProvider>,
order: Vec<Vec<Expr>>,
order: Vec<Vec<SortExpr>>,
constraints: Constraints,
}

@@ -263,7 +263,7 @@ impl StreamConfig {
}

/// Specify a sort order for the stream
pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
pub fn with_order(mut self, order: Vec<Vec<SortExpr>>) -> Self {
self.order = order;
self
}
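Putting the stream pieces together, a hedged sketch (path, schema, and column are placeholders) of declaring an order on a FIFO-backed stream, mirroring the example file touched at the top of this PR:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::prelude::col;

fn example_stream_table() -> StreamTable {
    let schema = Arc::new(Schema::new(vec![Field::new("a1", DataType::Int64, false)]));
    let provider = FileStreamProvider::new_file(schema, "/tmp/fifo.csv".into());
    let config = StreamConfig::new(Arc::new(provider))
        // Rows are assumed to arrive sorted by `a1` ascending, nulls last.
        .with_order(vec![vec![col("a1").sort(true, false)]]);
    StreamTable::new(Arc::new(config))
}
```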
32 changes: 14 additions & 18 deletions datafusion/core/src/physical_planner.rs
@@ -73,13 +73,13 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan,
WindowFrame, WindowFrameBound, WriteOp,
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, SortExpr,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
@@ -1641,31 +1641,27 @@ pub fn create_aggregate_expr_and_maybe_filter(

/// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
e: &Expr,
e: &SortExpr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
-    if let Expr::Sort(expr::Sort {
+    let SortExpr {
         expr,
         asc,
         nulls_first,
-    }) = e
-    {
-        Ok(PhysicalSortExpr {
-            expr: create_physical_expr(expr, input_dfschema, execution_props)?,
-            options: SortOptions {
-                descending: !asc,
-                nulls_first: *nulls_first,
-            },
-        })
-    } else {
-        internal_err!("Expects a sort expression")
-    }
+    } = e;
+    Ok(PhysicalSortExpr {
+        expr: create_physical_expr(expr, input_dfschema, execution_props)?,
+        options: SortOptions {
+            descending: !asc,
+            nulls_first: *nulls_first,
+        },
+    })
 }

/// Create a vector of physical sort expressions from a vector of logical expressions
pub fn create_physical_sort_exprs(
exprs: &[Expr],
exprs: &[SortExpr],
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<LexOrdering> {
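A hedged sketch of calling the planner entry point with the new argument type; the schema is illustrative and the `ExecutionProps` import path is an assumption, not taken from this diff.

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_planner::create_physical_sort_expr;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::col;
use datafusion_expr::execution_props::ExecutionProps; // assumed path

fn example_physical_sort() -> Result<()> {
    let df_schema =
        DFSchema::try_from(Schema::new(vec![Field::new("ts", DataType::Int64, false)]))?;
    // `ts` ascending, nulls last maps to SortOptions { descending: false, nulls_first: false }.
    let sort = col("ts").sort(true, false);
    let physical = create_physical_sort_expr(&sort, &df_schema, &ExecutionProps::new())?;
    assert!(!physical.options.descending && !physical.options.nulls_first);
    Ok(())
}
```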
4 changes: 2 additions & 2 deletions datafusion/core/src/test_util/mod.rs
@@ -46,7 +46,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::TableReference;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr};

@@ -360,7 +360,7 @@ pub fn register_unbounded_file_with_ordering(
schema: SchemaRef,
file_path: &Path,
table_name: &str,
file_sort_order: Vec<Vec<Expr>>,
file_sort_order: Vec<Vec<SortExpr>>,
) -> Result<()> {
let source = FileStreamProvider::new_file(schema, file_path.into());
let config = StreamConfig::new(Arc::new(source)).with_order(file_sort_order);