From f936a3c824bc5d5fb5b94710298792db246c8747 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 14 Jul 2022 19:26:00 +0800 Subject: [PATCH 1/2] feat: Support reading multiple memtables --- src/storage/src/chunk.rs | 10 +++++--- src/storage/src/memtable.rs | 2 +- src/storage/src/memtable/version.rs | 22 +++++++++++++++- src/storage/src/region.rs | 7 ++++-- src/storage/src/region/tests.rs | 3 +-- src/storage/src/snapshot.rs | 39 +++++++++++++++++++++++++---- 6 files changed, 68 insertions(+), 15 deletions(-) diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs index af92141ce18..99728285a0e 100644 --- a/src/storage/src/chunk.rs +++ b/src/storage/src/chunk.rs @@ -2,12 +2,14 @@ use async_trait::async_trait; use store_api::storage::{Chunk, ChunkReader, SchemaRef}; use crate::error::{Error, Result}; -use crate::memtable::BatchIteratorPtr; +use crate::memtable::Batch; + +type IteratorPtr = Box> + Send>; pub struct ChunkReaderImpl { schema: SchemaRef, - // Now we only read data from one memtable, so we just holds the memtable iterator here. - iter: BatchIteratorPtr, + // Now we only read data from memtables, so we just holds the iterator here. + iter: IteratorPtr, } #[async_trait] @@ -35,7 +37,7 @@ impl ChunkReader for ChunkReaderImpl { } impl ChunkReaderImpl { - pub fn new(schema: SchemaRef, iter: BatchIteratorPtr) -> ChunkReaderImpl { + pub fn new(schema: SchemaRef, iter: IteratorPtr) -> ChunkReaderImpl { ChunkReaderImpl { schema, iter } } } diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index 7673dea8c32..0ba18611a96 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -32,7 +32,7 @@ pub trait Memtable: Send + Sync + std::fmt::Debug { fn write(&self, kvs: &KeyValues) -> Result<()>; /// Iterates the memtable. - // TODO(yingwen): Consider passing a projector (does column projection). + // TODO(yingwen): 1. Use reference of IterContext? 2. Consider passing a projector (does column projection). fn iter(&self, ctx: IterContext) -> Result; /// Returns the estimated bytes allocated by this memtable from heap. diff --git a/src/storage/src/memtable/version.rs b/src/storage/src/memtable/version.rs index d93338429ad..9065e664dd7 100644 --- a/src/storage/src/memtable/version.rs +++ b/src/storage/src/memtable/version.rs @@ -14,7 +14,7 @@ use crate::memtable::{MemtableId, MemtableRef}; pub struct MemtableVersion { mutable: MemtableSet, /// Immutable memtables. - immutables: Vec>, + immutables: Vec, } impl MemtableVersion { @@ -22,10 +22,20 @@ impl MemtableVersion { MemtableVersion::default() } + #[inline] pub fn mutable_memtables(&self) -> &MemtableSet { &self.mutable } + #[inline] + pub fn immutable_memtables(&self) -> &[MemtableSetRef] { + &self.immutables + } + + pub fn num_memtables(&self) -> usize { + self.mutable.len() + self.immutables.iter().map(|set| set.len()).sum::() + } + /// Clone current memtable version and freeze its mutable memtables, which moves /// all mutable memtables to immutable memtable list. pub fn freeze_mutable(&self) -> MemtableVersion { @@ -125,6 +135,8 @@ pub struct MemtableSet { max_memtable_id: MemtableId, } +pub type MemtableSetRef = Arc; + impl PartialEq for MemtableSet { fn eq(&self, other: &MemtableSet) -> bool { self.max_memtable_id == other.max_memtable_id @@ -164,11 +176,13 @@ impl MemtableSet { } /// Returns number of memtables in the set. + #[inline] pub fn len(&self) -> usize { self.memtables.len() } /// Returns true if there is no memtable in the set. 
+ #[inline] pub fn is_empty(&self) -> bool { self.memtables.is_empty() } @@ -330,12 +344,14 @@ mod tests { let v1 = MemtableVersion::new(); assert!(v1.mutable_memtables().is_empty()); + assert_eq!(0, v1.num_memtables()); // Add one mutable let v2 = v1.add_mutable(s1.clone()); assert_ne!(v1, v2); let mutables = v2.mutable_memtables(); assert_eq!(s1, *mutables); + assert_eq!(3, v2.num_memtables()); // Add another mutable let v3 = v2.add_mutable(s2); @@ -344,6 +360,7 @@ mod tests { let mutables = v3.mutable_memtables(); assert_eq!(s3, *mutables); assert!(v3.memtables_to_flush().1.is_empty()); + assert_eq!(7, v3.num_memtables()); // Try to freeze s1, s2 let v4 = v3.freeze_mutable(); @@ -357,6 +374,7 @@ mod tests { let (max_id, tables) = v4.memtables_to_flush(); assert_eq!(6, max_id.unwrap()); assert_eq!(7, tables.len()); + assert_eq!(7, v4.num_memtables()); // Add another mutable let s4 = create_test_memtableset(&[7, 8]); @@ -374,6 +392,7 @@ mod tests { let (max_id, tables) = v6.memtables_to_flush(); assert_eq!(8, max_id.unwrap()); assert_eq!(9, tables.len()); + assert_eq!(9, v6.num_memtables()); // verify tables for (i, table) in tables.iter().enumerate() { assert_eq!(i as u32, table.memtable.id()); @@ -391,5 +410,6 @@ mod tests { let v8 = v7.remove_immutables(8); assert_eq!(v8.immutables.len(), 0); + assert_eq!(0, v8.num_memtables()); } } diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index d17fde5986d..462b72301dc 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -93,10 +93,13 @@ impl RegionImpl { RegionImpl { inner } } +} - #[cfg(test)] +// Private methods for tests. +#[cfg(test)] +impl RegionImpl { #[inline] - fn _committed_sequence(&self) -> store_api::storage::SequenceNumber { + fn committed_sequence(&self) -> store_api::storage::SequenceNumber { self.inner.version_control().committed_sequence() } } diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 112a070675e..fe8251cd88b 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -1,7 +1,6 @@ //! Region tests. -// TODO(yingwen): [flush] Uncomment this once we supports flush and scanning flushed data. -// mod read_write; +mod read_write; use datatypes::type_id::LogicalTypeId; use log_store::fs::noop::NoopLogStore; diff --git a/src/storage/src/snapshot.rs b/src/storage/src/snapshot.rs index 75511b0e12b..3603dc18f19 100644 --- a/src/storage/src/snapshot.rs +++ b/src/storage/src/snapshot.rs @@ -8,6 +8,7 @@ use store_api::storage::{ use crate::chunk::ChunkReaderImpl; use crate::error::{Error, Result}; +use crate::memtable::IterContext; use crate::version::VersionRef; /// [Snapshot] implementation. @@ -28,14 +29,42 @@ impl Snapshot for SnapshotImpl { async fn scan( &self, - _ctx: &ReadContext, + ctx: &ReadContext, request: ScanRequest, ) -> Result> { - let _visible_sequence = self.sequence_to_read(request.sequence); - let _mem = self.version.memtables(); + let visible_sequence = self.sequence_to_read(request.sequence); + let memtable_version = self.version.memtables(); - // TODO(yingwen): [flush] Scan and merge results from mutable memtables. 
- unimplemented!() + let mutables = memtable_version.mutable_memtables(); + let immutables = memtable_version.immutable_memtables(); + let mut batch_iters = Vec::with_capacity(memtable_version.num_memtables()); + + let iter_ctx = IterContext { + batch_size: ctx.batch_size, + visible_sequence, + ..Default::default() + }; + + for (_range, mem) in mutables.iter() { + let iter = mem.iter(iter_ctx.clone())?; + + batch_iters.push(iter); + } + + for mem_set in immutables { + for (_range, mem) in mem_set.iter() { + let iter = mem.iter(iter_ctx.clone())?; + + batch_iters.push(iter); + } + } + + // Now we just simply chain all iterators together, ignore duplications/ordering. + let iter = Box::new(batch_iters.into_iter().flatten()); + + let reader = ChunkReaderImpl::new(self.version.schema().clone(), iter); + + Ok(ScanResponse { reader }) } async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result { From 4a3ed24c2f9a7c096c699db54d856c90c7cfe86d Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 18 Jul 2022 15:58:48 +0800 Subject: [PATCH 2/2] test: uncomment tests rely on snapshot read --- src/datanode/src/instance.rs | 40 +++++------ src/table-engine/src/engine.rs | 124 ++++++++++++++++----------------- 2 files changed, 80 insertions(+), 84 deletions(-) diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 8c167da8ecb..472a28cd8f9 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -152,27 +152,25 @@ mod tests { use super::*; use crate::test_util; - // TODO(yingwen): [flush] Uncomment this once we supports flush and scanning flushed data. - - // #[tokio::test] - // async fn test_execute_insert() { - // let catalog_list = memory::new_memory_catalog_list().unwrap(); - // let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); - // let instance = Instance::new(&opts, catalog_list).await.unwrap(); - // instance.start().await.unwrap(); - - // let output = instance - // .execute_sql( - // r#"insert into demo(host, cpu, memory, ts) values - // ('host1', 66.6, 1024, 1655276557000), - // ('host2', 88.8, 333.3, 1655276558000) - // "#, - // ) - // .await - // .unwrap(); - - // assert!(matches!(output, Output::AffectedRows(2))); - // } + #[tokio::test] + async fn test_execute_insert() { + let catalog_list = memory::new_memory_catalog_list().unwrap(); + let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts(); + let instance = Instance::new(&opts, catalog_list).await.unwrap(); + instance.start().await.unwrap(); + + let output = instance + .execute_sql( + r#"insert into demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await + .unwrap(); + + assert!(matches!(output, Output::AffectedRows(2))); + } #[tokio::test] async fn test_execute_query() { diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index 1cd44a5e14d..fba67bc597d 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -182,66 +182,64 @@ impl MitoEngineInner { } } -// TODO(yingwen): [flush] Uncomment this once we supports flush and scanning flushed data. 
- -// #[cfg(test)] -// mod tests { -// use common_recordbatch::util; -// use datafusion_common::field_util::FieldExt; -// use datafusion_common::field_util::SchemaExt; -// use datatypes::vectors::*; -// use table::requests::InsertRequest; - -// use super::*; -// use crate::table::test; - -// #[tokio::test] -// async fn test_create_table_insert_scan() { -// let (_engine, table, schema, _dir) = test::setup_test_engine_and_table().await; - -// assert_eq!(TableType::Base, table.table_type()); -// assert_eq!(schema, table.schema()); - -// let insert_req = InsertRequest { -// table_name: "demo".to_string(), -// columns_values: HashMap::default(), -// }; -// assert_eq!(0, table.insert(insert_req).await.unwrap()); - -// let mut columns_values: HashMap = HashMap::with_capacity(4); -// let hosts = StringVector::from(vec!["host1", "host2"]); -// let cpus = Float64Vector::from_vec(vec![55.5, 66.6]); -// let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]); -// let tss = Int64Vector::from_vec(vec![1, 2]); - -// columns_values.insert("host".to_string(), Arc::new(hosts.clone())); -// columns_values.insert("cpu".to_string(), Arc::new(cpus.clone())); -// columns_values.insert("memory".to_string(), Arc::new(memories.clone())); -// columns_values.insert("ts".to_string(), Arc::new(tss.clone())); - -// let insert_req = InsertRequest { -// table_name: "demo".to_string(), -// columns_values, -// }; -// assert_eq!(2, table.insert(insert_req).await.unwrap()); - -// let stream = table.scan(&None, &[], None).await.unwrap(); -// let batches = util::collect(stream).await.unwrap(); -// assert_eq!(1, batches.len()); -// assert_eq!(batches[0].df_recordbatch.num_columns(), 4); - -// let arrow_schema = batches[0].schema.arrow_schema(); -// assert_eq!(arrow_schema.fields().len(), 4); -// assert_eq!(arrow_schema.field(0).name(), "host"); -// assert_eq!(arrow_schema.field(1).name(), "ts"); -// assert_eq!(arrow_schema.field(2).name(), "cpu"); -// assert_eq!(arrow_schema.field(3).name(), "memory"); - -// let columns = batches[0].df_recordbatch.columns(); -// assert_eq!(4, columns.len()); -// assert_eq!(hosts.to_arrow_array(), columns[0]); -// assert_eq!(tss.to_arrow_array(), columns[1]); -// assert_eq!(cpus.to_arrow_array(), columns[2]); -// assert_eq!(memories.to_arrow_array(), columns[3]); -// } -// } +#[cfg(test)] +mod tests { + use common_recordbatch::util; + use datafusion_common::field_util::FieldExt; + use datafusion_common::field_util::SchemaExt; + use datatypes::vectors::*; + use table::requests::InsertRequest; + + use super::*; + use crate::table::test; + + #[tokio::test] + async fn test_create_table_insert_scan() { + let (_engine, table, schema, _dir) = test::setup_test_engine_and_table().await; + + assert_eq!(TableType::Base, table.table_type()); + assert_eq!(schema, table.schema()); + + let insert_req = InsertRequest { + table_name: "demo".to_string(), + columns_values: HashMap::default(), + }; + assert_eq!(0, table.insert(insert_req).await.unwrap()); + + let mut columns_values: HashMap = HashMap::with_capacity(4); + let hosts = StringVector::from(vec!["host1", "host2"]); + let cpus = Float64Vector::from_vec(vec![55.5, 66.6]); + let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]); + let tss = Int64Vector::from_vec(vec![1, 2]); + + columns_values.insert("host".to_string(), Arc::new(hosts.clone())); + columns_values.insert("cpu".to_string(), Arc::new(cpus.clone())); + columns_values.insert("memory".to_string(), Arc::new(memories.clone())); + columns_values.insert("ts".to_string(), 
Arc::new(tss.clone())); + + let insert_req = InsertRequest { + table_name: "demo".to_string(), + columns_values, + }; + assert_eq!(2, table.insert(insert_req).await.unwrap()); + + let stream = table.scan(&None, &[], None).await.unwrap(); + let batches = util::collect(stream).await.unwrap(); + assert_eq!(1, batches.len()); + assert_eq!(batches[0].df_recordbatch.num_columns(), 4); + + let arrow_schema = batches[0].schema.arrow_schema(); + assert_eq!(arrow_schema.fields().len(), 4); + assert_eq!(arrow_schema.field(0).name(), "host"); + assert_eq!(arrow_schema.field(1).name(), "ts"); + assert_eq!(arrow_schema.field(2).name(), "cpu"); + assert_eq!(arrow_schema.field(3).name(), "memory"); + + let columns = batches[0].df_recordbatch.columns(); + assert_eq!(4, columns.len()); + assert_eq!(hosts.to_arrow_array(), columns[0]); + assert_eq!(tss.to_arrow_array(), columns[1]); + assert_eq!(cpus.to_arrow_array(), columns[2]); + assert_eq!(memories.to_arrow_array(), columns[3]); + } +}
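
Note on the scan path added in PATCH 1/2: SnapshotImpl::scan builds one batch iterator per memtable (the mutable set first, then every immutable set) and simply chains them, so the resulting ChunkReaderImpl may return rows out of order and with duplicate keys until a proper merge/dedup reader exists. Below is a minimal standalone sketch of that chaining idea. It is illustrative only: the Batch struct and the Vec-backed "memtables" are simplified stand-ins, not the storage crate's real crate::memtable types.

// Sketch: chain per-memtable batch iterators into one stream, mirroring
// `batch_iters.into_iter().flatten()` in snapshot.rs.
// `Batch` here is a hypothetical simplified type, not crate::memtable::Batch.
#[derive(Debug)]
struct Batch {
    rows: Vec<i64>,
}

fn main() {
    // Pretend these are the batches yielded by two mutable memtables
    // and one immutable memtable.
    let mutable_a = vec![Batch { rows: vec![1, 2] }];
    let mutable_b = vec![Batch { rows: vec![3] }];
    let immutable = vec![Batch { rows: vec![4, 5] }];

    // One boxed iterator per memtable, like `batch_iters` in the patch.
    let batch_iters: Vec<Box<dyn Iterator<Item = Batch>>> = vec![
        Box::new(mutable_a.into_iter()),
        Box::new(mutable_b.into_iter()),
        Box::new(immutable.into_iter()),
    ];

    // Chain everything together; ordering and duplicate handling are
    // deliberately ignored, matching the comment in snapshot.rs.
    for batch in batch_iters.into_iter().flatten() {
        println!("{:?}", batch);
    }
}

As the comment in snapshot.rs says, this is only adequate for reads that can tolerate unsorted, possibly duplicated rows; a follow-up merge reader would still need to respect key order and the visible_sequence cut-off.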