Commit

chore: add parameter to control pk_index replacement
v0y4g3r committed Feb 21, 2024
1 parent 5428d83 commit ac56c1f
Showing 1 changed file with 21 additions and 9 deletions.
src/mito2/src/memtable/merge_tree/data.rs
@@ -23,7 +23,7 @@ use datatypes::arrow;
 use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array};
 use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
 use datatypes::data_type::DataType;
-use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef};
+use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
 use datatypes::schema::ColumnSchema;
 use datatypes::types::TimestampType;
 use datatypes::vectors::{
@@ -153,6 +153,7 @@ impl DataBuffer {
             pk_weights,
             true,
             true,
+            true,
         )?;
         DataBufferIter::new(batch)
     }
@@ -191,12 +192,16 @@ impl LazyMutableVectorBuilder {
 }
 
 /// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights.
+/// `keep_data`: whether to keep the original data inside `DataBuffer`.
+/// `dedup`: whether to remove duplicated rows inside `DataBuffer`.
+/// `replace_pk_index`: whether to replace the pk_index values with the corresponding pk weights.
 fn data_buffer_to_record_batches(
     schema: SchemaRef,
     buffer: &mut DataBuffer,
     pk_weights: &[u16],
     keep_data: bool,
     dedup: bool,
+    replace_pk_index: bool,
 ) -> Result<RecordBatch> {
     let num_rows = buffer.ts_builder.len();
 
@@ -227,12 +232,17 @@ fn data_buffer_to_record_batches(
 
     let mut columns = Vec::with_capacity(4 + buffer.field_builders.len());
 
-    // replace pk index values with pk weights.
-    let weights_of_pks = Arc::new(UInt16Array::from_iter_values(
-        rows.into_iter().map(|(_, key)| key.pk_weight),
-    )) as Arc<_>;
+    let pk_array = if replace_pk_index {
+        // replace pk index values with pk weights.
+        Arc::new(UInt16Array::from_iter_values(
+            rows.into_iter().map(|(_, key)| key.pk_weight),
+        )) as Arc<_>
+    } else {
+        arrow::compute::take(&pk_index_v.to_arrow_array(), &indices_to_take, None)
+            .context(error::ComputeArrowSnafu)?
+    };
 
-    columns.push(weights_of_pks);
+    columns.push(pk_array);
 
     columns.push(
         arrow::compute::take(&ts_v.to_arrow_array(), &indices_to_take, None)
@@ -503,6 +513,7 @@ impl<'a> DataPartEncoder<'a> {
             self.pk_weights,
             false,
             true,
+            true,
         )?;
         writer.write(&rb).context(error::EncodeMemtableSnafu)?;
         let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?;
@@ -566,7 +577,8 @@ mod tests {
         assert_eq!(5, buffer.num_rows());
         let schema = memtable_schema_to_encoded_schema(&meta);
         let batch =
-            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true).unwrap();
+            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], keep_data, true, true)
+                .unwrap();
 
         assert_eq!(
             vec![1, 2, 1, 2],
@@ -630,7 +642,7 @@ mod tests {
         assert_eq!(4, buffer.num_rows());
         let schema = memtable_schema_to_encoded_schema(&meta);
         let batch =
-            data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true).unwrap();
+            data_buffer_to_record_batches(schema, &mut buffer, &[0, 1], true, true, true).unwrap();
 
         assert_eq!(3, batch.num_rows());
         assert_eq!(
@@ -684,7 +696,7 @@ mod tests {
         assert_eq!(5, buffer.num_rows());
         let schema = memtable_schema_to_encoded_schema(&meta);
         let batch =
-            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false).unwrap();
+            data_buffer_to_record_batches(schema, &mut buffer, &[3, 1], true, false, true).unwrap();
 
         assert_eq!(
             vec![1, 1, 3, 3, 3],
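The behavioural core of the change is the new `replace_pk_index` flag of `data_buffer_to_record_batches`: when true, the primary-key column of the output batch is filled with the weight assigned to each key (the previous, hard-coded behaviour); when false, the original `pk_index` values are kept and only reordered via `arrow::compute::take`. Below is a minimal sketch of that decision against the plain arrow-rs API; the `pk_column` helper and its argument layout are illustrative assumptions, not the actual GreptimeDB code.

use std::sync::Arc;

use arrow::array::{ArrayRef, UInt16Array, UInt32Array};
use arrow::compute::take;

// Hypothetical helper illustrating the `replace_pk_index` branch.
// `pk_index`:        pk_index of every buffered row, in insertion order.
// `pk_weights`:      weight assigned to each primary key, indexed by pk_index.
// `indices_to_take`: row indices in the desired (sorted) output order.
fn pk_column(
    pk_index: &UInt16Array,
    pk_weights: &[u16],
    indices_to_take: &UInt32Array,
    replace_pk_index: bool,
) -> ArrayRef {
    if replace_pk_index {
        // Emit each output row's pk weight instead of its pk_index.
        Arc::new(UInt16Array::from_iter_values(
            indices_to_take
                .values()
                .iter()
                .map(|&i| pk_weights[pk_index.value(i as usize) as usize]),
        )) as ArrayRef
    } else {
        // Keep the original pk_index values, only reordered to the output order.
        take(pk_index, indices_to_take, None).expect("take pk_index")
    }
}

All call sites touched by this commit pass `true`, so existing behaviour is unchanged; the `false` path is there for callers that want the untouched pk_index values in the resulting batch.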
