feat: lazy initialize vector builder on write (#3210)
* feat: lazy initialize vector builder on write

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* avoid using ConstantVector

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify expression

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/metric-engine/src/engine/create.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
waynexia and v0y4g3r committed Jan 22, 2024
1 parent de13de1 commit 278e4c8
Showing 2 changed files with 76 additions and 9 deletions.
9 changes: 8 additions & 1 deletion src/datatypes/src/vectors.rs
@@ -211,9 +211,16 @@ pub trait MutableVector: Send + Sync {
        });
    }

-    // Push null to this mutable vector.
+    /// Push null to this mutable vector.
    fn push_null(&mut self);

+    /// Push nulls to this mutable vector.
+    fn push_nulls(&mut self, num_nulls: usize) {
+        for _ in 0..num_nulls {
+            self.push_null();
+        }
+    }
+
    /// Extend this mutable vector by slice of `vector`.
    ///
    /// Returns error if data types mismatch.
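The new `push_nulls` default method is what the memtable change below relies on to backfill rows that were written before a field builder existed. Here is a minimal sketch of how it behaves from a caller's point of view — not part of the commit, and assuming the `ConcreteDataType::int64_datatype()` constructor alongside the builder APIs already used elsewhere in this diff:

```rust
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{MutableVector, Vector};
use datatypes::value::ValueRef;

fn push_nulls_demo() {
    // Assumed constructor; the diff itself only shows `create_mutable_vector`
    // being called on a stored ConcreteDataType.
    let mut builder = ConcreteDataType::int64_datatype().create_mutable_vector(0);
    // Backfill three missing rows in one call instead of looping at the call site.
    builder.push_nulls(3);
    builder.push_value_ref(ValueRef::Int64(42));
    let vector = builder.to_vector();
    assert_eq!(vector.len(), 4);
    assert_eq!(vector.null_count(), 3);
}
```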
76 changes: 68 additions & 8 deletions src/mito2/src/memtable/time_series.rs
@@ -28,7 +28,7 @@ use datafusion_expr::ColumnarValue;
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::record_batch::RecordBatch;
-use datatypes::data_type::DataType;
+use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
use datatypes::value::ValueRef;
use datatypes::vectors::{
@@ -53,7 +53,7 @@ use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

/// Initial vector builder capacity.
-const INITIAL_BUILDER_CAPACITY: usize = 32;
+const INITIAL_BUILDER_CAPACITY: usize = 0;

/// Builder to build [TimeSeriesMemtable].
#[derive(Debug, Default)]
@@ -607,7 +607,8 @@ struct ValueBuilder {
    timestamp: Box<dyn MutableVector>,
    sequence: UInt64VectorBuilder,
    op_type: UInt8VectorBuilder,
-    fields: Vec<Box<dyn MutableVector>>,
+    fields: Vec<Option<Box<dyn MutableVector>>>,
+    field_types: Vec<ConcreteDataType>,
}

impl ValueBuilder {
@@ -620,16 +621,18 @@ impl ValueBuilder {
        let sequence = UInt64VectorBuilder::with_capacity(capacity);
        let op_type = UInt8VectorBuilder::with_capacity(capacity);

-        let fields = region_metadata
+        let field_types = region_metadata
            .field_columns()
-            .map(|c| c.column_schema.data_type.create_mutable_vector(capacity))
-            .collect();
+            .map(|c| c.column_schema.data_type.clone())
+            .collect::<Vec<_>>();
+        let fields = (0..field_types.len()).map(|_| None).collect();

        Self {
            timestamp,
            sequence,
            op_type,
            fields,
+            field_types,
        }
    }

@@ -640,8 +643,20 @@
        self.timestamp.push_value_ref(ts);
        self.sequence.push_value_ref(ValueRef::UInt64(sequence));
        self.op_type.push_value_ref(ValueRef::UInt8(op_type));
+        let num_rows = self.timestamp.len();
        for (idx, field_value) in fields.into_iter().enumerate() {
-            self.fields[idx].push_value_ref(field_value);
+            if !field_value.is_null() || self.fields[idx].is_some() {
+                self.fields[idx]
+                    .get_or_insert_with(|| {
+                        // lazy initialize on first non-null value
+                        let mut mutable_vector =
+                            self.field_types[idx].create_mutable_vector(num_rows);
+                        // fill previous rows with nulls
+                        mutable_vector.push_nulls(num_rows - 1);
+                        mutable_vector
+                    })
+                    .push_value_ref(field_value);
+            }
        }
    }
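The guard `!field_value.is_null() || self.fields[idx].is_some()` is the core of the lazy scheme: a field's builder is only allocated when the first non-null value arrives, and the rows pushed before that point are backfilled with nulls at allocation time. Below is a self-contained sketch of the same pattern — a hypothetical `LazyColumn` type, with a plain `Vec<Option<i64>>` standing in for a real `MutableVector` builder:

```rust
struct LazyColumn {
    builder: Option<Vec<Option<i64>>>,
}

impl LazyColumn {
    fn push(&mut self, value: Option<i64>, rows_so_far: usize) {
        // Skip allocation entirely while the column has only seen nulls.
        if value.is_none() && self.builder.is_none() {
            return;
        }
        let builder = self.builder.get_or_insert_with(|| {
            // First non-null value: allocate and backfill the rows we skipped.
            vec![None; rows_so_far - 1]
        });
        builder.push(value);
    }
}

fn main() {
    let mut col = LazyColumn { builder: None };
    col.push(None, 1); // still no allocation
    col.push(Some(7), 2); // allocates and backfills one null
    col.push(None, 3); // builder exists, so the null is recorded
    assert_eq!(col.builder.unwrap(), vec![None, Some(7), None]);
}
```

In the memtable the row count comes from `self.timestamp.len()`, which is why the timestamp is pushed before the fields are processed.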

@@ -726,10 +741,20 @@ impl Values {

impl From<ValueBuilder> for Values {
    fn from(mut value: ValueBuilder) -> Self {
+        let num_rows = value.len();
        let fields = value
            .fields
            .iter_mut()
-            .map(|v| v.to_vector())
+            .enumerate()
+            .map(|(i, v)| {
+                if let Some(v) = v {
+                    v.to_vector()
+                } else {
+                    let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
+                    single_null.push_nulls(num_rows);
+                    single_null.to_vector()
+                }
+            })
            .collect::<Vec<_>>();
        let sequence = Arc::new(value.sequence.finish());
        let op_type = Arc::new(value.op_type.finish());
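When the builder is frozen, fields that never received a non-null value still hold `None`; the `else` branch above materializes them as an all-null vector of `num_rows` so every column in `Values` has the same length (the commit message notes that an earlier revision of the patch used `ConstantVector` for this instead). A hypothetical helper equivalent to that branch, not part of the commit:

```rust
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{MutableVector, VectorRef};

// Materialize a column that never saw a value as `num_rows` nulls.
fn all_null_field(field_type: &ConcreteDataType, num_rows: usize) -> VectorRef {
    let mut builder = field_type.create_mutable_vector(num_rows);
    builder.push_nulls(num_rows);
    builder.to_vector()
}
```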
@@ -863,6 +888,41 @@ mod tests {
        assert_eq!(1, series.frozen.len());
    }

+    #[test]
+    fn test_series_with_nulls() {
+        let region_metadata = schema_for_test();
+        let mut series = Series::new(&region_metadata);
+        // col1: NULL 1 2 3
+        // col2: NULL NULL 10.2 NULL
+        series.push(
+            ts_value_ref(1),
+            0,
+            OpType::Put,
+            vec![ValueRef::Null, ValueRef::Null],
+        );
+        series.push(
+            ts_value_ref(1),
+            0,
+            OpType::Put,
+            vec![ValueRef::Int64(1), ValueRef::Null],
+        );
+        series.push(ts_value_ref(1), 2, OpType::Put, field_value_ref(2, 10.2));
+        series.push(
+            ts_value_ref(1),
+            3,
+            OpType::Put,
+            vec![ValueRef::Int64(2), ValueRef::Null],
+        );
+        assert_eq!(4, series.active.timestamp.len());
+        assert_eq!(0, series.frozen.len());
+
+        let values = series.compact(&region_metadata).unwrap();
+        assert_eq!(values.fields[0].null_count(), 1);
+        assert_eq!(values.fields[1].null_count(), 3);
+        assert_eq!(0, series.active.timestamp.len());
+        assert_eq!(1, series.frozen.len());
+    }
+
    fn check_value(batch: &Batch, expect: Vec<Vec<Value>>) {
        let ts_len = batch.timestamps().len();
        assert_eq!(batch.sequences().len(), ts_len);
