diff --git a/Cargo.lock b/Cargo.lock
index 7956ddcc8c9..bc787ab5b6c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3254,6 +3254,7 @@ dependencies = [
  "store-api",
  "tempdir",
  "tokio",
+ "uuid",
 ]
 
 [[package]]
@@ -3926,9 +3927,9 @@ dependencies = [
 
 [[package]]
 name = "uuid"
-version = "1.0.0"
+version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8cfcd319456c4d6ea10087ed423473267e1a071f3bc0aa89f80d60997843c6f0"
+checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f"
 dependencies = [
  "getrandom",
 ]
diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml
index 7ec93949ad3..51eaf182dcb 100644
--- a/src/storage/Cargo.toml
+++ b/src/storage/Cargo.toml
@@ -24,6 +24,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
 store-api = { path = "../store-api" }
 regex = "1.5"
 tokio = { version = "1.18", features = ["full"] }
+uuid = { version = "1.1", features = ["v4"] }
 
 [dev-dependencies]
 tempdir = "0.3"
diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs
index 1e3c03d788e..355d4586566 100644
--- a/src/storage/src/error.rs
+++ b/src/storage/src/error.rs
@@ -118,6 +118,9 @@ pub enum Error {
 
     #[snafu(display("Invalid timestamp in write batch, source: {}", source))]
     InvalidTimestamp { source: crate::write_batch::Error },
+
+    #[snafu(display("Task already cancelled"))]
+    Cancelled { backtrace: Backtrace },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -133,9 +136,11 @@ impl ErrorExt for Error {
             | BatchMissingColumn { .. }
             | InvalidTimestamp { .. } => StatusCode::InvalidArguments,
 
-            Utf8 { .. } | EncodeJson { .. } | DecodeJson { .. } | JoinTask { .. } => {
-                StatusCode::Unexpected
-            }
+            Utf8 { .. }
+            | EncodeJson { .. }
+            | DecodeJson { .. }
+            | JoinTask { .. }
+            | Cancelled { .. } => StatusCode::Unexpected,
 
             FlushIo { .. }
             | InitBackend { .. }
diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs
index 804ad64a7cb..00fb4831d9c 100644
--- a/src/storage/src/flush.rs
+++ b/src/storage/src/flush.rs
@@ -6,14 +6,15 @@ use common_time::RangeMillis;
 use store_api::manifest::Manifest;
 use store_api::manifest::ManifestVersion;
 use store_api::storage::SequenceNumber;
+use uuid::Uuid;
 
 use crate::background::{Context, Job, JobHandle, JobPoolRef};
-use crate::error::Result;
+use crate::error::{CancelledSnafu, Result};
 use crate::manifest::action::*;
-use crate::memtable::MemtableRef;
+use crate::memtable::{IterContext, MemtableRef};
 use crate::region::RegionWriterRef;
 use crate::region::SharedDataRef;
-use crate::sst::{AccessLayerRef, FileMeta};
+use crate::sst::{AccessLayerRef, FileMeta, WriteOptions};
 use crate::version::VersionEdit;
 
 /// Default write buffer size (32M).
@@ -148,12 +149,40 @@ pub struct FlushJob {
 impl FlushJob {
     async fn write_memtables_to_layer(&self, ctx: &Context) -> Result<Vec<FileMeta>> {
         if ctx.is_cancelled() {
-            // TODO(yingwen): [flush] Returns an cancelled error.
-            unimplemented!();
+            return CancelledSnafu {}.fail();
         }
 
-        // TODO(yingwen): [flush] Flush memtables to sst layer.
-        unimplemented!()
+        let mut futures = Vec::with_capacity(self.memtables.len());
+        for m in &self.memtables {
+            let file_name = Self::generate_sst_file_name();
+            // TODO(hl): Check if random file name already exists in meta.
+
+            let iter_ctx = IterContext {
+                for_flush: true,
+                ..Default::default()
+            };
+
+            let iter = m.memtable.iter(iter_ctx)?;
+            futures.push(async move {
+                self.sst_layer
+                    .write_sst(&file_name, iter, WriteOptions::default())
+                    .await
+            });
+        }
+
+        let metas = futures_util::future::join_all(futures)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .map(|f| FileMeta {
+                file_path: f,
+                level: 0,
+            })
+            .collect();
+
+        logging::info!("Successfully flushed memtables to files: {:?}", metas);
+        Ok(metas)
     }
 
     async fn write_to_manifest(&self, file_metas: &[FileMeta]) -> Result<ManifestVersion> {
@@ -169,6 +198,11 @@ impl FlushJob {
             .update(RegionMetaAction::Edit(edit))
             .await
     }
+
+    /// Generates a random SST file name matching `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}\.parquet$`.
+    fn generate_sst_file_name() -> String {
+        format!("{}.parquet", Uuid::new_v4().hyphenated())
+    }
 }
 
 #[async_trait]
@@ -193,6 +227,8 @@ impl Job for FlushJob {
 
 #[cfg(test)]
 mod tests {
+    use regex::Regex;
+
     use super::*;
 
     #[test]
@@ -201,4 +237,15 @@
         assert_eq!(8, get_mutable_limitation(10));
         assert_eq!(56, get_mutable_limitation(64));
     }
+
+    #[test]
+    pub fn test_uuid_generate() {
+        let file_name = FlushJob::generate_sst_file_name();
+        let regex = Regex::new(r"^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}\.parquet$").unwrap();
+        assert!(
+            regex.is_match(&file_name),
+            "illegal sst file name: {}",
+            file_name
+        );
+    }
 }
diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs
index 6facda935d0..35fb190e992 100644
--- a/src/storage/src/sst.rs
+++ b/src/storage/src/sst.rs
@@ -122,12 +122,13 @@ pub struct WriteOptions {
 
 /// Sst access layer.
 #[async_trait]
 pub trait AccessLayer: Send + Sync {
+    // Writes an SST file with the given name and returns the full path.
     async fn write_sst(
         &self,
         file_name: &str,
         iter: BatchIteratorPtr,
         opts: WriteOptions,
-    ) -> Result<()>;
+    ) -> Result<String>;
 }
 
 pub type AccessLayerRef = Arc<dyn AccessLayer>;
@@ -159,15 +160,13 @@ impl AccessLayer for FsAccessLayer {
         file_name: &str,
         iter: BatchIteratorPtr,
         opts: WriteOptions,
-    ) -> Result<()> {
+    ) -> Result<String> {
         // Now we only support the parquet format. We may allow the caller to specify
         // the sst format in WriteOptions in the future.
-        let writer = ParquetWriter::new(
-            &self.sst_file_path(file_name),
-            iter,
-            self.object_store.clone(),
-        );
+        let file_path = self.sst_file_path(file_name);
+        let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone());
 
-        writer.write_sst(opts).await
+        writer.write_sst(opts).await?;
+        Ok(file_path)
     }
 }
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index ffd7b095654..242be108d41 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -20,20 +20,20 @@ use crate::metadata::ColumnMetadata;
 use crate::sst;
 
 /// Parquet sst writer.
-pub struct ParquetWriter {
-    file_name: String,
+pub struct ParquetWriter<'a> {
+    file_name: &'a str,
     iter: BatchIteratorPtr,
     object_store: ObjectStore,
 }
 
-impl ParquetWriter {
+impl<'a> ParquetWriter<'a> {
     pub fn new(
-        file_name: &str,
+        file_name: &'a str,
         iter: BatchIteratorPtr,
         object_store: ObjectStore,
     ) -> ParquetWriter {
         ParquetWriter {
-            file_name: file_name.to_string(),
+            file_name,
             iter,
             object_store,
         }
     }
@@ -48,7 +48,7 @@ impl ParquetWriter {
     /// in config will be written to a single row group.
     async fn write_rows(mut self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
         let schema = memtable_schema_to_arrow_schema(self.iter.schema());
-        let object = self.object_store.object(&self.file_name);
+        let object = self.object_store.object(self.file_name);
 
         // FIXME(hl): writer size is not used in fs backend so just leave it to 0,
         // but in s3/azblob backend the Content-Length field of HTTP request is set
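
The cancellation path in `write_memtables_to_layer` returns an error through snafu's generated context selector. As a rough standalone sketch (not part of the patch; it assumes snafu 0.7 with the `backtraces` feature, matching this crate's Cargo.toml), the `Cancelled` variant and `CancelledSnafu {}.fail()` fit together like this:

```rust
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    // Mirrors the new variant in src/storage/src/error.rs.
    #[snafu(display("Task already cancelled"))]
    Cancelled { backtrace: Backtrace },
}

pub type Result<T> = std::result::Result<T, Error>;

// `CancelledSnafu` is the context selector snafu derives for `Cancelled`;
// `.fail()` builds the error (capturing a backtrace) and returns it as `Err`.
fn check_cancelled(cancelled: bool) -> Result<()> {
    if cancelled {
        return CancelledSnafu {}.fail();
    }
    Ok(())
}

fn main() {
    assert!(check_cancelled(true).is_err());
    assert!(check_cancelled(false).is_ok());
}
```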
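The flush loop fans out one `write_sst` future per memtable and awaits them together: `join_all` yields a `Vec<Result<_>>`, and collecting that into `Result<Vec<_>>` short-circuits on the first failed write. A minimal sketch of the same fan-out/collect pattern, with a stub (`write_one` and the paths are illustrative only) in place of the real access layer:

```rust
use futures_util::future::join_all;

// Stand-in for AccessLayer::write_sst: returns the full path it "wrote".
async fn write_one(file_name: &str) -> Result<String, String> {
    Ok(format!("/region/sst/{}", file_name))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let names = ["a.parquet", "b.parquet", "c.parquet"];
    let futures: Vec<_> = names.into_iter().map(write_one).collect();

    // Vec<Result<String, _>> -> Result<Vec<String>, _>: Err if any write failed.
    let paths = join_all(futures)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;

    assert_eq!(3, paths.len());
    Ok(())
}
```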
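The new SST naming scheme is simply a hyphenated v4 UUID plus a `.parquet` suffix, which the unit test validates against the documented pattern. A self-contained sketch of the scheme (it assumes only the `uuid` crate with the `v4` feature and `regex`, both dependencies this change already uses):

```rust
use regex::Regex;
use uuid::Uuid;

// Same scheme as FlushJob::generate_sst_file_name: `<hyphenated-v4-uuid>.parquet`.
fn generate_sst_file_name() -> String {
    format!("{}.parquet", Uuid::new_v4().hyphenated())
}

fn main() {
    let name = generate_sst_file_name();
    // Note the escaped dot; an unescaped `.` would match any character.
    let re = Regex::new(r"^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}\.parquet$").unwrap();
    assert!(re.is_match(&name), "illegal sst file name: {}", name);
    println!("{}", name); // e.g. 67e55044-10b1-426f-9247-bb680e5fe0c8.parquet
}
```

Randomized names avoid coordination when several flush jobs write concurrently; the remaining TODO in the loop notes that collision with an existing file name in the meta still needs a check.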