Skip to content

Commit

Permalink
feat(tskv): bring release of cache partition rwlock ahead
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed May 15, 2023
1 parent f0cf701 commit d097fc9
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 32 deletions.
1 change: 1 addition & 0 deletions tskv/src/kvcore.rs
Expand Up @@ -295,6 +295,7 @@ impl TsKv {
let runtime = self.runtime.clone();
let f = async move {
while let Some(x) = receiver.recv().await {
// TODO(zipper): spawning a job for every received request drains the channel immediately, which makes the config `flush_req_channel_cap` ineffective
runtime.spawn(run_flush_memtable_job(
x,
ctx.clone(),
Expand Down
163 changes: 131 additions & 32 deletions tskv/src/memcache.rs
Expand Up @@ -20,7 +20,7 @@ use crate::error::Result;
use crate::Error::CommonError;
use crate::{byte_utils, Error, TseriesFamilyId};

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub enum FieldVal {
Float(f64),
Integer(i64),
Expand Down Expand Up @@ -97,7 +97,22 @@ impl Display for FieldVal {
}
}

#[derive(Debug)]
impl PartialEq for FieldVal {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(FieldVal::Unsigned(a), FieldVal::Unsigned(b)) => a == b,
(FieldVal::Integer(a), FieldVal::Integer(b)) => a == b,
(FieldVal::Float(a), FieldVal::Float(b)) => a.to_be_bytes() == b.to_be_bytes(),
(FieldVal::Boolean(a), FieldVal::Boolean(b)) => a == b,
(FieldVal::Bytes(a), FieldVal::Bytes(b)) => a == b,
_ => false,
}
}
}

// `Eq` is sound because the `PartialEq` impl above compares `Float` payloads by their
// byte representation, so every value (NaN included) is equal to itself.
impl Eq for FieldVal {}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RowData {
pub ts: i64,
pub fields: Vec<Option<FieldVal>>,
Expand Down Expand Up @@ -188,7 +203,7 @@ impl RowData {
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RowGroup {
pub schema: Arc<TskvTableSchema>,
pub range: TimeRange,
Expand Down Expand Up @@ -378,13 +393,17 @@ impl MemCache {
.try_grow(group.size)
.map_err(|_| Error::MemoryExhausted)?;
let index = (sid as usize) % self.part_count;
let entry = self.partions[index]
.write()
.entry(sid)
.or_insert_with(|| Arc::new(RwLock::new(SeriesData::new(sid))))
.clone();

entry.write().write(group);
let mut series_map = self.partions[index].write();
if let Some(series_data) = series_map.get(&sid) {
let series_data_ptr = series_data.clone();
let mut series_data_ptr_w = series_data_ptr.write();
drop(series_map);
series_data_ptr_w.write(group);
} else {
let mut series_data = SeriesData::new(sid);
series_data.write(group);
series_map.insert(sid, Arc::new(RwLock::new(series_data)));
}
Ok(())
}

Expand All @@ -397,10 +416,9 @@ impl MemCache {
) {
let (column_id, sid) = split_id(field_id);
let index = (sid as usize) % self.part_count;
let part = self.partions[index].read();

if let Some(series) = part.get(&sid) {
series
let series_data = self.partions[index].read().get(&sid).cloned();
if let Some(series_data) = series_data {
series_data
.read()
.read_data(column_id, time_predicate, value_predicate, handle_data)
}
Expand All @@ -414,10 +432,9 @@ impl MemCache {
) {
for sid in series_ids.iter() {
let index = (*sid as usize) % self.part_count;
let part = self.partions[index].read();

if let Some(series) = part.get(sid) {
series
let series_data = self.partions[index].read().get(sid).cloned();
if let Some(series_data) = series_data {
series_data
.read()
.read_timestamps(&mut time_predicate, &mut handle_data);
}
Expand All @@ -438,39 +455,39 @@ impl MemCache {
for fid in field_ids {
let (column_id, sid) = split_id(*fid);
let index = (sid as usize) % self.part_count;
let part = self.partions[index].read();
if let Some(data) = part.get(&sid) {
data.write().delete_column(column_id);
let series_data = self.partions[index].read().get(&sid).cloned();
if let Some(series_data) = series_data {
series_data.write().delete_column(column_id);
}
}
}

pub fn change_column(&self, sids: &[SeriesId], column_name: &str, new_column: &TableColumn) {
for sid in sids {
let index = (*sid as usize) % self.part_count;
let part = self.partions[index].read();
if let Some(data) = part.get(sid) {
data.write().change_column(column_name, new_column);
let series_data = self.partions[index].read().get(sid).cloned();
if let Some(series_data) = series_data {
series_data.write().change_column(column_name, new_column);
}
}
}

pub fn add_column(&self, sids: &[SeriesId], new_column: &TableColumn) {
for sid in sids {
let index = (*sid as usize) % self.part_count;
let part = self.partions[index].read();
if let Some(data) = part.get(sid) {
data.write().add_column(new_column);
let series_data = self.partions[index].read().get(sid).cloned();
if let Some(series_data) = series_data {
series_data.write().add_column(new_column);
}
}
}

pub fn delete_series(&self, sids: &[SeriesId], range: &TimeRange) {
for sid in sids {
let index = (*sid as usize) % self.part_count;
let part = self.partions[index].read();
if let Some(data) = part.get(sid) {
data.write().delete_series(range);
let series_data = self.partions[index].read().get(sid).cloned();
if let Some(series_data) = series_data {
series_data.write().delete_series(range);
}
}
}
Expand Down Expand Up @@ -590,9 +607,11 @@ pub(crate) mod test {
use std::mem::size_of;
use std::sync::Arc;

use datafusion::arrow::datatypes::TimeUnit;
use memory_pool::{GreedyMemoryPool, MemoryPool};
use models::predicate::domain::TimeRange;
use models::schema::TskvTableSchema;
use models::{SchemaId, SeriesId, Timestamp};
use models::schema::{ColumnType, TableColumn, TskvTableSchema};
use models::{SchemaId, SeriesId, Timestamp, ValueType};
use parking_lot::RwLock;

use super::{FieldVal, MemCache, RowData, RowGroup};
Expand Down Expand Up @@ -661,4 +680,84 @@ pub(crate) mod test {

fname_vals_map
}

/// Writing row groups for a series must create the series entry on the first write and
/// append to it (widening the series time range) on the following write.
#[test]
fn test_write_group() {
    let sid: SeriesId = 1;

    let memory_pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024 * 1024));
    let mem_cache = MemCache::new(1, 1000, 1, &memory_pool);

    // Builds the test table schema: the four common columns followed by `extra_columns`.
    let build_schema = |extra_columns: Vec<TableColumn>| {
        let mut columns = vec![
            TableColumn::new_time_column(1, TimeUnit::Nanosecond),
            TableColumn::new_tag_column(2, "tag_col_1".to_string()),
            TableColumn::new_tag_column(3, "tag_col_2".to_string()),
            TableColumn::new(
                4,
                "f_col_1".to_string(),
                ColumnType::Field(ValueType::Float),
                Default::default(),
            ),
        ];
        columns.extend(extra_columns);
        TskvTableSchema::new(
            "test_tenant".to_string(),
            "test_db".to_string(),
            "test_table".to_string(),
            columns,
        )
    };

    // Checks that the series exists, covers `expected_range`, holds `expected_groups`
    // row groups, and that the most recently written group equals `newest`.
    let assert_series = |expected_range: TimeRange, expected_groups: usize, newest: &RowGroup| {
        let series_part = mem_cache.partions[sid as usize].read();
        let series_data = series_part.get(&sid);
        assert!(series_data.is_some());
        let series_data = series_data.unwrap().read();
        assert_eq!(sid, series_data.series_id);
        assert_eq!(expected_range, series_data.range);
        assert_eq!(expected_groups, series_data.groups.len());
        assert_eq!(*newest, series_data.groups[expected_groups - 1]);
    };

    // Nothing has been written yet, so the series must be absent.
    {
        let series_part = mem_cache.partions[sid as usize].read();
        let series_data = series_part.get(&sid);
        assert!(series_data.is_none());
    }

    // First write: a single float field column.
    let mut schema_1 = build_schema(vec![]);
    schema_1.schema_id = 1;
    let row_group_1 = RowGroup {
        schema: Arc::new(schema_1),
        range: TimeRange::new(1, 3),
        rows: vec![
            RowData {
                ts: 1,
                fields: vec![Some(FieldVal::Float(1.0))],
            },
            RowData {
                ts: 3,
                fields: vec![Some(FieldVal::Float(3.0))],
            },
        ],
        size: 10,
    };
    mem_cache.write_group(sid, 1, row_group_1.clone()).unwrap();
    assert_series(TimeRange::new(1, 3), 1, &row_group_1);

    // Second write: the schema gains an integer field column.
    let mut schema_2 = build_schema(vec![TableColumn::new(
        5,
        "f_col_2".to_string(),
        ColumnType::Field(ValueType::Integer),
        Default::default(),
    )]);
    schema_2.schema_id = 2;
    let row_group_2 = RowGroup {
        schema: Arc::new(schema_2),
        range: TimeRange::new(3, 5),
        rows: vec![
            RowData {
                ts: 3,
                fields: vec![None, Some(FieldVal::Integer(3))],
            },
            RowData {
                ts: 5,
                fields: vec![Some(FieldVal::Float(5.0)), Some(FieldVal::Integer(5))],
            },
        ],
        size: 10,
    };
    mem_cache.write_group(sid, 2, row_group_2.clone()).unwrap();
    assert_series(TimeRange::new(1, 5), 2, &row_group_2);
}
}

0 comments on commit d097fc9

Please sign in to comment.