From 488d3e89a0f06b4cf5c4904899aa9bbfce8ac592 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 21 Jan 2024 15:30:59 +0800 Subject: [PATCH 1/6] feat: lazy initialize vector builder on write Signed-off-by: Ruihang Xia --- src/datatypes/src/vectors.rs | 9 ++- src/metric-engine/src/engine/create.rs | 1 + src/mito2/src/memtable/time_series.rs | 86 +++++++++++++++++++++++--- 3 files changed, 86 insertions(+), 10 deletions(-) diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index a0604ee8170..2170593ef59 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -211,9 +211,16 @@ pub trait MutableVector: Send + Sync { }); } - // Push null to this mutable vector. + /// Push null to this mutable vector. fn push_null(&mut self); + /// Push nulls to this mutable vector. + fn push_nulls(&mut self, num_nulls: usize) { + for _ in 0..num_nulls { + self.push_null(); + } + } + /// Extend this mutable vector by slice of `vector`. /// /// Returns error if data types mismatch. 
diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index 48a70835fd4..e47f39ced5d 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -363,6 +363,7 @@ impl MetricEngineInner { .for_each(|metadata| { if metadata.semantic_type == SemanticType::Tag { metadata.semantic_type = SemanticType::Field; + metadata.column_schema.is_nullable(); } }); diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 3b9b713a10e..b14407d61c3 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -28,11 +28,11 @@ use datafusion_expr::ColumnarValue; use datatypes::arrow; use datatypes::arrow::array::{ArrayRef, BooleanArray}; use datatypes::arrow::record_batch::RecordBatch; -use datatypes::data_type::DataType; +use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; use datatypes::value::ValueRef; use datatypes::vectors::{ - Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, + ConstantVector, Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, }; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -53,7 +53,7 @@ use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; /// Initial vector builder capacity. -const INITIAL_BUILDER_CAPACITY: usize = 32; +const INITIAL_BUILDER_CAPACITY: usize = 0; /// Builder to build [TimeSeriesMemtable]. 
#[derive(Debug, Default)] @@ -607,7 +607,9 @@ struct ValueBuilder { timestamp: Box, sequence: UInt64VectorBuilder, op_type: UInt8VectorBuilder, - fields: Vec>, + fields: Vec>>, + field_types: Vec, + region_metadata: RegionMetadataRef, } impl ValueBuilder { @@ -620,16 +622,19 @@ impl ValueBuilder { let sequence = UInt64VectorBuilder::with_capacity(capacity); let op_type = UInt8VectorBuilder::with_capacity(capacity); - let fields = region_metadata + let field_types = region_metadata .field_columns() - .map(|c| c.column_schema.data_type.create_mutable_vector(capacity)) - .collect(); + .map(|c| c.column_schema.data_type.clone()) + .collect::>(); + let fields = (0..field_types.len()).map(|_| None).collect(); Self { timestamp, sequence, op_type, fields, + field_types, + region_metadata: region_metadata.clone(), } } @@ -640,8 +645,26 @@ impl ValueBuilder { self.timestamp.push_value_ref(ts); self.sequence.push_value_ref(ValueRef::UInt64(sequence)); self.op_type.push_value_ref(ValueRef::UInt8(op_type)); + let num_rows = self.timestamp.len(); for (idx, field_value) in fields.into_iter().enumerate() { - self.fields[idx].push_value_ref(field_value); + if !field_value.is_null() || self.fields[idx].is_some() { + self.fields[idx] + .get_or_insert_with(|| { + // lazy initialize on first non-null value + let mut mutable_vector = self + .region_metadata + .field_columns() + .nth(idx) + .unwrap() + .column_schema + .data_type + .create_mutable_vector(num_rows); + // fill previous rows with nulls + mutable_vector.push_nulls(num_rows - 1); + mutable_vector + }) + .push_value_ref(field_value); + } } } @@ -726,10 +749,20 @@ impl Values { impl From for Values { fn from(mut value: ValueBuilder) -> Self { + let num_rows = value.len(); let fields = value .fields .iter_mut() - .map(|v| v.to_vector()) + .enumerate() + .map(|(i, v)| { + if let Some(v) = v { + v.to_vector() + } else { + let mut single_null = value.field_types[i].create_mutable_vector(1); + single_null.push_null(); + 
Arc::new(ConstantVector::new(single_null.to_vector(), num_rows)) + } + }) .collect::>(); let sequence = Arc::new(value.sequence.finish()); let op_type = Arc::new(value.op_type.finish()); @@ -863,6 +896,41 @@ mod tests { assert_eq!(1, series.frozen.len()); } + #[test] + fn test_series_with_nulls() { + let region_metadata = schema_for_test(); + let mut series = Series::new(&region_metadata); + // col1: NULL 1 2 3 + // col2: NULL NULL 10.2 NULL + series.push( + ts_value_ref(1), + 0, + OpType::Put, + vec![ValueRef::Null, ValueRef::Null], + ); + series.push( + ts_value_ref(1), + 0, + OpType::Put, + vec![ValueRef::Int64(1), ValueRef::Null], + ); + series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2)); + series.push( + ts_value_ref(1), + 3, + OpType::Put, + vec![ValueRef::Int64(2), ValueRef::Null], + ); + assert_eq!(4, series.active.timestamp.len()); + assert_eq!(0, series.frozen.len()); + + let values = series.compact(&region_metadata).unwrap(); + assert_eq!(values.fields[0].null_count(), 1); + assert_eq!(values.fields[1].null_count(), 3); + assert_eq!(0, series.active.timestamp.len()); + assert_eq!(1, series.frozen.len()); + } + fn check_value(batch: &Batch, expect: Vec>) { let ts_len = batch.timestamps().len(); assert_eq!(batch.sequences().len(), ts_len); From 5d2d34d95903086c99fc10c8220084e3a129a6ec Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 21 Jan 2024 15:50:49 +0800 Subject: [PATCH 2/6] avoid using ConstantVector Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/time_series.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index b14407d61c3..9ccdf518cbf 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -758,9 +758,9 @@ impl From for Values { if let Some(v) = v { v.to_vector() } else { - let mut single_null = value.field_types[i].create_mutable_vector(1); - single_null.push_null(); - 
Arc::new(ConstantVector::new(single_null.to_vector(), num_rows)) + let mut single_null = value.field_types[i].create_mutable_vector(num_rows); + single_null.push_nulls(num_rows); + single_null.to_vector() } }) .collect::>(); From d277cd0966924f79e1f2b88300018283c3153ec2 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 21 Jan 2024 15:56:01 +0800 Subject: [PATCH 3/6] fix clippy Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/time_series.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 9ccdf518cbf..7beb9d34174 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -32,7 +32,7 @@ use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; use datatypes::value::ValueRef; use datatypes::vectors::{ - ConstantVector, Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, + Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, }; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; From 4b0615691be62f31790f224b34f7f021e704b09e Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 22 Jan 2024 14:32:55 +0800 Subject: [PATCH 4/6] simplify expression Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/time_series.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 7beb9d34174..30db601cef7 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -651,14 +651,8 @@ impl ValueBuilder { self.fields[idx] .get_or_insert_with(|| { // lazy initialize on first non-null value - let mut mutable_vector = self - .region_metadata - .field_columns() - .nth(idx) - .unwrap() - .column_schema - .data_type - .create_mutable_vector(num_rows); + let 
mut mutable_vector = + self.field_types[idx].create_mutable_vector(num_rows); // fill previous rows with nulls mutable_vector.push_nulls(num_rows - 1); mutable_vector From a917e2614591aac178e4735e2796f92d79f3b092 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 22 Jan 2024 14:34:09 +0800 Subject: [PATCH 5/6] Update src/metric-engine/src/engine/create.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- src/metric-engine/src/engine/create.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/metric-engine/src/engine/create.rs b/src/metric-engine/src/engine/create.rs index e47f39ced5d..48a70835fd4 100644 --- a/src/metric-engine/src/engine/create.rs +++ b/src/metric-engine/src/engine/create.rs @@ -363,7 +363,6 @@ impl MetricEngineInner { .for_each(|metadata| { if metadata.semantic_type == SemanticType::Tag { metadata.semantic_type = SemanticType::Field; - metadata.column_schema.is_nullable(); } }); From 0c2b2e8eb760b2eb9824d0ac80481e0f14597be7 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 22 Jan 2024 14:43:01 +0800 Subject: [PATCH 6/6] fix clippy Signed-off-by: Ruihang Xia --- src/mito2/src/memtable/time_series.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index 30db601cef7..afebe927fef 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -609,7 +609,6 @@ struct ValueBuilder { op_type: UInt8VectorBuilder, fields: Vec>>, field_types: Vec, - region_metadata: RegionMetadataRef, } impl ValueBuilder { @@ -634,7 +633,6 @@ impl ValueBuilder { op_type, fields, field_types, - region_metadata: region_metadata.clone(), } }