diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index a0604ee8170..2170593ef59 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -211,9 +211,16 @@ pub trait MutableVector: Send + Sync { }); } - // Push null to this mutable vector. + /// Push null to this mutable vector. fn push_null(&mut self); + /// Push nulls to this mutable vector. + fn push_nulls(&mut self, num_nulls: usize) { + for _ in 0..num_nulls { + self.push_null(); + } + } + /// Extend this mutable vector by slice of `vector`. /// /// Returns error if data types mismatch. diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 3b9b713a10e..afebe927fef 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -28,7 +28,7 @@ use datafusion_expr::ColumnarValue; use datatypes::arrow; use datatypes::arrow::array::{ArrayRef, BooleanArray}; use datatypes::arrow::record_batch::RecordBatch; -use datatypes::data_type::DataType; +use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; use datatypes::value::ValueRef; use datatypes::vectors::{ @@ -53,7 +53,7 @@ use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Initial vector builder capacity. -const INITIAL_BUILDER_CAPACITY: usize = 32; +const INITIAL_BUILDER_CAPACITY: usize = 0; /// Builder to build [TimeSeriesMemtable]. 
#[derive(Debug, Default)] @@ -607,7 +607,8 @@ struct ValueBuilder { timestamp: Box, sequence: UInt64VectorBuilder, op_type: UInt8VectorBuilder, - fields: Vec>, + fields: Vec>>, + field_types: Vec, } impl ValueBuilder { @@ -620,16 +621,18 @@ impl ValueBuilder { let sequence = UInt64VectorBuilder::with_capacity(capacity); let op_type = UInt8VectorBuilder::with_capacity(capacity); - let fields = region_metadata + let field_types = region_metadata .field_columns() - .map(|c| c.column_schema.data_type.create_mutable_vector(capacity)) - .collect(); + .map(|c| c.column_schema.data_type.clone()) + .collect::>(); + let fields = (0..field_types.len()).map(|_| None).collect(); Self { timestamp, sequence, op_type, fields, + field_types, } } @@ -640,8 +643,20 @@ impl ValueBuilder { self.timestamp.push_value_ref(ts); self.sequence.push_value_ref(ValueRef::UInt64(sequence)); self.op_type.push_value_ref(ValueRef::UInt8(op_type)); + let num_rows = self.timestamp.len(); for (idx, field_value) in fields.into_iter().enumerate() { - self.fields[idx].push_value_ref(field_value); + if !field_value.is_null() || self.fields[idx].is_some() { + self.fields[idx] + .get_or_insert_with(|| { + // lazy initialize on first non-null value + let mut mutable_vector = + self.field_types[idx].create_mutable_vector(num_rows); + // fill previous rows with nulls + mutable_vector.push_nulls(num_rows - 1); + mutable_vector + }) + .push_value_ref(field_value); + } } } @@ -726,10 +741,20 @@ impl Values { impl From for Values { fn from(mut value: ValueBuilder) -> Self { + let num_rows = value.len(); let fields = value .fields .iter_mut() - .map(|v| v.to_vector()) + .enumerate() + .map(|(i, v)| { + if let Some(v) = v { + v.to_vector() + } else { + let mut single_null = value.field_types[i].create_mutable_vector(num_rows); + single_null.push_nulls(num_rows); + single_null.to_vector() + } + }) .collect::>(); let sequence = Arc::new(value.sequence.finish()); let op_type = Arc::new(value.op_type.finish()); @@ 
-863,6 +888,41 @@ mod tests { assert_eq!(1, series.frozen.len()); } + #[test] + fn test_series_with_nulls() { + let region_metadata = schema_for_test(); + let mut series = Series::new(&region_metadata); + // col1: NULL 1 2 3 + // col2: NULL NULL 10.2 NULL + series.push( + ts_value_ref(1), + 0, + OpType::Put, + vec![ValueRef::Null, ValueRef::Null], + ); + series.push( + ts_value_ref(1), + 0, + OpType::Put, + vec![ValueRef::Int64(1), ValueRef::Null], + ); + series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2)); + series.push( + ts_value_ref(1), + 3, + OpType::Put, + vec![ValueRef::Int64(2), ValueRef::Null], + ); + assert_eq!(4, series.active.timestamp.len()); + assert_eq!(0, series.frozen.len()); + + let values = series.compact(&region_metadata).unwrap(); + assert_eq!(values.fields[0].null_count(), 1); + assert_eq!(values.fields[1].null_count(), 3); + assert_eq!(0, series.active.timestamp.len()); + assert_eq!(1, series.frozen.len()); + } + fn check_value(batch: &Batch, expect: Vec>) { let ts_len = batch.timestamps().len(); assert_eq!(batch.sequences().len(), ts_len);