Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 2 additions & 55 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use crate::error::{_internal_datafusion_err, _internal_err};
use crate::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use arrow::array::{ArrayRef, PrimitiveArray};
use arrow::buffer::OffsetBuffer;
use arrow::compute;
use arrow::compute::{partition, SortColumn, SortOptions};
use arrow::compute::{partition, take_arrays, SortColumn, SortOptions};
use arrow::datatypes::{Field, SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow_array::cast::AsArray;
Expand Down Expand Up @@ -98,7 +97,7 @@ pub fn get_record_batch_at_indices(
record_batch: &RecordBatch,
indices: &PrimitiveArray<UInt32Type>,
) -> Result<RecordBatch> {
let new_columns = take_arrays(record_batch.columns(), indices)?;
let new_columns = take_arrays(record_batch.columns(), indices, None)?;
RecordBatch::try_new_with_options(
record_batch.schema(),
new_columns,
Expand Down Expand Up @@ -290,24 +289,6 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
Ok(idents)
}

/// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`.
///
/// TODO: use implementation in arrow-rs when available:
/// <https://github.com/apache/arrow-rs/pull/6475>
pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) -> Result<Vec<ArrayRef>> {
arrays
.iter()
.map(|array| {
compute::take(
array.as_ref(),
indices,
None, // None: no index check
)
.map_err(|e| arrow_datafusion_err!(e))
})
.collect()
}

pub(crate) fn parse_identifiers_normalized(s: &str, ignore_case: bool) -> Vec<String> {
parse_identifiers(s)
.unwrap_or_default()
Expand Down Expand Up @@ -1003,40 +984,6 @@ mod tests {
Ok(())
}

#[test]
fn test_take_arrays() -> Result<()> {
let arrays: Vec<ArrayRef> = vec![
Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 9., 10.])),
Arc::new(Float64Array::from(vec![2.0, 3.0, 3.0, 4.0, 5.0])),
Arc::new(Float64Array::from(vec![5.0, 7.0, 8.0, 10., 11.0])),
Arc::new(Float64Array::from(vec![15.0, 13.0, 8.0, 5., 0.0])),
];

let row_indices_vec: Vec<Vec<u32>> = vec![
// Get rows 0 and 1
vec![0, 1],
// Get rows 0 and 1
vec![0, 2],
// Get rows 1 and 3
vec![1, 3],
// Get rows 2 and 4
vec![2, 4],
];
for row_indices in row_indices_vec {
let indices: PrimitiveArray<UInt32Type> =
PrimitiveArray::from_iter_values(row_indices.iter().cloned());
let chunk = take_arrays(&arrays, &indices)?;
for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
for (idx, orig_idx) in row_indices.iter().enumerate() {
let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?;
let res2 = ScalarValue::try_from_array(arr_chunk, idx)?;
assert_eq!(res1, res2);
}
}
}
Ok(())
}

#[test]
fn test_get_at_indices() -> Result<()> {
let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ use arrow::array::new_empty_array;
use arrow::{
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
compute,
compute::take_arrays,
datatypes::UInt32Type,
};
use datafusion_common::{
arrow_datafusion_err, utils::take_arrays, DataFusionError, Result, ScalarValue,
};
use datafusion_common::{arrow_datafusion_err, DataFusionError, Result, ScalarValue};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};

Expand Down Expand Up @@ -239,7 +238,7 @@ impl GroupsAccumulatorAdapter {
// reorder the values and opt_filter by batch_indices so that
// all values for each group are contiguous, then invoke the
// accumulator once per group with values
let values = take_arrays(values, &batch_indices)?;
let values = take_arrays(values, &batch_indices, None)?;
let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;

// invoke each accumulator with the appropriate rows, first
Expand Down
8 changes: 4 additions & 4 deletions datafusion/functions-aggregate/src/first_last.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use std::fmt::Debug;
use std::sync::{Arc, OnceLock};

use arrow::array::{ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::compute::{self, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::{compare_rows, get_row_at_idx, take_arrays};
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
};
Expand Down Expand Up @@ -340,7 +340,7 @@ impl Accumulator for FirstValueAccumulator {
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
take_arrays(&filtered_states, &indices)?
take_arrays(&filtered_states, &indices, None)?
};
if !ordered_states[0].is_empty() {
let first_row = get_row_at_idx(&ordered_states, 0)?;
Expand Down Expand Up @@ -670,7 +670,7 @@ impl Accumulator for LastValueAccumulator {
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
take_arrays(&filtered_states, &indices)?
take_arrays(&filtered_states, &indices, None)?
};

if !ordered_states[0].is_empty() {
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};

use arrow::compute::take_arrays;
use arrow::datatypes::{SchemaRef, UInt32Type};
use arrow::record_batch::RecordBatch;
use arrow_array::{PrimitiveArray, RecordBatchOptions};
use datafusion_common::utils::{take_arrays, transpose};
use datafusion_common::utils::transpose;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::MemoryConsumer;
Expand Down Expand Up @@ -300,7 +301,7 @@ impl BatchPartitioner {
let _timer = partitioner_timer.timer();

// Produce batches based on indices
let columns = take_arrays(batch.columns(), &indices)?;
let columns = take_arrays(batch.columns(), &indices, None)?;

let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(indices.len()));
Expand Down
5 changes: 2 additions & 3 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ use crate::{
SendableRecordBatchStream, Statistics,
};

use arrow::compute::{concat_batches, lexsort_to_indices, SortColumn};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
use arrow_schema::DataType;
use datafusion_common::utils::take_arrays;
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
Expand Down Expand Up @@ -618,7 +617,7 @@ pub fn sort_batch(
lexsort_to_indices(&sort_columns, fetch)?
};

let columns = take_arrays(batch.columns(), &indices)?;
let columns = take_arrays(batch.columns(), &indices, None)?;

let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
Ok(RecordBatch::try_new_with_options(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ use crate::{
use ahash::RandomState;
use arrow::{
array::{Array, ArrayRef, RecordBatchOptions, UInt32Builder},
compute::{concat, concat_batches, sort_to_indices},
compute::{concat, concat_batches, sort_to_indices, take_arrays},
datatypes::SchemaRef,
record_batch::RecordBatch,
};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::stats::Precision;
use datafusion_common::utils::{
evaluate_partition_ranges, get_at_indices, get_record_batch_at_indices,
get_row_at_idx, take_arrays,
get_row_at_idx,
};
use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -536,7 +536,9 @@ impl PartitionSearcher for LinearSearch {
// We should emit columns according to row index ordering.
let sorted_indices = sort_to_indices(&all_indices, None, None)?;
// Construct new column according to row ordering. This fixes ordering
take_arrays(&new_columns, &sorted_indices).map(Some)
take_arrays(&new_columns, &sorted_indices, None)
.map(Some)
.map_err(|e| arrow_datafusion_err!(e))
}

fn evaluate_partition_batches(
Expand Down