fix: freeze data buffer in shard #3468

Merged
merged 6 commits on Mar 11, 2024
2 changes: 1 addition & 1 deletion src/mito2/src/memtable/merge_tree.rs
@@ -85,7 +85,7 @@ impl Default for MergeTreeConfig {

Self {
index_max_keys_per_shard: 8192,
data_freeze_threshold: 32768,
data_freeze_threshold: 131072,
dedup: true,
fork_dictionary_bytes,
}
17 changes: 17 additions & 0 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -957,6 +957,18 @@ impl DataParts {
self.active.write_row(pk_index, kv)
}

/// Returns the number of rows in the active buffer.
pub fn num_active_rows(&self) -> usize {
self.active.num_rows()
}

/// Freezes the active buffer and creates a new active buffer.
pub fn freeze(&mut self) -> Result<()> {
let part = self.active.freeze(None, false)?;
self.frozen.push(part);
Ok(())
}

/// Reads data from all parts including active and frozen parts.
/// The returned iterator yields a record batch of one primary key at a time.
/// The order of yielding primary keys is determined by provided weights.
@@ -976,6 +988,11 @@ impl DataParts {
pub(crate) fn is_empty(&self) -> bool {
self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
}

#[cfg(test)]
pub(crate) fn frozen_len(&self) -> usize {
self.frozen.len()
}
}

pub struct DataPartsReaderBuilder {
61 changes: 43 additions & 18 deletions src/mito2/src/memtable/merge_tree/partition.rs
@@ -78,7 +78,7 @@ impl Partition {

// Finds key in shards, now we ensure one key only exists in one shard.
if let Some(pk_id) = inner.find_key_in_shards(primary_key) {
inner.write_to_shard(pk_id, &key_value);
inner.write_to_shard(pk_id, &key_value)?;
inner.num_rows += 1;
return Ok(());
}
@@ -106,7 +106,7 @@
}

/// Writes to the partition without a primary key.
pub fn write_no_key(&self, key_value: KeyValue) {
pub fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
let mut inner = self.inner.write().unwrap();
// If no primary key, always write to the first shard.
debug_assert!(!inner.shards.is_empty());
@@ -117,12 +117,15 @@
shard_id: 0,
pk_index: 0,
};
inner.shards[0].write_with_pk_id(pk_id, &key_value);
inner.shards[0].write_with_pk_id(pk_id, &key_value)?;
inner.num_rows += 1;

Ok(())
}

/// Scans data in the partition.
pub fn read(&self, mut context: ReadPartitionContext) -> Result<PartitionReader> {
let start = Instant::now();
let key_filter = if context.need_prune_key {
Some(PrimaryKeyFilter::new(
context.metadata.clone(),
@@ -150,7 +153,7 @@
(builder_reader, shard_source)
};

context.metrics.num_shards = shard_reader_builders.len();
context.metrics.num_shards += shard_reader_builders.len();
let mut nodes = shard_reader_builders
.into_iter()
.map(|builder| {
@@ -161,7 +164,7 @@
.collect::<Result<Vec<_>>>()?;

if let Some(builder) = builder_source {
context.metrics.read_builder = true;
context.metrics.num_builder += 1;
// Move the initialization of ShardBuilderReader out of read lock.
let shard_builder_reader =
builder.build(Some(&context.pk_weights), key_filter.clone())?;
@@ -172,8 +175,10 @@
let merger = ShardMerger::try_new(nodes)?;
if self.dedup {
let source = DedupReader::try_new(merger)?;
context.metrics.build_partition_reader += start.elapsed();
PartitionReader::new(context, Box::new(source))
} else {
context.metrics.build_partition_reader += start.elapsed();
PartitionReader::new(context, Box::new(merger))
}
}
@@ -282,9 +287,10 @@ pub(crate) struct PartitionStats {

#[derive(Default)]
struct PartitionReaderMetrics {
build_partition_reader: Duration,
read_source: Duration,
data_batch_to_batch: Duration,
read_builder: bool,
num_builder: usize,
num_shards: usize,
}

@@ -440,9 +446,15 @@ impl Drop for ReadPartitionContext {
.observe(partition_data_batch_to_batch);

common_telemetry::debug!(
"TreeIter partitions metrics, read_builder: {}, num_shards: {}, partition_read_source: {}s, partition_data_batch_to_batch: {}s",
self.metrics.read_builder,
"TreeIter partitions metrics, \
num_builder: {}, \
num_shards: {}, \
build_partition_reader: {}s, \
partition_read_source: {}s, \
partition_data_batch_to_batch: {}s",
self.metrics.num_builder,
self.metrics.num_shards,
self.metrics.build_partition_reader.as_secs_f64(),
partition_read_source,
partition_data_batch_to_batch,
);
@@ -549,7 +561,16 @@ impl Inner {
fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self {
let (shards, current_shard_id) = if metadata.primary_key.is_empty() {
let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup);
(vec![Shard::new(0, None, data_parts, config.dedup)], 1)
(
vec![Shard::new(
0,
None,
data_parts,
config.dedup,
config.data_freeze_threshold,
)],
1,
)
} else {
(Vec::new(), 0)
};
@@ -569,18 +590,22 @@
self.pk_to_pk_id.get(primary_key).copied()
}

fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) {
fn write_to_shard(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
if pk_id.shard_id == self.shard_builder.current_shard_id() {
self.shard_builder.write_with_pk_id(pk_id, key_value);
return;
}
for shard in &mut self.shards {
if shard.shard_id == pk_id.shard_id {
shard.write_with_pk_id(pk_id, key_value);
self.num_rows += 1;
return;
}
return Ok(());
}

// Safety: We find the shard by shard id.
let shard = self
.shards
.iter_mut()
.find(|shard| shard.shard_id == pk_id.shard_id)
.unwrap();
shard.write_with_pk_id(pk_id, key_value)?;
self.num_rows += 1;

Ok(())
}

fn freeze_active_shard(&mut self) -> Result<()> {
87 changes: 74 additions & 13 deletions src/mito2/src/memtable/merge_tree/shard.rs
@@ -39,6 +39,8 @@ pub struct Shard {
/// Data in the shard.
data_parts: DataParts,
dedup: bool,
/// Number of rows to freeze a data part.
data_freeze_threshold: usize,
}

impl Shard {
Expand All @@ -48,20 +50,29 @@ impl Shard {
key_dict: Option<KeyDictRef>,
data_parts: DataParts,
dedup: bool,
data_freeze_threshold: usize,
) -> Shard {
Shard {
shard_id,
key_dict,
data_parts,
dedup,
data_freeze_threshold,
}
}

/// Writes a key value into the shard.
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) {
///
/// It freezes the active buffer if it is full.
pub fn write_with_pk_id(&mut self, pk_id: PkId, key_value: &KeyValue) -> Result<()> {
debug_assert_eq!(self.shard_id, pk_id.shard_id);

if self.data_parts.num_active_rows() >= self.data_freeze_threshold {
self.data_parts.freeze()?;
}

self.data_parts.write_row(pk_id.pk_index, key_value);
Ok(())
}

/// Scans the shard.
@@ -83,6 +94,7 @@ impl Shard {
key_dict: self.key_dict.clone(),
data_parts: DataParts::new(metadata, DATA_INIT_CAP, self.dedup),
dedup: self.dedup,
data_freeze_threshold: self.data_freeze_threshold,
}
}

@@ -467,6 +479,7 @@ mod tests {
shard_id: ShardId,
metadata: RegionMetadataRef,
input: &[(KeyValues, PkIndex)],
data_freeze_threshold: usize,
) -> Shard {
let mut dict_builder = KeyDictBuilder::new(1024);
let mut metrics = WriteMetrics::default();
@@ -481,36 +494,84 @@
let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

Shard::new(shard_id, Some(Arc::new(dict)), data_parts, true)
Shard::new(
shard_id,
Some(Arc::new(dict)),
data_parts,
true,
data_freeze_threshold,
)
}

fn collect_timestamps(shard: &Shard) -> Vec<i64> {
let mut reader = shard.read().unwrap().build(None).unwrap();
let mut timestamps = Vec::new();
while reader.is_valid() {
let rb = reader.current_data_batch().slice_record_batch();
let ts_array = rb.column(1);
let ts_slice = timestamp_array_to_i64_slice(ts_array);
timestamps.extend_from_slice(ts_slice);

reader.next().unwrap();
}
timestamps
}

#[test]
fn test_write_read_shard() {
let metadata = metadata_for_test();
let input = input_with_key(&metadata);
let mut shard = new_shard_with_dict(8, metadata, &input);
let mut shard = new_shard_with_dict(8, metadata, &input, 100);
assert!(shard.is_empty());
for (key_values, pk_index) in &input {
for kv in key_values.iter() {
let pk_id = PkId {
shard_id: shard.shard_id,
pk_index: *pk_index,
};
shard.write_with_pk_id(pk_id, &kv);
shard.write_with_pk_id(pk_id, &kv).unwrap();
}
}
assert!(!shard.is_empty());

let mut reader = shard.read().unwrap().build(None).unwrap();
let mut timestamps = Vec::new();
while reader.is_valid() {
let rb = reader.current_data_batch().slice_record_batch();
let ts_array = rb.column(1);
let ts_slice = timestamp_array_to_i64_slice(ts_array);
timestamps.extend_from_slice(ts_slice);
let timestamps = collect_timestamps(&shard);
assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
}

reader.next().unwrap();
#[test]
fn test_shard_freeze() {
let metadata = metadata_for_test();
let kvs = build_key_values_with_ts_seq_values(
&metadata,
"shard".to_string(),
0,
[0].into_iter(),
[Some(0.0)].into_iter(),
0,
);
let mut shard = new_shard_with_dict(8, metadata.clone(), &[(kvs, 0)], 50);
let expected: Vec<_> = (0..200).collect();
for i in &expected {
let kvs = build_key_values_with_ts_seq_values(
&metadata,
"shard".to_string(),
0,
[*i].into_iter(),
[Some(0.0)].into_iter(),
*i as u64,
);
let pk_id = PkId {
shard_id: shard.shard_id,
pk_index: *i as PkIndex,
};
for kv in kvs.iter() {
shard.write_with_pk_id(pk_id, &kv).unwrap();
}
}
assert_eq!(vec![0, 1, 10, 11, 20, 21], timestamps);
assert!(!shard.is_empty());
assert_eq!(3, shard.data_parts.frozen_len());

let timestamps = collect_timestamps(&shard);
assert_eq!(expected, timestamps);
}
}
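
A note on the assertion above: frozen_len() ends up at 3 because write_with_pk_id freezes the active buffer before a write once the buffer already holds data_freeze_threshold rows. The following is a minimal standalone sketch of that counting, not part of the PR; expected_frozen_parts is a hypothetical helper that assumes every write lands in the active buffer and that freeze() leaves it empty.

// Minimal sketch, not part of this PR: mirrors the freeze-before-write
// check in Shard::write_with_pk_id.
fn expected_frozen_parts(total_rows: usize, threshold: usize) -> usize {
    let mut active_rows = 0;
    let mut frozen_parts = 0;
    for _ in 0..total_rows {
        // Same order as the shard: freeze first if the buffer is full,
        // then write the new row.
        if active_rows >= threshold {
            frozen_parts += 1;
            active_rows = 0;
        }
        active_rows += 1;
    }
    frozen_parts
}

fn main() {
    // Matches test_shard_freeze: 200 rows with a threshold of 50 leave
    // 3 frozen parts and 50 rows still in the active buffer.
    assert_eq!(3, expected_frozen_parts(200, 50));
}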
8 changes: 7 additions & 1 deletion src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -138,7 +138,13 @@ impl ShardBuilder {
let shard_id = self.current_shard_id;
self.current_shard_id += 1;

Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup)))
Ok(Some(Shard::new(
shard_id,
key_dict,
data_parts,
self.dedup,
self.data_freeze_threshold,
)))
}

/// Scans the shard builder.
4 changes: 2 additions & 2 deletions src/mito2/src/memtable/merge_tree/tree.rs
@@ -124,7 +124,7 @@ impl MergeTree {

if !has_pk {
// No primary key.
self.write_no_key(kv);
self.write_no_key(kv)?;
continue;
}

@@ -299,7 +299,7 @@ impl MergeTree {
)
}

fn write_no_key(&self, key_value: KeyValue) {
fn write_no_key(&self, key_value: KeyValue) -> Result<()> {
let partition_key = Partition::get_partition_key(&key_value, self.is_partitioned);
let partition = self.get_or_create_partition(partition_key);

2 changes: 1 addition & 1 deletion tests-integration/tests/http.rs
@@ -789,7 +789,7 @@ intermediate_path = ""
[datanode.region_engine.mito.memtable]
type = "experimental"
index_max_keys_per_shard = 8192
data_freeze_threshold = 32768
data_freeze_threshold = 131072
dedup = true
fork_dictionary_bytes = "1GiB"
