Skip to content

Commit

Permalink
Modify `sort_batch` to use the Arrow row format for multi-column sorts
Browse files Browse the repository at this point in the history
  • Loading branch information
jaylmiller committed Feb 10, 2023
1 parent 8a262c3 commit ec44910
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ use crate::physical_plan::{
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionConfig;
use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};
use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array};
pub use arrow::compute::SortOptions;
use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use arrow::row::{Row, RowConverter, SortField};
use datafusion_physical_expr::EquivalenceProperties;
use futures::{Stream, StreamExt, TryStreamExt};
use log::{debug, error};
Expand Down Expand Up @@ -820,7 +821,29 @@ fn sort_batch(
.map(|e| e.evaluate_to_sort_column(&batch))
.collect::<Result<Vec<SortColumn>>>()?;

let indices = lexsort_to_indices(&sort_columns, fetch)?;
let indices = if sort_columns.len() == 1 {
lexsort_to_indices(&sort_columns, fetch)?
} else {
let sort_fields = sort_columns
.iter()
.map(|c| {
let datatype = c.values.data_type().to_owned();
SortField::new_with_options(datatype, c.options.unwrap_or_default())
})
.collect::<Vec<_>>();
let arrays: Vec<ArrayRef> =
sort_columns.iter().map(|c| c.values.clone()).collect();
let mut row_converter = RowConverter::new(sort_fields)?;
let rows = row_converter.convert_columns(&arrays)?;

let mut to_sort: Vec<(usize, Row)> = rows.into_iter().enumerate().collect();
to_sort.sort_unstable_by(|(_, row_a), (_, row_b)| row_a.cmp(row_b));
let limit = match fetch {
Some(lim) => lim.min(to_sort.len()),
None => to_sort.len(),
};
UInt32Array::from_iter(to_sort.into_iter().take(limit).map(|(idx, _)| idx as u32))
};

// reorder all rows based on sorted indices
let sorted_batch = RecordBatch::try_new(
Expand Down

0 comments on commit ec44910

Please sign in to comment.