Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support reading multiple memtables #93

Merged
merged 2 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 19 additions & 21 deletions src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,27 +152,25 @@ mod tests {
use super::*;
use crate::test_util;

// TODO(yingwen): [flush] Uncomment this once we support flush and scanning flushed data.

// #[tokio::test]
// async fn test_execute_insert() {
// let catalog_list = memory::new_memory_catalog_list().unwrap();
// let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
// let instance = Instance::new(&opts, catalog_list).await.unwrap();
// instance.start().await.unwrap();

// let output = instance
// .execute_sql(
// r#"insert into demo(host, cpu, memory, ts) values
// ('host1', 66.6, 1024, 1655276557000),
// ('host2', 88.8, 333.3, 1655276558000)
// "#,
// )
// .await
// .unwrap();

// assert!(matches!(output, Output::AffectedRows(2)));
// }
/// Executes an SQL insert through a freshly started `Instance` and verifies
/// that the reported output is the number of affected rows.
#[tokio::test]
async fn test_execute_insert() {
    // Build a datanode instance backed by an in-memory catalog and a temp dir.
    let catalog_list = memory::new_memory_catalog_list().unwrap();
    let (opts, _tmp_dir) = test_util::create_tmp_dir_and_datanode_opts();
    let instance = Instance::new(&opts, catalog_list).await.unwrap();
    instance.start().await.unwrap();

    // Insert two rows into the demo table.
    let output = instance
        .execute_sql(
            r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
        )
        .await
        .unwrap();

    // Two rows were inserted, so two rows must be reported as affected.
    assert!(matches!(output, Output::AffectedRows(2)));
}

#[tokio::test]
async fn test_execute_query() {
Expand Down
10 changes: 6 additions & 4 deletions src/storage/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use async_trait::async_trait;
use store_api::storage::{Chunk, ChunkReader, SchemaRef};

use crate::error::{Error, Result};
use crate::memtable::BatchIteratorPtr;
use crate::memtable::Batch;

/// Boxed iterator yielding [Batch]es read from memtables.
type IteratorPtr = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// [ChunkReader] implementation backed by memtable iterators.
pub struct ChunkReaderImpl {
    schema: SchemaRef,
    // Now we only read data from memtables, so we just hold the iterator here.
    iter: IteratorPtr,
}

#[async_trait]
Expand Down Expand Up @@ -35,7 +37,7 @@ impl ChunkReader for ChunkReaderImpl {
}

impl ChunkReaderImpl {
pub fn new(schema: SchemaRef, iter: BatchIteratorPtr) -> ChunkReaderImpl {
pub fn new(schema: SchemaRef, iter: IteratorPtr) -> ChunkReaderImpl {
ChunkReaderImpl { schema, iter }
}
}
2 changes: 1 addition & 1 deletion src/storage/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait Memtable: Send + Sync + std::fmt::Debug {
fn write(&self, kvs: &KeyValues) -> Result<()>;

/// Iterates the memtable.
// TODO(yingwen): Consider passing a projector (does column projection).
// TODO(yingwen): 1. Use reference of IterContext? 2. Consider passing a projector (does column projection).
fn iter(&self, ctx: IterContext) -> Result<BatchIteratorPtr>;

/// Returns the estimated bytes allocated by this memtable from heap.
Expand Down
22 changes: 21 additions & 1 deletion src/storage/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,28 @@ use crate::memtable::{MemtableId, MemtableRef};
pub struct MemtableVersion {
mutable: MemtableSet,
/// Immutable memtables.
immutables: Vec<Arc<MemtableSet>>,
immutables: Vec<MemtableSetRef>,
}

impl MemtableVersion {
pub fn new() -> MemtableVersion {
MemtableVersion::default()
}

/// Returns the set of mutable memtables.
#[inline]
pub fn mutable_memtables(&self) -> &MemtableSet {
    &self.mutable
}

/// Returns the list of immutable memtable sets.
#[inline]
pub fn immutable_memtables(&self) -> &[MemtableSetRef] {
    &self.immutables
}

/// Returns the total number of memtables, counting both the mutable set
/// and every immutable set.
pub fn num_memtables(&self) -> usize {
    self.immutables
        .iter()
        .fold(self.mutable.len(), |total, set| total + set.len())
}

/// Clone current memtable version and freeze its mutable memtables, which moves
/// all mutable memtables to immutable memtable list.
pub fn freeze_mutable(&self) -> MemtableVersion {
Expand Down Expand Up @@ -125,6 +135,8 @@ pub struct MemtableSet {
max_memtable_id: MemtableId,
}

/// Shared, reference-counted handle to a [MemtableSet].
pub type MemtableSetRef = Arc<MemtableSet>;

impl PartialEq for MemtableSet {
fn eq(&self, other: &MemtableSet) -> bool {
self.max_memtable_id == other.max_memtable_id
Expand Down Expand Up @@ -164,11 +176,13 @@ impl MemtableSet {
}

/// Returns the number of memtables in the set.
#[inline]
pub fn len(&self) -> usize {
    self.memtables.len()
}

/// Returns `true` if the set contains no memtables.
#[inline]
pub fn is_empty(&self) -> bool {
    self.memtables.is_empty()
}
Expand Down Expand Up @@ -330,12 +344,14 @@ mod tests {

let v1 = MemtableVersion::new();
assert!(v1.mutable_memtables().is_empty());
assert_eq!(0, v1.num_memtables());

// Add one mutable
let v2 = v1.add_mutable(s1.clone());
assert_ne!(v1, v2);
let mutables = v2.mutable_memtables();
assert_eq!(s1, *mutables);
assert_eq!(3, v2.num_memtables());

// Add another mutable
let v3 = v2.add_mutable(s2);
Expand All @@ -344,6 +360,7 @@ mod tests {
let mutables = v3.mutable_memtables();
assert_eq!(s3, *mutables);
assert!(v3.memtables_to_flush().1.is_empty());
assert_eq!(7, v3.num_memtables());

// Try to freeze s1, s2
let v4 = v3.freeze_mutable();
Expand All @@ -357,6 +374,7 @@ mod tests {
let (max_id, tables) = v4.memtables_to_flush();
assert_eq!(6, max_id.unwrap());
assert_eq!(7, tables.len());
assert_eq!(7, v4.num_memtables());

// Add another mutable
let s4 = create_test_memtableset(&[7, 8]);
Expand All @@ -374,6 +392,7 @@ mod tests {
let (max_id, tables) = v6.memtables_to_flush();
assert_eq!(8, max_id.unwrap());
assert_eq!(9, tables.len());
assert_eq!(9, v6.num_memtables());
// verify tables
for (i, table) in tables.iter().enumerate() {
assert_eq!(i as u32, table.memtable.id());
Expand All @@ -391,5 +410,6 @@ mod tests {

let v8 = v7.remove_immutables(8);
assert_eq!(v8.immutables.len(), 0);
assert_eq!(0, v8.num_memtables());
}
}
7 changes: 5 additions & 2 deletions src/storage/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,13 @@ impl<S> RegionImpl<S> {

RegionImpl { inner }
}
}

// Private methods for tests.
#[cfg(test)]
impl<S> RegionImpl<S> {
    /// Returns the committed sequence number tracked by the region's
    /// version control (test helper).
    #[inline]
    fn committed_sequence(&self) -> store_api::storage::SequenceNumber {
        self.inner.version_control().committed_sequence()
    }
}
Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/region/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Region tests.

// TODO(yingwen): [flush] Uncomment this once we support flush and scanning flushed data.
// mod read_write;
mod read_write;

use datatypes::type_id::LogicalTypeId;
use log_store::fs::noop::NoopLogStore;
Expand Down
39 changes: 34 additions & 5 deletions src/storage/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use store_api::storage::{

use crate::chunk::ChunkReaderImpl;
use crate::error::{Error, Result};
use crate::memtable::IterContext;
use crate::version::VersionRef;

/// [Snapshot] implementation.
Expand All @@ -28,14 +29,42 @@ impl Snapshot for SnapshotImpl {

/// Scans all mutable and immutable memtables visible to this snapshot and
/// returns a reader over their chained batch iterators.
async fn scan(
    &self,
    ctx: &ReadContext,
    request: ScanRequest,
) -> Result<ScanResponse<ChunkReaderImpl>> {
    // Only data written under this sequence is visible to the scan.
    let visible_sequence = self.sequence_to_read(request.sequence);
    let memtable_version = self.version.memtables();

    let mutables = memtable_version.mutable_memtables();
    let immutables = memtable_version.immutable_memtables();
    // Reserve one slot per memtable so pushes below do not reallocate.
    let mut batch_iters = Vec::with_capacity(memtable_version.num_memtables());

    let iter_ctx = IterContext {
        batch_size: ctx.batch_size,
        visible_sequence,
        ..Default::default()
    };

    for (_range, mem) in mutables.iter() {
        let iter = mem.iter(iter_ctx.clone())?;

        batch_iters.push(iter);
    }

    for mem_set in immutables {
        for (_range, mem) in mem_set.iter() {
            let iter = mem.iter(iter_ctx.clone())?;

            batch_iters.push(iter);
        }
    }

    // Now we just simply chain all iterators together, ignore duplications/ordering.
    let iter = Box::new(batch_iters.into_iter().flatten());

    let reader = ChunkReaderImpl::new(self.version.schema().clone(), iter);

    Ok(ScanResponse { reader })
}

async fn get(&self, _ctx: &ReadContext, _request: GetRequest) -> Result<GetResponse> {
Expand Down
124 changes: 61 additions & 63 deletions src/table-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,66 +182,64 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
}
}

// TODO(yingwen): [flush] Uncomment this once we supports flush and scanning flushed data.

// #[cfg(test)]
// mod tests {
// use common_recordbatch::util;
// use datafusion_common::field_util::FieldExt;
// use datafusion_common::field_util::SchemaExt;
// use datatypes::vectors::*;
// use table::requests::InsertRequest;

// use super::*;
// use crate::table::test;

// #[tokio::test]
// async fn test_create_table_insert_scan() {
// let (_engine, table, schema, _dir) = test::setup_test_engine_and_table().await;

// assert_eq!(TableType::Base, table.table_type());
// assert_eq!(schema, table.schema());

// let insert_req = InsertRequest {
// table_name: "demo".to_string(),
// columns_values: HashMap::default(),
// };
// assert_eq!(0, table.insert(insert_req).await.unwrap());

// let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
// let hosts = StringVector::from(vec!["host1", "host2"]);
// let cpus = Float64Vector::from_vec(vec![55.5, 66.6]);
// let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]);
// let tss = Int64Vector::from_vec(vec![1, 2]);

// columns_values.insert("host".to_string(), Arc::new(hosts.clone()));
// columns_values.insert("cpu".to_string(), Arc::new(cpus.clone()));
// columns_values.insert("memory".to_string(), Arc::new(memories.clone()));
// columns_values.insert("ts".to_string(), Arc::new(tss.clone()));

// let insert_req = InsertRequest {
// table_name: "demo".to_string(),
// columns_values,
// };
// assert_eq!(2, table.insert(insert_req).await.unwrap());

// let stream = table.scan(&None, &[], None).await.unwrap();
// let batches = util::collect(stream).await.unwrap();
// assert_eq!(1, batches.len());
// assert_eq!(batches[0].df_recordbatch.num_columns(), 4);

// let arrow_schema = batches[0].schema.arrow_schema();
// assert_eq!(arrow_schema.fields().len(), 4);
// assert_eq!(arrow_schema.field(0).name(), "host");
// assert_eq!(arrow_schema.field(1).name(), "ts");
// assert_eq!(arrow_schema.field(2).name(), "cpu");
// assert_eq!(arrow_schema.field(3).name(), "memory");

// let columns = batches[0].df_recordbatch.columns();
// assert_eq!(4, columns.len());
// assert_eq!(hosts.to_arrow_array(), columns[0]);
// assert_eq!(tss.to_arrow_array(), columns[1]);
// assert_eq!(cpus.to_arrow_array(), columns[2]);
// assert_eq!(memories.to_arrow_array(), columns[3]);
// }
// }
#[cfg(test)]
mod tests {
    use common_recordbatch::util;
    use datafusion_common::field_util::FieldExt;
    use datafusion_common::field_util::SchemaExt;
    use datatypes::vectors::*;
    use table::requests::InsertRequest;

    use super::*;
    use crate::table::test;

    /// End-to-end check: create a table, insert rows, then scan them back
    /// and verify schema and column data.
    #[tokio::test]
    async fn test_create_table_insert_scan() {
        let (_engine, table, schema, _dir) = test::setup_test_engine_and_table().await;

        assert_eq!(TableType::Base, table.table_type());
        assert_eq!(schema, table.schema());

        // Inserting an empty request must report zero affected rows.
        let insert_req = InsertRequest {
            table_name: "demo".to_string(),
            columns_values: HashMap::default(),
        };
        assert_eq!(0, table.insert(insert_req).await.unwrap());

        // Build column vectors for two rows of the demo table.
        let mut columns_values: HashMap<String, VectorRef> = HashMap::with_capacity(4);
        let hosts = StringVector::from(vec!["host1", "host2"]);
        let cpus = Float64Vector::from_vec(vec![55.5, 66.6]);
        let memories = Float64Vector::from_vec(vec![1024f64, 4096f64]);
        let tss = Int64Vector::from_vec(vec![1, 2]);

        columns_values.insert("host".to_string(), Arc::new(hosts.clone()));
        columns_values.insert("cpu".to_string(), Arc::new(cpus.clone()));
        columns_values.insert("memory".to_string(), Arc::new(memories.clone()));
        columns_values.insert("ts".to_string(), Arc::new(tss.clone()));

        let insert_req = InsertRequest {
            table_name: "demo".to_string(),
            columns_values,
        };
        assert_eq!(2, table.insert(insert_req).await.unwrap());

        // Scan everything back; the two rows should arrive in one batch.
        let stream = table.scan(&None, &[], None).await.unwrap();
        let batches = util::collect(stream).await.unwrap();
        assert_eq!(1, batches.len());
        assert_eq!(batches[0].df_recordbatch.num_columns(), 4);

        // Verify the arrow schema field order: host, ts, cpu, memory.
        let arrow_schema = batches[0].schema.arrow_schema();
        assert_eq!(arrow_schema.fields().len(), 4);
        assert_eq!(arrow_schema.field(0).name(), "host");
        assert_eq!(arrow_schema.field(1).name(), "ts");
        assert_eq!(arrow_schema.field(2).name(), "cpu");
        assert_eq!(arrow_schema.field(3).name(), "memory");

        // Verify the column data matches what was inserted, in schema order.
        let columns = batches[0].df_recordbatch.columns();
        assert_eq!(4, columns.len());
        assert_eq!(hosts.to_arrow_array(), columns[0]);
        assert_eq!(tss.to_arrow_array(), columns[1]);
        assert_eq!(cpus.to_arrow_array(), columns[2]);
        assert_eq!(memories.to_arrow_array(), columns[3]);
    }
}