From c9614187c6480d29a703afde5b149671964e180a Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 16 Apr 2023 00:57:22 +0800 Subject: [PATCH 1/4] Remove compact row since it's no longer used --- datafusion/core/benches/jit.rs | 28 +-- .../src/physical_plan/aggregates/row_hash.rs | 20 +- datafusion/core/tests/row.rs | 29 +-- datafusion/row/src/accessor.rs | 6 +- datafusion/row/src/jit/mod.rs | 203 ++---------------- datafusion/row/src/jit/reader.rs | 12 +- datafusion/row/src/jit/writer.rs | 11 +- datafusion/row/src/layout.rs | 202 +++++------------ datafusion/row/src/lib.rs | 192 +++-------------- datafusion/row/src/reader.rs | 63 +----- datafusion/row/src/writer.rs | 107 +-------- 11 files changed, 134 insertions(+), 739 deletions(-) diff --git a/datafusion/core/benches/jit.rs b/datafusion/core/benches/jit.rs index 0c6de319d2ce..d42df8e033ab 100644 --- a/datafusion/core/benches/jit.rs +++ b/datafusion/core/benches/jit.rs @@ -25,7 +25,6 @@ use crate::criterion::Criterion; use crate::data_utils::{create_record_batches, create_schema}; use datafusion::row::jit::writer::bench_write_batch_jit; use datafusion::row::writer::bench_write_batch; -use datafusion::row::RowType; use std::sync::Arc; fn criterion_benchmark(c: &mut Criterion) { @@ -37,38 +36,15 @@ fn criterion_benchmark(c: &mut Criterion) { let batches = create_record_batches(schema.clone(), array_len, partitions_len, batch_size); - c.bench_function("compact row serializer", |b| { - b.iter(|| { - criterion::black_box( - bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(), - ) - }) - }); - c.bench_function("word aligned row serializer", |b| { b.iter(|| { - criterion::black_box( - bench_write_batch(&batches, schema.clone(), RowType::WordAligned) - .unwrap(), - ) - }) - }); - - c.bench_function("compact row serializer jit", |b| { - b.iter(|| { - criterion::black_box( - bench_write_batch_jit(&batches, schema.clone(), RowType::Compact) - .unwrap(), - ) + criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap()) }) }); c.bench_function("word aligned row serializer jit", |b| { b.iter(|| { - criterion::black_box( - bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned) - .unwrap(), - ) + criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap()) }) }); } diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 9a75da02fb35..d9e42e478de0 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -51,13 +51,13 @@ use datafusion_expr::Accumulator; use datafusion_row::accessor::RowAccessor; use datafusion_row::layout::RowLayout; use datafusion_row::reader::{read_row, RowReader}; -use datafusion_row::{MutableRecordBatch, RowType}; +use datafusion_row::MutableRecordBatch; use hashbrown::raw::RawTable; /// Grouping aggregate with row-format aggregation states inside. /// /// For each aggregation entry, we use: -/// - [Compact] row represents grouping keys for fast hash computation and comparison directly on raw bytes. +/// - [Arrow-row] represents grouping keys for fast hash computation and comparison directly on raw bytes. /// - [WordAligned] row to store aggregation state, designed to be CPU-friendly when updates over every field are often. /// /// The architecture is the following: @@ -68,8 +68,8 @@ use hashbrown::raw::RawTable; /// 4. The state's RecordBatch is `merge`d to a new state /// 5. The state is mapped to the final value /// -/// [Compact]: datafusion_row::layout::RowType::Compact -/// [WordAligned]: datafusion_row::layout::RowType::WordAligned +/// [Arrow-row]: OwnedRow +/// [WordAligned]: datafusion_row::layout pub(crate) struct GroupedHashAggregateStream { schema: SchemaRef, input: SendableRecordBatchStream, @@ -203,8 +203,7 @@ impl GroupedHashAggregateStream { .collect(), )?; - let row_aggr_layout = - Arc::new(RowLayout::new(&row_aggr_schema, RowType::WordAligned)); + let row_aggr_layout = Arc::new(RowLayout::new(&row_aggr_schema)); let name = format!("GroupedHashAggregateStream[{partition}]"); let aggr_state = AggregationState { @@ -632,15 +631,14 @@ impl GroupedHashAggregateStream { // Store row accumulator results (either final output or intermediate state): let row_columns = match self.mode { AggregateMode::Partial => { - read_as_batch(&state_buffers, &self.row_aggr_schema, RowType::WordAligned) + read_as_batch(&state_buffers, &self.row_aggr_schema) } AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::Single => { let mut results = vec![]; for (idx, acc) in self.row_accumulators.iter().enumerate() { - let mut state_accessor = - RowAccessor::new(&self.row_aggr_schema, RowType::WordAligned); + let mut state_accessor = RowAccessor::new(&self.row_aggr_schema); let current = state_buffers .iter_mut() .map(|buffer| { @@ -727,10 +725,10 @@ impl GroupedHashAggregateStream { } } -fn read_as_batch(rows: &[Vec], schema: &Schema, row_type: RowType) -> Vec { +fn read_as_batch(rows: &[Vec], schema: &Schema) -> Vec { let row_num = rows.len(); let mut output = MutableRecordBatch::new(row_num, Arc::new(schema.clone())); - let mut row = RowReader::new(schema, row_type); + let mut row = RowReader::new(schema); for data in rows { row.point_to(0, data); diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index 5eeb237e187e..55310bb611c3 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -23,7 +23,6 @@ use datafusion::execution::context::SessionState; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::SessionContext; -use datafusion_row::layout::RowType::{Compact, WordAligned}; use datafusion_row::reader::read_as_batch; use datafusion_row::writer::write_batch_unchecked; use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; @@ -31,29 +30,6 @@ use std::sync::Arc; #[tokio::test] async fn test_with_parquet() -> Result<()> { - let ctx = SessionContext::new(); - let state = ctx.state(); - let task_ctx = state.task_ctx(); - let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); - let exec = - get_exec(&state, "alltypes_plain.parquet", projection.as_ref(), None).await?; - let schema = exec.schema().clone(); - - let batches = collect(exec, task_ctx).await?; - assert_eq!(1, batches.len()); - let batch = &batches[0]; - - let mut vector = vec![0; 20480]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? }; - assert_eq!(*batch, output_batch); - - Ok(()) -} - -#[tokio::test] -async fn test_with_parquet_word_aligned() -> Result<()> { let ctx = SessionContext::new(); let state = ctx.state(); let task_ctx = state.task_ctx(); @@ -67,9 +43,8 @@ async fn test_with_parquet_word_aligned() -> Result<()> { let batch = &batches[0]; let mut vector = vec![0; 20480]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? }; + let row_offsets = { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(*batch, output_batch); Ok(()) diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs index e7b4ed85016a..bba44f0e56a5 100644 --- a/datafusion/row/src/accessor.rs +++ b/datafusion/row/src/accessor.rs @@ -17,7 +17,7 @@ //! [`RowAccessor`] provides a Read/Write/Modify access for row with all fixed-sized fields: -use crate::layout::{RowLayout, RowType}; +use crate::layout::RowLayout; use crate::validity::NullBitsFormatter; use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx}; use arrow::datatypes::{DataType, Schema}; @@ -116,9 +116,9 @@ macro_rules! fn_get_idx_scalar { impl<'a> RowAccessor<'a> { /// new - pub fn new(schema: &Schema, row_type: RowType) -> Self { + pub fn new(schema: &Schema) -> Self { Self { - layout: Arc::new(RowLayout::new(schema, row_type)), + layout: Arc::new(RowLayout::new(schema)), data: &mut [], base_offset: 0, } diff --git a/datafusion/row/src/jit/mod.rs b/datafusion/row/src/jit/mod.rs index 03b8eed18664..803c96b176b7 100644 --- a/datafusion/row/src/jit/mod.rs +++ b/datafusion/row/src/jit/mod.rs @@ -45,7 +45,6 @@ fn fn_name(f: T) -> &'static str { mod tests { use crate::jit::reader::read_as_batch_jit; use crate::jit::writer::write_batch_unchecked_jit; - use crate::layout::RowType::{Compact, WordAligned}; use arrow::record_batch::RecordBatch; use arrow::{array::*, datatypes::*}; use datafusion_common::Result; @@ -54,26 +53,26 @@ mod tests { use DataType::*; macro_rules! fn_test_single_type { - ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => { + ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { paste::item! { #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); let a = $ARRAY::from($VEC); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 1024]; let assembler = Assembler::default(); let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? }; - let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? }; + { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; + let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; assert_eq!(batch, output_batch); Ok(()) } #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); let a = $ARRAY::from(v); @@ -81,8 +80,8 @@ mod tests { let mut vector = vec![0; 1024]; let assembler = Assembler::default(); let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? }; - let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? }; + { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; + let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -93,240 +92,78 @@ mod tests { fn_test_single_type!( BooleanArray, Boolean, - vec![Some(true), Some(false), None, Some(true), None], - Compact - ); - - fn_test_single_type!( - BooleanArray, - Boolean, - vec![Some(true), Some(false), None, Some(true), None], - WordAligned - ); - - fn_test_single_type!( - Int8Array, - Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(true), Some(false), None, Some(true), None] ); fn_test_single_type!( Int8Array, Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Int16Array, Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Int16Array, - Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Int32Array, Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Int32Array, - Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Int64Array, Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Int64Array, - Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - UInt8Array, - UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt8Array, UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt16Array, UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - UInt16Array, - UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - UInt32Array, - UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt32Array, UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - UInt64Array, - UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt64Array, UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Float32Array, Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - Compact - ); - - fn_test_single_type!( - Float32Array, - Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - WordAligned + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] ); fn_test_single_type!( Float64Array, Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - Compact - ); - - fn_test_single_type!( - Float64Array, - Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - WordAligned + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] ); fn_test_single_type!( Date32Array, Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Date32Array, - Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - Date64Array, - Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Date64Array, Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); - - fn_test_single_type!( - StringArray, - Utf8, - vec![Some("hello"), Some("world"), None, Some(""), Some("")], - Compact - ); - - #[test] - fn test_single_binary_jit() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)])); - let values: Vec> = - vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")]; - let a = BinaryArray::from_opt_vec(values); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 8192]; - let assembler = Assembler::default(); - let row_offsets = { - write_batch_unchecked_jit( - &mut vector, - 0, - &batch, - 0, - schema.clone(), - &assembler, - Compact, - )? - }; - let output_batch = - { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - fn test_single_binary_jit_null_free() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); - let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"]; - let a = BinaryArray::from_vec(values); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 8192]; - let assembler = Assembler::default(); - let row_offsets = { - write_batch_unchecked_jit( - &mut vector, - 0, - &batch, - 0, - schema.clone(), - &assembler, - Compact, - )? - }; - let output_batch = - { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? }; - assert_eq!(batch, output_batch); - Ok(()) - } } diff --git a/datafusion/row/src/jit/reader.rs b/datafusion/row/src/jit/reader.rs index d67035424557..ffb0aacb5a1e 100644 --- a/datafusion/row/src/jit/reader.rs +++ b/datafusion/row/src/jit/reader.rs @@ -18,7 +18,6 @@ //! Accessing row from raw bytes with JIT use crate::jit::fn_name; -use crate::layout::RowType; use crate::reader::RowReader; use crate::reader::*; use crate::reg_fn; @@ -39,11 +38,10 @@ pub fn read_as_batch_jit( schema: Arc, offsets: &[usize], assembler: &Assembler, - row_type: RowType, ) -> Result { let row_num = offsets.len(); let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema, row_type); + let mut row = RowReader::new(&schema); register_read_functions(assembler)?; let gen_func = gen_read_row(&schema, assembler)?; let mut jit = assembler.create_jit(); @@ -84,8 +82,6 @@ fn register_read_functions(asm: &Assembler) -> Result<()> { reg_fn!(asm, read_field_f64, reader_param.clone(), None); reg_fn!(asm, read_field_date32, reader_param.clone(), None); reg_fn!(asm, read_field_date64, reader_param.clone(), None); - reg_fn!(asm, read_field_utf8, reader_param.clone(), None); - reg_fn!(asm, read_field_binary, reader_param.clone(), None); reg_fn!(asm, read_field_bool_null_free, reader_param.clone(), None); reg_fn!(asm, read_field_u8_null_free, reader_param.clone(), None); reg_fn!(asm, read_field_u16_null_free, reader_param.clone(), None); @@ -99,8 +95,6 @@ fn register_read_functions(asm: &Assembler) -> Result<()> { reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None); reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None); reg_fn!(asm, read_field_date64_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_utf8_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_binary_null_free, reader_param, None); Ok(()) } @@ -134,8 +128,6 @@ fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result b.call_stmt("read_field_f64", params)?, Date32 => b.call_stmt("read_field_date32", params)?, Date64 => b.call_stmt("read_field_date64", params)?, - Utf8 => b.call_stmt("read_field_utf8", params)?, - Binary => b.call_stmt("read_field_binary", params)?, _ => unimplemented!(), } } else { @@ -153,8 +145,6 @@ fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result b.call_stmt("read_field_f64_null_free", params)?, Date32 => b.call_stmt("read_field_date32_null_free", params)?, Date64 => b.call_stmt("read_field_date64_null_free", params)?, - Utf8 => b.call_stmt("read_field_utf8_null_free", params)?, - Binary => b.call_stmt("read_field_binary_null_free", params)?, _ => unimplemented!(), } } diff --git a/datafusion/row/src/jit/writer.rs b/datafusion/row/src/jit/writer.rs index 38c3bd411402..8a787f9c051f 100644 --- a/datafusion/row/src/jit/writer.rs +++ b/datafusion/row/src/jit/writer.rs @@ -18,7 +18,6 @@ //! Reusable JIT version of row writer backed by Vec to stitch attributes together use crate::jit::fn_name; -use crate::layout::RowType; use crate::reg_fn; use crate::schema_null_free; use crate::writer::RowWriter; @@ -44,9 +43,8 @@ pub fn write_batch_unchecked_jit( row_idx: usize, schema: Arc, assembler: &Assembler, - row_type: RowType, ) -> Result> { - let mut writer = RowWriter::new(&schema, row_type); + let mut writer = RowWriter::new(&schema); let mut current_offset = offset; let mut offsets = vec![]; register_write_functions(assembler)?; @@ -61,7 +59,6 @@ pub fn write_batch_unchecked_jit( for cur_row in row_idx..batch.num_rows() { offsets.push(current_offset); code_fn(&mut writer, cur_row, batch); - writer.end_padding(); let row_width = writer.row_width; output[current_offset..current_offset + row_width] .copy_from_slice(writer.get_row()); @@ -76,10 +73,9 @@ pub fn write_batch_unchecked_jit( pub fn bench_write_batch_jit( batches: &[Vec], schema: Arc, - row_type: RowType, ) -> Result> { let assembler = Assembler::default(); - let mut writer = RowWriter::new(&schema, row_type); + let mut writer = RowWriter::new(&schema); let mut lengths = vec![]; register_write_functions(&assembler)?; let gen_func = gen_write_row(&schema, &assembler)?; @@ -92,7 +88,6 @@ pub fn bench_write_batch_jit( for batch in batches.iter().flatten() { for cur_row in 0..batch.num_rows() { code_fn(&mut writer, cur_row, batch); - writer.end_padding(); lengths.push(writer.row_width); writer.reset() } @@ -124,8 +119,6 @@ fn register_write_functions(asm: &Assembler) -> Result<()> { reg_fn!(asm, write_field_f64, reader_param.clone(), None); reg_fn!(asm, write_field_date32, reader_param.clone(), None); reg_fn!(asm, write_field_date64, reader_param.clone(), None); - reg_fn!(asm, write_field_utf8, reader_param.clone(), None); - reg_fn!(asm, write_field_binary, reader_param, None); Ok(()) } diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs index 6a8e8a78ec9d..3f8a812a0664 100644 --- a/datafusion/row/src/layout.rs +++ b/datafusion/row/src/layout.rs @@ -21,64 +21,54 @@ use crate::schema_null_free; use arrow::datatypes::{DataType, Schema}; use arrow::util::bit_util::{ceil, round_upto_power_of_2}; -const UTF8_DEFAULT_SIZE: usize = 20; -const BINARY_DEFAULT_SIZE: usize = 100; - -#[derive(Copy, Clone, Debug)] -/// Type of a RowLayout -pub enum RowType { - /// Stores each field with minimum bytes for space efficiency. - /// - /// Its typical use case represents a sorting payload that - /// accesses all row fields as a unit. - /// - /// Each tuple consists of up to three parts: "`null bit set`" , - /// "`values`" and "`var length data`" - /// - /// The null bit set is used for null tracking and is aligned to 1-byte. It stores - /// one bit per field. - /// - /// In the region of the values, we store the fields in the order they are defined in the schema. - /// - For fixed-length, sequential access fields, we store them directly. - /// E.g., 4 bytes for int and 1 byte for bool. - /// - For fixed-length, update often fields, we store one 8-byte word per field. - /// - For fields of non-primitive or variable-length types, - /// we append their actual content to the end of the var length region and - /// store their offset relative to row base and their length, packed into an 8-byte word. - /// - /// ```plaintext - /// ┌────────────────┬──────────────────────────┬───────────────────────┐ ┌───────────────────────┬────────────┐ - /// │Validity Bitmask│ Fixed Width Field │ Variable Width Field │ ... │ vardata area │ padding │ - /// │ (byte aligned) │ (native type width) │(vardata offset + len) │ │ (variable length) │ bytes │ - /// └────────────────┴──────────────────────────┴───────────────────────┘ └───────────────────────┴────────────┘ - /// ``` - /// - /// For example, given the schema (Int8, Utf8, Float32, Utf8) - /// - /// Encoding the tuple (1, "FooBar", NULL, "baz") - /// - /// Requires 32 bytes (31 bytes payload and 1 byte padding to make each tuple 8-bytes aligned): - /// - /// ```plaintext - /// ┌──────────┬──────────┬──────────────────────┬──────────────┬──────────────────────┬───────────────────────┬──────────┐ - /// │0b00001011│ 0x01 │0x00000016 0x00000006│ 0x00000000 │0x0000001C 0x00000003│ FooBarbaz │ 0x00 │ - /// └──────────┴──────────┴──────────────────────┴──────────────┴──────────────────────┴───────────────────────┴──────────┘ - /// 0 1 2 10 14 22 31 32 - /// ``` - Compact, - - /// This type of layout stores one 8-byte word per field for CPU-friendly, - /// It is mainly used to represent the rows with frequently updated content, - /// for example, grouping state for hash aggregation. - WordAligned, - // RawComparable, -} - -/// Reveals how the fields of a record are stored in the raw-bytes format +/// Row layout stores one or multiple 8-byte word(s) per field for CPU-friendly +/// and efficient processing. +/// +/// It is mainly used to represent the rows with frequently updated content, +/// for example, grouping state for hash aggregation. +/// +/// Each tuple consists of two parts: "`null bit set`" and "`values`". +/// +/// For null-free tuples, the null bit set can be omitted. +/// +/// The null bit set, when present, is aligned to 8 bytes. It stores one bit per field. +/// +/// In the region of the values, we store the fields in the order they are defined in the schema. +/// Each field is stored in one or multiple 8-byte words. +/// +/// ```plaintext +/// ┌─────────────────┬─────────────────────┐ +/// │Validity Bitmask │ Fields │ +/// │ (8-byte aligned)│ (8-byte words) │ +/// └─────────────────┴─────────────────────┘ +/// ``` +/// +/// For example, given the schema (Int8, Float32, Int64) with a null-free tuple +/// +/// Encoding the tuple (1, 3.14, 42) +/// +/// Requires 24 bytes (3 fields * 8 bytes each): +/// +/// ```plaintext +/// ┌──────────────────────┬──────────────────────┬──────────────────────┐ +/// │ 0x01 │ 0x4048F5C3 │ 0x0000002A │ +/// └──────────────────────┴──────────────────────┴──────────────────────┘ +/// 0 8 16 24 +/// ``` +/// +/// If the schema allows null values and the tuple is (1, NULL, 42) +/// +/// Encoding the tuple requires 32 bytes (1 * 8 bytes for the null bit set + 3 fields * 8 bytes each): +/// +/// ```plaintext +/// ┌──────────────────────────┬──────────────────────┬──────────────────────┬──────────────────────┐ +/// │ 0b00000101 │ 0x01 │ 0x00000000 │ 0x0000002A │ +/// │ (7 bytes padding after) │ │ │ │ +/// └──────────────────────────┴──────────────────────┴──────────────────────┴──────────────────────┘ +/// 0 8 16 24 32 +/// ``` #[derive(Debug, Clone)] pub struct RowLayout { - /// Type of the layout - row_type: RowType, /// If a row is null free according to its schema pub(crate) null_free: bool, /// The number of bytes used to store null bits for each field. @@ -93,27 +83,20 @@ pub struct RowLayout { impl RowLayout { /// new - pub fn new(schema: &Schema, row_type: RowType) -> Self { + pub fn new(schema: &Schema) -> Self { assert!( - row_supported(schema, row_type), - "{row_type:?}Row with {schema:?} not supported yet.", + row_supported(schema), + "Row with {schema:?} not supported yet.", ); let null_free = schema_null_free(schema); let field_count = schema.fields().len(); let null_width = if null_free { 0 } else { - match row_type { - RowType::Compact => ceil(field_count, 8), - RowType::WordAligned => round_upto_power_of_2(ceil(field_count, 8), 8), - } - }; - let (field_offsets, values_width) = match row_type { - RowType::Compact => compact_offsets(null_width, schema), - RowType::WordAligned => word_aligned_offsets(null_width, schema), + round_upto_power_of_2(ceil(field_count, 8), 8) }; + let (field_offsets, values_width) = word_aligned_offsets(null_width, schema); Self { - row_type, null_free, null_width, values_width, @@ -129,36 +112,6 @@ impl RowLayout { } } -/// Get relative offsets for each field and total width for values -fn compact_offsets(null_width: usize, schema: &Schema) -> (Vec, usize) { - let mut offsets = vec![]; - let mut offset = null_width; - for f in schema.fields() { - offsets.push(offset); - offset += compact_type_width(f.data_type()); - } - (offsets, offset - null_width) -} - -fn var_length(dt: &DataType) -> bool { - use DataType::*; - matches!(dt, Utf8 | Binary) -} - -fn compact_type_width(dt: &DataType) -> usize { - use DataType::*; - if var_length(dt) { - return std::mem::size_of::(); - } - match dt { - Boolean | UInt8 | Int8 => 1, - UInt16 | Int16 => 2, - UInt32 | Int32 | Float32 | Date32 => 4, - UInt64 | Int64 | Float64 | Date64 => 8, - _ => unreachable!(), - } -} - fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usize) { let mut offsets = vec![]; let mut offset = null_width; @@ -175,59 +128,17 @@ fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usiz (offsets, offset - null_width) } -/// Estimate row width based on schema -pub(crate) fn estimate_row_width(schema: &Schema, layout: &RowLayout) -> usize { - let mut width = layout.fixed_part_width(); - if matches!(layout.row_type, RowType::WordAligned) { - return width; - } - for f in schema.fields() { - match f.data_type() { - DataType::Utf8 => width += UTF8_DEFAULT_SIZE, - DataType::Binary => width += BINARY_DEFAULT_SIZE, - _ => {} - } - } - round_upto_power_of_2(width, 8) -} - /// Return true of data in `schema` can be converted to raw-bytes /// based rows. /// /// Note all schemas can be supported in the row format -pub fn row_supported(schema: &Schema, row_type: RowType) -> bool { +pub fn row_supported(schema: &Schema) -> bool { schema .fields() .iter() - .all(|f| supported_type(f.data_type(), row_type)) -} - -fn supported_type(dt: &DataType, row_type: RowType) -> bool { - use DataType::*; - - match row_type { - RowType::Compact => { - matches!( - dt, - Boolean - | UInt8 - | UInt16 - | UInt32 - | UInt64 - | Int8 - | Int16 - | Int32 - | Int64 - | Float32 - | Float64 - | Date32 - | Date64 - | Utf8 - | Binary - ) - } - // only fixed length types are supported for fast in-place update. - RowType::WordAligned => { + .all(|f| { + let dt = f.data_type(); + use DataType::*; matches!( dt, Boolean @@ -245,6 +156,5 @@ fn supported_type(dt: &DataType, row_type: RowType) -> bool { | Date64 | Decimal128(_, _) ) - } - } + }) } diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs index aab929f2702e..d124e3988587 100644 --- a/datafusion/row/src/lib.rs +++ b/datafusion/row/src/lib.rs @@ -36,7 +36,6 @@ use arrow::datatypes::Schema; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; pub use layout::row_supported; -pub use layout::RowType; use std::sync::Arc; pub mod accessor; @@ -102,7 +101,6 @@ fn get_columns(mut arrays: Vec>) -> Vec { #[cfg(test)] mod tests { use super::*; - use crate::layout::RowType::{Compact, WordAligned}; use crate::reader::read_as_batch; use crate::writer::write_batch_unchecked; use arrow::record_batch::RecordBatch; @@ -111,33 +109,33 @@ mod tests { use DataType::*; macro_rules! fn_test_single_type { - ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => { + ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { paste::item! { #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); let a = $ARRAY::from($VEC); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 1024]; let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? }; + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(batch, output_batch); Ok(()) } #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); let a = $ARRAY::from(v); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 1024]; let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? }; + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -148,229 +146,89 @@ mod tests { fn_test_single_type!( BooleanArray, Boolean, - vec![Some(true), Some(false), None, Some(true), None], - Compact - ); - - fn_test_single_type!( - BooleanArray, - Boolean, - vec![Some(true), Some(false), None, Some(true), None], - WordAligned - ); - - fn_test_single_type!( - Int8Array, - Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(true), Some(false), None, Some(true), None] ); fn_test_single_type!( Int8Array, Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Int16Array, Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Int16Array, - Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Int32Array, Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Int32Array, - Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Int64Array, Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Int64Array, - Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - UInt8Array, - UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt8Array, UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt16Array, UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - UInt16Array, - UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - UInt32Array, - UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt32Array, UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - UInt64Array, - UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( UInt64Array, UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Float32Array, Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - Compact - ); - - fn_test_single_type!( - Float32Array, - Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - WordAligned + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] ); fn_test_single_type!( Float64Array, Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - Compact - ); - - fn_test_single_type!( - Float64Array, - Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], - WordAligned + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] ); fn_test_single_type!( Date32Array, Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact - ); - - fn_test_single_type!( - Date32Array, - Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - Date64Array, - Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); fn_test_single_type!( Date64Array, Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)], - WordAligned - ); - - fn_test_single_type!( - StringArray, - Utf8, - vec![Some("hello"), Some("world"), None, Some(""), Some("")], - Compact + vec![Some(5), Some(7), None, Some(0), Some(111)] ); #[test] #[should_panic(expected = "not supported yet")] - fn test_unsupported_word_aligned_type() { + fn test_unsupported_type() { let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"])); let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); let schema = batch.schema(); let mut vector = vec![0; 1024]; - write_batch_unchecked(&mut vector, 0, &batch, 0, schema, WordAligned); - } - - #[test] - fn test_single_binary() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)])); - let values: Vec> = - vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")]; - let a = BinaryArray::from_opt_vec(values); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 8192]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - fn test_single_binary_null_free() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); - let values: Vec<&[u8]> = vec![b"one", b"two", b"", b"three"]; - let a = BinaryArray::from_vec(values); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; 8192]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? }; - assert_eq!(batch, output_batch); - Ok(()) + write_batch_unchecked(&mut vector, 0, &batch, 0, schema); } #[test] @@ -380,16 +238,16 @@ mod tests { let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); let schema = batch.schema(); let mut vector = vec![0; 1024]; - write_batch_unchecked(&mut vector, 0, &batch, 0, schema, Compact); + write_batch_unchecked(&mut vector, 0, &batch, 0, schema); } #[test] #[should_panic(expected = "not supported yet")] fn test_unsupported_type_read() { let schema = - Arc::new(Schema::new(vec![Field::new("a", Decimal128(5, 2), false)])); + Arc::new(Schema::new(vec![Field::new("a", Utf8, false)])); let vector = vec![0; 1024]; let row_offsets = vec![0]; - read_as_batch(&vector, schema, &row_offsets, Compact).unwrap(); + read_as_batch(&vector, schema, &row_offsets).unwrap(); } } diff --git a/datafusion/row/src/reader.rs b/datafusion/row/src/reader.rs index a8dc8211f0c7..10c9896df70a 100644 --- a/datafusion/row/src/reader.rs +++ b/datafusion/row/src/reader.rs @@ -17,7 +17,7 @@ //! [`read_as_batch`] converts raw bytes to [`RecordBatch`] -use crate::layout::{RowLayout, RowType}; +use crate::layout::RowLayout; use crate::validity::{all_valid, NullBitsFormatter}; use crate::MutableRecordBatch; use arrow::array::*; @@ -47,11 +47,10 @@ pub fn read_as_batch( data: &[u8], schema: Arc, offsets: &[usize], - row_type: RowType, ) -> Result { let row_num = offsets.len(); let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema, row_type); + let mut row = RowReader::new(&schema); for offset in offsets.iter().take(row_num) { row.point_to(*offset, data); @@ -129,9 +128,9 @@ impl<'a> std::fmt::Debug for RowReader<'a> { impl<'a> RowReader<'a> { /// new - pub fn new(schema: &Schema, row_type: RowType) -> Self { + pub fn new(schema: &Schema) -> Self { Self { - layout: RowLayout::new(schema, row_type), + layout: RowLayout::new(schema), data: &[], base_offset: 0, } @@ -217,25 +216,6 @@ impl<'a> RowReader<'a> { get_idx!(i128, self, idx, 16) } - fn get_utf8(&self, idx: usize) -> &str { - self.assert_index_valid(idx); - let offset_size = self.get_u64(idx); - let offset = (offset_size >> 32) as usize; - let len = (offset_size & 0xffff_ffff) as usize; - let varlena_offset = self.base_offset + offset; - let bytes = &self.data[varlena_offset..varlena_offset + len]; - unsafe { std::str::from_utf8_unchecked(bytes) } - } - - fn get_binary(&self, idx: usize) -> &[u8] { - self.assert_index_valid(idx); - let offset_size = self.get_u64(idx); - let offset = (offset_size >> 32) as usize; - let len = (offset_size & 0xffff_ffff) as usize; - let varlena_offset = self.base_offset + offset; - &self.data[varlena_offset..varlena_offset + len] - } - fn_get_idx_opt!(bool); fn_get_idx_opt!(u8); fn_get_idx_opt!(u16); @@ -271,14 +251,6 @@ impl<'a> RowReader<'a> { None } } - - fn get_utf8_opt(&self, idx: usize) -> Option<&str> { - if self.is_valid_at(idx) { - Some(self.get_utf8(idx)) - } else { - None - } - } } /// Read the row currently pointed by RowWriter to the output columnar batch buffer @@ -339,31 +311,8 @@ fn_read_field!(f32, Float32Builder); fn_read_field!(f64, Float64Builder); fn_read_field!(date32, Date32Builder); fn_read_field!(date64, Date64Builder); -fn_read_field!(utf8, StringBuilder); fn_read_field!(decimal128, Decimal128Builder); -pub(crate) fn read_field_binary( - to: &mut Box, - col_idx: usize, - row: &RowReader, -) { - let to = to.as_any_mut().downcast_mut::().unwrap(); - if row.is_valid_at(col_idx) { - to.append_value(row.get_binary(col_idx)); - } else { - to.append_null(); - } -} - -pub(crate) fn read_field_binary_null_free( - to: &mut Box, - col_idx: usize, - row: &RowReader, -) { - let to = to.as_any_mut().downcast_mut::().unwrap(); - to.append_value(row.get_binary(col_idx)); -} - fn read_field( to: &mut Box, dt: &DataType, @@ -385,8 +334,6 @@ fn read_field( Float64 => read_field_f64(to, col_idx, row), Date32 => read_field_date32(to, col_idx, row), Date64 => read_field_date64(to, col_idx, row), - Utf8 => read_field_utf8(to, col_idx, row), - Binary => read_field_binary(to, col_idx, row), Decimal128(_, _) => read_field_decimal128(to, col_idx, row), _ => unimplemented!(), } @@ -413,8 +360,6 @@ fn read_field_null_free( Float64 => read_field_f64_null_free(to, col_idx, row), Date32 => read_field_date32_null_free(to, col_idx, row), Date64 => read_field_date64_null_free(to, col_idx, row), - Utf8 => read_field_utf8_null_free(to, col_idx, row), - Binary => read_field_binary_null_free(to, col_idx, row), Decimal128(_, _) => read_field_decimal128_null_free(to, col_idx, row), _ => unimplemented!(), } diff --git a/datafusion/row/src/writer.rs b/datafusion/row/src/writer.rs index 7bf9ac0267b7..14ce6afe6832 100644 --- a/datafusion/row/src/writer.rs +++ b/datafusion/row/src/writer.rs @@ -17,17 +17,13 @@ //! [`RowWriter`] writes [`RecordBatch`]es to `Vec` to stitch attributes together -use crate::layout::{estimate_row_width, RowLayout, RowType}; +use crate::layout::RowLayout; use arrow::array::*; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw}; -use datafusion_common::cast::{ - as_binary_array, as_date32_array, as_date64_array, as_decimal128_array, - as_string_array, -}; +use arrow::util::bit_util::{set_bit_raw, unset_bit_raw}; +use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array}; use datafusion_common::Result; -use std::cmp::max; use std::sync::Arc; /// Append batch from `row_idx` to `output` buffer start from `offset` @@ -40,9 +36,8 @@ pub fn write_batch_unchecked( batch: &RecordBatch, row_idx: usize, schema: Arc, - row_type: RowType, ) -> Vec { - let mut writer = RowWriter::new(&schema, row_type); + let mut writer = RowWriter::new(&schema); let mut current_offset = offset; let mut offsets = vec![]; let columns = batch.columns(); @@ -62,9 +57,8 @@ pub fn write_batch_unchecked( pub fn bench_write_batch( batches: &[Vec], schema: Arc, - row_type: RowType, ) -> Result> { - let mut writer = RowWriter::new(&schema, row_type); + let mut writer = RowWriter::new(&schema); let mut lengths = vec![]; for batch in batches.iter().flatten() { @@ -125,33 +119,24 @@ pub struct RowWriter { data: Vec, /// Length in bytes for the current tuple, 8-bytes word aligned. pub(crate) row_width: usize, - /// Length in bytes for `variable length data` part of the current tuple. - varlena_width: usize, - /// Current offset for the next variable length field to write to. - varlena_offset: usize, } impl RowWriter { /// New - pub fn new(schema: &Schema, row_type: RowType) -> Self { - let layout = RowLayout::new(schema, row_type); - let init_capacity = estimate_row_width(schema, &layout); - let varlena_offset = layout.fixed_part_width(); + pub fn new(schema: &Schema) -> Self { + let layout = RowLayout::new(schema); + let init_capacity = layout.fixed_part_width(); Self { layout, data: vec![0; init_capacity], - row_width: 0, - varlena_width: 0, - varlena_offset, + row_width: init_capacity, } } /// Reset the row writer state for new tuple pub fn reset(&mut self) { self.data.fill(0); - self.row_width = 0; - self.varlena_width = 0; - self.varlena_offset = self.layout.fixed_part_width(); + self.row_width = self.layout.fixed_part_width(); } #[inline] @@ -230,45 +215,6 @@ impl RowWriter { set_idx!(16, self, idx, value) } - fn set_offset_size(&mut self, idx: usize, size: u32) { - let offset_and_size: u64 = (self.varlena_offset as u64) << 32 | (size as u64); - self.set_u64(idx, offset_and_size); - } - - fn set_utf8(&mut self, idx: usize, value: &str) { - self.assert_index_valid(idx); - let bytes = value.as_bytes(); - let size = bytes.len(); - self.set_offset_size(idx, size as u32); - let varlena_offset = self.varlena_offset; - self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes); - self.varlena_offset += size; - self.varlena_width += size; - } - - fn set_binary(&mut self, idx: usize, value: &[u8]) { - self.assert_index_valid(idx); - let size = value.len(); - self.set_offset_size(idx, size as u32); - let varlena_offset = self.varlena_offset; - self.data[varlena_offset..varlena_offset + size].copy_from_slice(value); - self.varlena_offset += size; - self.varlena_width += size; - } - - fn current_width(&self) -> usize { - self.layout.fixed_part_width() + self.varlena_width - } - - /// End each row at 8-byte word boundary. - pub(crate) fn end_padding(&mut self) { - let payload_width = self.current_width(); - self.row_width = round_upto_power_of_2(payload_width, 8); - if self.data.len() < self.row_width { - self.data.resize(self.row_width, 0); - } - } - /// Get raw bytes pub fn get_row(&self) -> &[u8] { &self.data[0..self.row_width] @@ -298,7 +244,6 @@ pub fn write_row( } } - row_writer.end_padding(); row_writer.row_width } @@ -350,36 +295,6 @@ pub(crate) fn write_field_date64( to.set_date64(col_idx, from.value(row_idx)); } -pub(crate) fn write_field_utf8( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - let from = as_string_array(from).unwrap(); - let s = from.value(row_idx); - let new_width = to.current_width() + s.as_bytes().len(); - if new_width > to.data.len() { - to.data.resize(max(to.data.capacity(), new_width), 0); - } - to.set_utf8(col_idx, s); -} - -pub(crate) fn write_field_binary( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - let from = as_binary_array(from).unwrap(); - let s = from.value(row_idx); - let new_width = to.current_width() + s.len(); - if new_width > to.data.len() { - to.data.resize(max(to.data.capacity(), new_width), 0); - } - to.set_binary(col_idx, s); -} - pub(crate) fn write_field_decimal128( to: &mut RowWriter, from: &Arc, @@ -412,8 +327,6 @@ fn write_field( Float64 => write_field_f64(row, col, col_idx, row_idx), Date32 => write_field_date32(row, col, col_idx, row_idx), Date64 => write_field_date64(row, col, col_idx, row_idx), - Utf8 => write_field_utf8(row, col, col_idx, row_idx), - Binary => write_field_binary(row, col, col_idx, row_idx), Decimal128(_, _) => write_field_decimal128(row, col, col_idx, row_idx), _ => unimplemented!(), } From aebb211836d0d4281930d39d9b5017ab4232bb25 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 16 Apr 2023 01:07:46 +0800 Subject: [PATCH 2/4] rm --- datafusion/row/src/jit/writer.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/row/src/jit/writer.rs b/datafusion/row/src/jit/writer.rs index 8a787f9c051f..0b8635770d1f 100644 --- a/datafusion/row/src/jit/writer.rs +++ b/datafusion/row/src/jit/writer.rs @@ -195,8 +195,6 @@ fn write_typed_field_stmt( Float64 => b.call_stmt("write_field_f64", params)?, Date32 => b.call_stmt("write_field_date32", params)?, Date64 => b.call_stmt("write_field_date64", params)?, - Utf8 => b.call_stmt("write_field_utf8", params)?, - Binary => b.call_stmt("write_field_binary", params)?, _ => unimplemented!(), } Ok(()) From ff1646b33bc7a44f76f04f4ed54c8991dd7ddee7 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 16 Apr 2023 01:09:13 +0800 Subject: [PATCH 3/4] fmt --- datafusion/row/src/layout.rs | 45 +++++++++++++++++------------------- datafusion/row/src/lib.rs | 3 +-- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs index 3f8a812a0664..71471327536a 100644 --- a/datafusion/row/src/layout.rs +++ b/datafusion/row/src/layout.rs @@ -133,28 +133,25 @@ fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usiz /// /// Note all schemas can be supported in the row format pub fn row_supported(schema: &Schema) -> bool { - schema - .fields() - .iter() - .all(|f| { - let dt = f.data_type(); - use DataType::*; - matches!( - dt, - Boolean - | UInt8 - | UInt16 - | UInt32 - | UInt64 - | Int8 - | Int16 - | Int32 - | Int64 - | Float32 - | Float64 - | Date32 - | Date64 - | Decimal128(_, _) - ) - }) + schema.fields().iter().all(|f| { + let dt = f.data_type(); + use DataType::*; + matches!( + dt, + Boolean + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Int8 + | Int16 + | Int32 + | Int64 + | Float32 + | Float64 + | Date32 + | Date64 + | Decimal128(_, _) + ) + }) } diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs index d124e3988587..6d00bb44c811 100644 --- a/datafusion/row/src/lib.rs +++ b/datafusion/row/src/lib.rs @@ -244,8 +244,7 @@ mod tests { #[test] #[should_panic(expected = "not supported yet")] fn test_unsupported_type_read() { - let schema = - Arc::new(Schema::new(vec![Field::new("a", Utf8, false)])); + let schema = Arc::new(Schema::new(vec![Field::new("a", Utf8, false)])); let vector = vec![0; 1024]; let row_offsets = vec![0]; read_as_batch(&vector, schema, &row_offsets).unwrap(); From 21a97dc1d56fc577ae6905bfc6a1506ec47b875e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 16 Apr 2023 10:01:27 +0800 Subject: [PATCH 4/4] clippy --- datafusion/row/src/jit/reader.rs | 2 +- datafusion/row/src/jit/writer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/row/src/jit/reader.rs b/datafusion/row/src/jit/reader.rs index ffb0aacb5a1e..5d91de7be380 100644 --- a/datafusion/row/src/jit/reader.rs +++ b/datafusion/row/src/jit/reader.rs @@ -94,7 +94,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> { reg_fn!(asm, read_field_f32_null_free, reader_param.clone(), None); reg_fn!(asm, read_field_f64_null_free, reader_param.clone(), None); reg_fn!(asm, read_field_date32_null_free, reader_param.clone(), None); - reg_fn!(asm, read_field_date64_null_free, reader_param.clone(), None); + reg_fn!(asm, read_field_date64_null_free, reader_param, None); Ok(()) } diff --git a/datafusion/row/src/jit/writer.rs b/datafusion/row/src/jit/writer.rs index 0b8635770d1f..cb96f34fab3d 100644 --- a/datafusion/row/src/jit/writer.rs +++ b/datafusion/row/src/jit/writer.rs @@ -118,7 +118,7 @@ fn register_write_functions(asm: &Assembler) -> Result<()> { reg_fn!(asm, write_field_f32, reader_param.clone(), None); reg_fn!(asm, write_field_f64, reader_param.clone(), None); reg_fn!(asm, write_field_date32, reader_param.clone(), None); - reg_fn!(asm, write_field_date64, reader_param.clone(), None); + reg_fn!(asm, write_field_date64, reader_param, None); Ok(()) }