Skip to content

Commit

Permalink
feat: employ sparse key encoding for shard lookup (#3410)
Browse files Browse the repository at this point in the history
* feat: employ short key encoding for shard lookup

* fix: license

* chore: simplify code

* refactor: only enable sparse encoding to speed lookup on metric engine

* fix: names
  • Loading branch information
v0y4g3r committed Mar 1, 2024
1 parent d4a54a0 commit 376409b
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,10 @@ uuid.workspace = true
[dev-dependencies]
common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
log-store.workspace = true
rand.workspace = true

[[bench]]
name = "bench_merge_tree"
harness = false
21 changes: 21 additions & 0 deletions src/mito2/benches/bench_merge_tree.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Benchmark entry point. Cargo.toml declares this target with
// `harness = false`, so `criterion_main!` supplies the `main` function.
mod merge_tree_bench;

use criterion::criterion_main;

// Run every benchmark group declared in the `merge_tree_bench` module.
criterion_main! {
merge_tree_bench::benches
}
36 changes: 36 additions & 0 deletions src/mito2/benches/merge_tree_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::{criterion_group, criterion_main, Criterion};
use mito2::memtable::merge_tree::{MergeTreeConfig, MergeTreeMemtable};
use mito2::memtable::Memtable;
use mito2::test_util::memtable_util;

/// Benchmarks repeated writes of one fixed batch of key values into a
/// `MergeTreeMemtable`.
fn bench_merge_tree_memtable(c: &mut Criterion) {
    // Region metadata with two primary key columns (column ids 1 and 0).
    let region_meta = memtable_util::metadata_with_primary_key(vec![1, 0], true);
    // One timestamp per row; 100 rows are written per measured iteration.
    let ts_values: Vec<_> = (0..100).collect();

    let memtable =
        MergeTreeMemtable::new(1, region_meta.clone(), None, &MergeTreeConfig::default());

    let _ = c.bench_function("MergeTreeMemtable", |b| {
        // Build the batch once; every measured iteration writes the same rows.
        let batch =
            memtable_util::build_key_values(&region_meta, "hello".to_string(), 42, &ts_values, 1);
        b.iter(|| {
            memtable.write(&batch).unwrap();
        });
    });
}

criterion_group!(benches, bench_merge_tree_memtable);
// NOTE(review): this file is also included as `mod merge_tree_bench` from
// benches/bench_merge_tree.rs, which invokes `criterion_main!` itself. The
// `criterion_main!` below then expands to an extra `fn main` inside the
// module — confirm whether this file is ever built as a standalone bench
// target; if not, this line is redundant.
criterion_main!(benches);
4 changes: 2 additions & 2 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl DataBuffer {
}

/// Writes a row to data buffer.
pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
self.ts_builder.push_value_ref(kv.timestamp());
self.pk_index_builder.push(Some(pk_index));
self.sequence_builder.push(Some(kv.sequence()));
Expand Down Expand Up @@ -953,7 +953,7 @@ impl DataParts {
}

/// Writes a row into parts.
pub fn write_row(&mut self, pk_index: PkIndex, kv: KeyValue) {
/// Writes a row into parts.
///
/// The row always goes to the active buffer; frozen parts are immutable.
pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
    self.active.write_row(pk_index, kv)
}

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/memtable/merge_tree/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ mod tests {
);

for kv in kvs.iter() {
buffer.write_row(pk_index, kv);
buffer.write_row(pk_index, &kv);
}

*sequence += rows;
Expand Down
39 changes: 29 additions & 10 deletions src/mito2/src/memtable/merge_tree/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ impl Partition {
/// Writes to the partition with a primary key.
pub fn write_with_key(
&self,
primary_key: &[u8],
primary_key: &mut Vec<u8>,
row_codec: &McmpRowCodec,
key_value: KeyValue,
re_encode: bool,
metrics: &mut WriteMetrics,
) -> Result<()> {
let mut inner = self.inner.write().unwrap();
Expand All @@ -76,17 +78,30 @@ impl Partition {

// Finds key in shards, now we ensure one key only exists in one shard.
if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
// Key already in shards.
inner.write_to_shard(pk_id, key_value);
inner.write_to_shard(pk_id, &key_value);
inner.num_rows += 1;
return Ok(());
}

// Write to the shard builder.
inner
.shard_builder
.write_with_key(primary_key, key_value, metrics);
inner.num_rows += 1;
// Key does not yet exist in shard or builder, encode and insert the full primary key.
if re_encode {
// `primary_key` is sparse, re-encode the full primary key.
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
} else {
// `primary_key` is already the full primary key.
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
};

inner.num_rows += 1;
Ok(())
}

Expand All @@ -102,7 +117,7 @@ impl Partition {
shard_id: 0,
pk_index: 0,
};
inner.shards[0].write_with_pk_id(pk_id, key_value);
inner.shards[0].write_with_pk_id(pk_id, &key_value);
inner.num_rows += 1;
}

Expand Down Expand Up @@ -583,7 +598,11 @@ impl Inner {
self.pk_to_pk_id.get(primary_key).copied()
}

fn write_to_shard(&mut self, pk_id: PkId, key_value: KeyValue) {
fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) {
if pk_id.shard_id == self.shard_builder.current_shard_id() {
self.shard_builder.write_with_pk_id(pk_id, key_value);
return;
}
for shard in &mut self.shards {
if shard.shard_id == pk_id.shard_id {
shard.write_with_pk_id(pk_id, key_value);
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/memtable/merge_tree/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Shard {
}

/// Writes a key value into the shard.
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: KeyValue) {
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
debug_assert_eq!(self.shard_id, pk_id.shard_id);

self.data_parts.write_row(pk_id.pk_index, key_value);
Expand Down Expand Up @@ -417,7 +417,7 @@ mod tests {
shard_id: shard.shard_id,
pk_index: *pk_index,
};
shard.write_with_pk_id(pk_id, kv);
shard.write_with_pk_id(pk_id, &kv);
}
}
assert!(!shard.is_empty());
Expand Down
23 changes: 19 additions & 4 deletions src/mito2/src/memtable/merge_tree/shard_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,26 @@ impl ShardBuilder {
}
}

/// Writes a key value with the given [PkId].
///
/// The caller must ensure the `pk_id.pk_index` already exists in
/// `dict_builder` (i.e. the key was previously inserted via
/// `write_with_key`).
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
    // The id must belong to the shard currently being built.
    assert_eq!(self.current_shard_id, pk_id.shard_id);
    self.data_buffer.write_row(pk_id.pk_index, key_value);
}

