feat: flush job (#80)
* feat: flush job

* fix cr comments

* move file name instead of clone
v0y4g3r committed Jul 14, 2022
1 parent 270c1a5 commit 46cd39e
Showing 6 changed files with 79 additions and 26 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/storage/Cargo.toml
@@ -24,6 +24,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
store-api = { path = "../store-api" }
regex = "1.5"
tokio = { version = "1.18", features = ["full"] }
+uuid = { version = "1.1", features = ["v4"] }

[dev-dependencies]
tempdir = "0.3"
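
The new `uuid` dependency's `v4` feature is what backs the random SST file names introduced in flush.rs below. A minimal sketch of what it provides, assuming uuid 1.x:

```rust
use uuid::Uuid;

fn main() {
    // The `v4` feature enables randomly generated UUIDs; `hyphenated()`
    // renders the familiar 8-4-4-4-12 lowercase hex form.
    let id = Uuid::new_v4();
    println!("{}.parquet", id.hyphenated());
}
```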
11 changes: 8 additions & 3 deletions src/storage/src/error.rs
@@ -118,6 +118,9 @@ pub enum Error {

#[snafu(display("Invalid timestamp in write batch, source: {}", source))]
    InvalidTimestamp { source: crate::write_batch::Error },
+
+    #[snafu(display("Task already cancelled"))]
+    Cancelled { backtrace: Backtrace },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -133,9 +136,11 @@ impl ErrorExt for Error {
| BatchMissingColumn { .. }
| InvalidTimestamp { .. } => StatusCode::InvalidArguments,

-            Utf8 { .. } | EncodeJson { .. } | DecodeJson { .. } | JoinTask { .. } => {
-                StatusCode::Unexpected
-            }
+            Utf8 { .. }
+            | EncodeJson { .. }
+            | DecodeJson { .. }
+            | JoinTask { .. }
+            | Cancelled { .. } => StatusCode::Unexpected,

FlushIo { .. }
| InitBackend { .. }
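
For context, here is how the new `Cancelled` variant is raised and classified; a minimal sketch assuming snafu 0.7, whose derive generates a `CancelledSnafu` context selector (the same selector flush.rs imports below):

```rust
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Task already cancelled"))]
    Cancelled { backtrace: Backtrace },
}

// Returning early when a background job observes cancellation.
fn run_step(cancelled: bool) -> Result<(), Error> {
    if cancelled {
        // `fail()` constructs `Error::Cancelled` and captures a backtrace.
        return CancelledSnafu {}.fail();
    }
    Ok(())
}
```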
61 changes: 54 additions & 7 deletions src/storage/src/flush.rs
@@ -6,14 +6,15 @@ use common_time::RangeMillis;
use store_api::manifest::Manifest;
use store_api::manifest::ManifestVersion;
use store_api::storage::SequenceNumber;
+use uuid::Uuid;

use crate::background::{Context, Job, JobHandle, JobPoolRef};
-use crate::error::Result;
+use crate::error::{CancelledSnafu, Result};
use crate::manifest::action::*;
-use crate::memtable::MemtableRef;
+use crate::memtable::{IterContext, MemtableRef};
use crate::region::RegionWriterRef;
use crate::region::SharedDataRef;
-use crate::sst::{AccessLayerRef, FileMeta};
+use crate::sst::{AccessLayerRef, FileMeta, WriteOptions};
use crate::version::VersionEdit;

/// Default write buffer size (32M).
@@ -148,12 +149,40 @@ pub struct FlushJob {
impl FlushJob {
async fn write_memtables_to_layer(&self, ctx: &Context) -> Result<Vec<FileMeta>> {
if ctx.is_cancelled() {
-            // TODO(yingwen): [flush] Returns an cancelled error.
-            unimplemented!();
+            return CancelledSnafu {}.fail();
}

-        // TODO(yingwen): [flush] Flush memtables to sst layer.
-        unimplemented!()
+        let mut futures = Vec::with_capacity(self.memtables.len());
+        for m in &self.memtables {
+            let file_name = Self::generate_sst_file_name();
+            // TODO(hl): Check if random file name already exists in meta.
+
+            let iter_ctx = IterContext {
+                for_flush: true,
+                ..Default::default()
+            };
+
+            let iter = m.memtable.iter(iter_ctx)?;
+            futures.push(async move {
+                self.sst_layer
+                    .write_sst(&file_name, iter, WriteOptions::default())
+                    .await
+            });
+        }
+
+        let metas = futures_util::future::join_all(futures)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .map(|f| FileMeta {
+                file_path: f,
+                level: 0,
+            })
+            .collect();
+
+        logging::info!("Successfully flushed memtables to files: {:?}", metas);
+        Ok(metas)
}

async fn write_to_manifest(&self, file_metas: &[FileMeta]) -> Result<ManifestVersion> {
@@ -169,6 +198,11 @@ impl FlushJob {
.update(RegionMetaAction::Edit(edit))
.await
    }
+
+    /// Generates a random SST file name in the format `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}\.parquet$`.
+    fn generate_sst_file_name() -> String {
+        format!("{}.parquet", Uuid::new_v4().hyphenated())
+    }
}

#[async_trait]
@@ -193,6 +227,8 @@ impl Job for FlushJob {

#[cfg(test)]
mod tests {
+    use regex::Regex;
+
use super::*;

#[test]
@@ -201,4 +237,15 @@ mod tests {
assert_eq!(8, get_mutable_limitation(10));
assert_eq!(56, get_mutable_limitation(64));
    }
+
+    #[test]
+    pub fn test_uuid_generate() {
+        let file_name = FlushJob::generate_sst_file_name();
+        let regex = Regex::new(r"^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}\.parquet$").unwrap();
+        assert!(
+            regex.is_match(&file_name),
+            "illegal sst file name: {}",
+            file_name
+        );
+    }
}
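
The heart of the new `write_memtables_to_layer` is a fan-out/fan-in pattern: build one `write_sst` future per memtable, run them concurrently with `futures_util::future::join_all`, and let `collect::<Result<Vec<_>>>()` surface the first failure. A self-contained sketch of just that pattern, with stand-in types rather than the crate's real ones (assumes the futures-util and tokio crates):

```rust
use futures_util::future::join_all;

type Result<T> = std::result::Result<T, String>;

// Stand-in for flushing one memtable to an SST file; yields the file path.
async fn flush_one(file_name: String) -> Result<String> {
    Ok(format!("/data/sst/{file_name}"))
}

#[tokio::main]
async fn main() -> Result<()> {
    let names = vec!["a.parquet".to_string(), "b.parquet".to_string()];

    // Fan out: create the futures without awaiting them yet.
    let futures: Vec<_> = names.into_iter().map(flush_one).collect();

    // Fan in: await all of them, then short-circuit on the first error.
    let paths: Vec<String> = join_all(futures)
        .await
        .into_iter()
        .collect::<Result<Vec<_>>>()?;

    println!("flushed: {paths:?}");
    Ok(())
}
```

Note that `join_all` polls the futures concurrently within a single task, so the SST writes overlap in time even though nothing is spawned.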
15 changes: 7 additions & 8 deletions src/storage/src/sst.rs
@@ -122,12 +122,13 @@ pub struct WriteOptions {
/// Sst access layer.
#[async_trait]
pub trait AccessLayer: Send + Sync {
+    // Writes an SST file with the given name and returns its full path.
async fn write_sst(
&self,
file_name: &str,
iter: BatchIteratorPtr,
opts: WriteOptions,
-    ) -> Result<()>;
+    ) -> Result<String>;
}

pub type AccessLayerRef = Arc<dyn AccessLayer>;
@@ -159,15 +160,13 @@ impl AccessLayer for FsAccessLayer {
file_name: &str,
iter: BatchIteratorPtr,
opts: WriteOptions,
-    ) -> Result<()> {
+    ) -> Result<String> {
        // Now we only support the parquet format. We may allow the caller to specify
        // the sst format via WriteOptions in the future.
-        let writer = ParquetWriter::new(
-            &self.sst_file_path(file_name),
-            iter,
-            self.object_store.clone(),
-        );
+        let file_path = self.sst_file_path(file_name);
+        let writer = ParquetWriter::new(&file_path, iter, self.object_store.clone());

-        writer.write_sst(opts).await
+        writer.write_sst(opts).await?;
+        Ok(file_path)
}
}
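
Returning `Result<String>` means the flush job gets back the concrete path of each file it wrote instead of re-deriving it from the name. A simplified stand-in for the trait change (assuming the async-trait crate; the real trait also takes a batch iterator and `WriteOptions`):

```rust
use std::sync::Arc;

type Result<T> = std::result::Result<T, String>;

#[async_trait::async_trait]
pub trait AccessLayer: Send + Sync {
    // Writes an SST file with the given name and returns its full path.
    async fn write_sst(&self, file_name: &str) -> Result<String>;
}

struct FsAccessLayer {
    sst_dir: String,
}

#[async_trait::async_trait]
impl AccessLayer for FsAccessLayer {
    async fn write_sst(&self, file_name: &str) -> Result<String> {
        let file_path = format!("{}/{}", self.sst_dir, file_name);
        // ... the real implementation writes parquet data here ...
        Ok(file_path)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let layer: Arc<dyn AccessLayer> = Arc::new(FsAccessLayer {
        sst_dir: "/data/sst".to_string(),
    });
    let path = layer.write_sst("abc.parquet").await?;
    assert_eq!(path, "/data/sst/abc.parquet");
    Ok(())
}
```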
12 changes: 6 additions & 6 deletions src/storage/src/sst/parquet.rs
@@ -20,20 +20,20 @@ use crate::metadata::ColumnMetadata;
use crate::sst;

/// Parquet sst writer.
-pub struct ParquetWriter {
-    file_name: String,
+pub struct ParquetWriter<'a> {
+    file_name: &'a str,
iter: BatchIteratorPtr,
object_store: ObjectStore,
}

-impl ParquetWriter {
+impl<'a> ParquetWriter<'a> {
pub fn new(
-        file_name: &str,
+        file_name: &'a str,
iter: BatchIteratorPtr,
object_store: ObjectStore,
) -> ParquetWriter {
ParquetWriter {
-            file_name: file_name.to_string(),
+            file_name,
iter,
object_store,
}
@@ -48,7 +48,7 @@ impl ParquetWriter {
/// in config will be written to a single row group.
async fn write_rows(mut self, extra_meta: Option<HashMap<String, String>>) -> Result<()> {
let schema = memtable_schema_to_arrow_schema(self.iter.schema());
-        let object = self.object_store.object(&self.file_name);
+        let object = self.object_store.object(self.file_name);

// FIXME(hl): writer size is not used in fs backend so just leave it to 0,
// but in s3/azblob backend the Content-Length field of HTTP request is set
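
This is the "move file name instead of clone" item from the commit message: because `ParquetWriter` lives only for the duration of one `write_sst` call, it can borrow the name as `&'a str` instead of cloning it into an owned `String`. A minimal sketch of the owned-versus-borrowed trade-off:

```rust
// Owning variant: every construction allocates a fresh String.
#[allow(dead_code)]
struct OwnedWriter {
    file_name: String,
}

// Borrowing variant: holds a reference, so construction is allocation-free;
// the lifetime 'a guarantees the name outlives the writer.
struct BorrowedWriter<'a> {
    file_name: &'a str,
}

impl<'a> BorrowedWriter<'a> {
    fn new(file_name: &'a str) -> BorrowedWriter<'a> {
        BorrowedWriter { file_name }
    }
}

fn main() {
    let file_path = String::from("abc.parquet");
    let writer = BorrowedWriter::new(&file_path); // borrows, no copy
    println!("writing {}", writer.file_name);
}
```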
