Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 2 additions & 26 deletions datafusion/core/benches/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::jit::writer::bench_write_batch_jit;
use datafusion::row::writer::bench_write_batch;
use datafusion::row::RowType;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -37,38 +36,15 @@ fn criterion_benchmark(c: &mut Criterion) {
let batches =
create_record_batches(schema.clone(), array_len, partitions_len, batch_size);

c.bench_function("compact row serializer", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(),
)
})
});

c.bench_function("word aligned row serializer", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
})
});

c.bench_function("compact row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::Compact)
.unwrap(),
)
criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap())
})
});

c.bench_function("word aligned row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
})
});
}
Expand Down
20 changes: 9 additions & 11 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
use datafusion_row::reader::{read_row, RowReader};
use datafusion_row::{MutableRecordBatch, RowType};
use datafusion_row::MutableRecordBatch;
use hashbrown::raw::RawTable;

/// Grouping aggregate with row-format aggregation states inside.
///
/// For each aggregation entry, we use:
/// - [Compact] row represents grouping keys for fast hash computation and comparison directly on raw bytes.
/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes.
/// - [WordAligned] row to store aggregation state, designed to be CPU-friendly when every field is updated frequently.
///
/// The architecture is the following:
Expand All @@ -68,8 +68,8 @@ use hashbrown::raw::RawTable;
/// 4. The state's RecordBatch is `merge`d to a new state
/// 5. The state is mapped to the final value
///
/// [Compact]: datafusion_row::layout::RowType::Compact
/// [WordAligned]: datafusion_row::layout::RowType::WordAligned
/// [Arrow-row]: OwnedRow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// [WordAligned]: datafusion_row::layout
pub(crate) struct GroupedHashAggregateStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
Expand Down Expand Up @@ -203,8 +203,7 @@ impl GroupedHashAggregateStream {
.collect(),
)?;

let row_aggr_layout =
Arc::new(RowLayout::new(&row_aggr_schema, RowType::WordAligned));
let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema));

let name = format!("GroupedHashAggregateStream[{partition}]");
let aggr_state = AggregationState {
Expand Down Expand Up @@ -632,15 +631,14 @@ impl GroupedHashAggregateStream {
// Store row accumulator results (either final output or intermediate state):
let row_columns = match self.mode {
AggregateMode::Partial => {
read_as_batch(&state_buffers, &self.row_aggr_schema, RowType::WordAligned)
read_as_batch(&state_buffers, &self.row_aggr_schema)
}
AggregateMode::Final
| AggregateMode::FinalPartitioned
| AggregateMode::Single => {
let mut results = vec![];
for (idx, acc) in self.row_accumulators.iter().enumerate() {
let mut state_accessor =
RowAccessor::new(&self.row_aggr_schema, RowType::WordAligned);
let mut state_accessor = RowAccessor::new(&self.row_aggr_schema);
let current = state_buffers
.iter_mut()
.map(|buffer| {
Expand Down Expand Up @@ -727,10 +725,10 @@ impl GroupedHashAggregateStream {
}
}

fn read_as_batch(rows: &[Vec<u8>], schema: &Schema, row_type: RowType) -> Vec<ArrayRef> {
fn read_as_batch(rows: &[Vec<u8>], schema: &Schema) -> Vec<ArrayRef> {
let row_num = rows.len();
let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone()));
let mut row = RowReader::new(schema, row_type);
let mut row = RowReader::new(schema);

for data in rows {
row.point_to(0, data);
Expand Down
29 changes: 2 additions & 27 deletions datafusion/core/tests/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,13 @@ use datafusion::execution::context::SessionState;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_row::layout::RowType::{Compact, WordAligned};
use datafusion_row::reader::read_as_batch;
use datafusion_row::writer::write_batch_unchecked;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use std::sync::Arc;

#[tokio::test]
async fn test_with_parquet() -> Result<()> {
let ctx = SessionContext::new();
let state = ctx.state();
let task_ctx = state.task_ctx();
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
let exec =
get_exec(&state, "alltypes_plain.parquet", projection.as_ref(), None).await?;
let schema = exec.schema().clone();

let batches = collect(exec, task_ctx).await?;
assert_eq!(1, batches.len());
let batch = &batches[0];

let mut vector = vec![0; 20480];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? };
assert_eq!(*batch, output_batch);

Ok(())
}

#[tokio::test]
async fn test_with_parquet_word_aligned() -> Result<()> {
let ctx = SessionContext::new();
let state = ctx.state();
let task_ctx = state.task_ctx();
Expand All @@ -67,9 +43,8 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
let batch = &batches[0];

let mut vector = vec![0; 20480];
let row_offsets =
{ write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? };
let row_offsets = { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) };
let output_batch = { read_as_batch(&vector, schema, &row_offsets)? };
assert_eq!(*batch, output_batch);

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions datafusion/row/src/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! [`RowAccessor`] provides read/write/modify access to rows whose fields are all fixed-size:

use crate::layout::{RowLayout, RowType};
use crate::layout::RowLayout;
use crate::validity::NullBitsFormatter;
use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx};
use arrow::datatypes::{DataType, Schema};
Expand Down Expand Up @@ -116,9 +116,9 @@ macro_rules! fn_get_idx_scalar {

impl<'a> RowAccessor<'a> {
/// new
pub fn new(schema: &Schema, row_type: RowType) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. It is always great to cut out unused code. For public functions, though, couldn't this be a user-facing API change for downstream projects?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for bringing this up! I've created an issue, added an API change label, and edited the PR description accordingly.

pub fn new(schema: &Schema) -> Self {
Self {
layout: Arc::new(RowLayout::new(schema, row_type)),
layout: Arc::new(RowLayout::new(schema)),
data: &mut [],
base_offset: 0,
}
Expand Down
Loading