/// Write a key value with its encoded primary key.
pub fn write_with_key(&mut self, key: &[u8], key_value: KeyValue, metrics: &mut WriteMetrics) {
/// Writes a key value with its encoded primary key.
///
/// Inserts `primary_key` into the dictionary builder and appends the row to
/// the data buffer. Returns the [PkId] (current shard id plus the pk index
/// assigned by the dictionary) for the key.
pub fn write_with_key(
    &mut self,
    primary_key: &[u8],
    key_value: &KeyValue,
    metrics: &mut WriteMetrics,
) -> PkId {
    // Safety: the caller checks whether the builder needs to freeze before
    // writing.
    let pk_index = self.dict_builder.insert_key(primary_key, metrics);
    self.data_buffer.write_row(pk_index, key_value);
    PkId {
        shard_id: self.current_shard_id,
        pk_index,
    }
}

/// Returns true if the builder need to freeze.
Expand Down Expand Up @@ -261,7 +276,7 @@ mod tests {
for key_values in &input {
for kv in key_values.iter() {
let key = encode_key_by_kv(&kv);
shard_builder.write_with_key(&key, kv, &mut metrics);
shard_builder.write_with_key(&key, &kv, &mut metrics);
}
}
let shard = shard_builder
Expand All @@ -283,7 +298,7 @@ mod tests {
for key_values in &input {
for kv in key_values.iter() {
let key = encode_key_by_kv(&kv);
shard_builder.write_with_key(&key, kv, &mut metrics);
shard_builder.write_with_key(&key, &kv, &mut metrics);
}
}

Expand Down
67 changes: 61 additions & 6 deletions src/mito2/src/memtable/merge_tree/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ use api::v1::OpType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use snafu::ensure;
use datatypes::prelude::ValueRef;
use memcomparable::Serializer;
use serde::Serialize;
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::{PrimaryKeyLengthMismatchSnafu, Result};
use crate::error::{PrimaryKeyLengthMismatchSnafu, Result, SerializeFieldSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::metrics::WriteMetrics;
Expand All @@ -54,6 +57,7 @@ pub struct MergeTree {
is_partitioned: bool,
/// Manager to report size of the tree.
write_buffer_manager: Option<WriteBufferManagerRef>,
sparse_encoder: Arc<SparseEncoder>,
}

impl MergeTree {
Expand All @@ -69,6 +73,15 @@ impl MergeTree {
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);
let sparse_encoder = SparseEncoder {
fields: metadata
.primary_key_columns()
.map(|c| FieldWithId {
field: SortField::new(c.column_schema.data_type.clone()),
column_id: c.column_id,
})
.collect(),
};
let is_partitioned = Partition::has_multi_partitions(&metadata);

MergeTree {
Expand All @@ -78,6 +91,7 @@ impl MergeTree {
partitions: Default::default(),
is_partitioned,
write_buffer_manager,
sparse_encoder: Arc::new(sparse_encoder),
}
}

Expand Down Expand Up @@ -116,9 +130,15 @@ impl MergeTree {

// Encode primary key.
pk_buffer.clear();
self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
if self.is_partitioned {
// Use sparse encoder for metric engine.
self.sparse_encoder
.encode_to_vec(kv.primary_keys(), pk_buffer)?;
} else {
self.row_codec.encode_to_vec(kv.primary_keys(), pk_buffer)?;
}

// Write rows with primary keys.
// Write rows with primary keys.
self.write_with_key(pk_buffer, kv, metrics)?;
}

Expand Down Expand Up @@ -252,6 +272,7 @@ impl MergeTree {
partitions: RwLock::new(forked),
is_partitioned: self.is_partitioned,
write_buffer_manager: self.write_buffer_manager.clone(),
sparse_encoder: self.sparse_encoder.clone(),
}
}

Expand All @@ -262,14 +283,20 @@ impl MergeTree {

fn write_with_key(
&self,
primary_key: &[u8],
primary_key: &mut Vec<u8>,
key_value: KeyValue,
metrics: &mut WriteMetrics,
) -> Result<()> {
let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
let partition = self.get_or_create_partition(partition_key);

partition.write_with_key(primary_key, key_value, metrics)
partition.write_with_key(
primary_key,
&self.row_codec,
key_value,
self.is_partitioned, // If tree is partitioned, re-encode is required to get the full primary key.
metrics,
)
}

fn write_no_key(&self, key_value: KeyValue) {
Expand Down Expand Up @@ -324,6 +351,34 @@ impl MergeTree {
}
}

/// A primary key field paired with the id of the column it encodes.
struct FieldWithId {
    field: SortField,
    // Column id; serialized ahead of the value in the sparse encoding so a
    // decoder can tell which columns are present.
    column_id: ColumnId,
}

/// Encodes primary keys sparsely: only non-null values are written, each
/// prefixed by its column id.
struct SparseEncoder {
    // Primary key fields in primary-key column order.
    fields: Vec<FieldWithId>,
}

impl SparseEncoder {
    /// Serializes the non-null values of `row` into `buffer`, each value
    /// preceded by its column id.
    ///
    /// Null values contribute no bytes at all — neither column id nor value —
    /// which is what makes the encoding "sparse".
    fn encode_to_vec<'a, I>(&self, row: I, buffer: &mut Vec<u8>) -> Result<()>
    where
        I: Iterator<Item = ValueRef<'a>>,
    {
        let mut serializer = Serializer::new(buffer);
        // Pair each value with its field descriptor, skipping nulls up front.
        let present = row
            .zip(self.fields.iter())
            .filter(|(value, _)| !value.is_null());
        for (value, field) in present {
            field
                .column_id
                .serialize(&mut serializer)
                .context(SerializeFieldSnafu)?;
            field.field.serialize(&mut serializer, &value)?;
        }
        Ok(())
    }
}

#[derive(Default)]
struct TreeIterMetrics {
iter_elapsed: Duration,
Expand Down

0 comments on commit 376409b

Please sign in to comment.