Skip to content

Commit

Permalink
feat: Implement write stall (#90)
Browse files Browse the repository at this point in the history
* feat: Implement write stall

* chore: Update comments
  • Loading branch information
evenyag committed Jul 18, 2022
1 parent e62fa89 commit 259d20e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 27 deletions.
9 changes: 9 additions & 0 deletions src/storage/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@ impl Context {
Context::default()
}

/// Marks this context as cancelled.
///
/// Job accessing this context should check `is_cancelled()` and exit if it
/// returns true.
pub fn cancel(&self) {
self.inner.cancelled.store(false, Ordering::Relaxed);
}

/// Returns true if this context is cancelled.
pub fn is_cancelled(&self) -> bool {
self.inner.cancelled.load(Ordering::Relaxed)
}
Expand All @@ -41,12 +46,16 @@ pub struct JobHandle {
}

impl JobHandle {
/// Waits until this background job is finished.
pub async fn join(self) -> Result<()> {
self.handle.await.context(error::JoinTaskSnafu)?
}

/// Cancels this background job gracefully and waits until it exits.
#[allow(unused)]
pub async fn cancel(self) -> Result<()> {
// Tokio also provides an [`abort()`](https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.abort)
// method to abort current task, consider using it if we need to abort a background job.
self.ctx.cancel();

self.join().await
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::error::Result;
use crate::memtable::btree::BTreeMemtable;
pub use crate::memtable::inserter::Inserter;
pub use crate::memtable::schema::MemtableSchema;
pub use crate::memtable::version::{FreezeError, MemtableSet, MemtableVersion};
pub use crate::memtable::version::{MemtableSet, MemtableVersion};

/// Unique id for memtables under same region.
pub type MemtableId = u32;
Expand Down
21 changes: 7 additions & 14 deletions src/storage/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use common_time::RangeMillis;
use snafu::Snafu;

use crate::flush::MemtableWithMeta;
use crate::memtable::{MemtableId, MemtableRef};

#[derive(Debug, Snafu)]
#[snafu(display("Failed to freeze mutable memtable, immutable memtable already exists"))]
pub struct FreezeError;

/// A version of all memtables.
///
/// This structure is immutable now.
Expand All @@ -31,18 +26,16 @@ impl MemtableVersion {
&self.mutable
}

/// Clone current `MemtableVersion`, try to freeze mutable memtables in the new
/// version then returns that version.
///
/// Returns `Err` if immutable memtables already exists.
pub fn try_freeze_mutable(&self) -> Result<MemtableVersion, FreezeError> {
/// Clone current memtable version and freeze its mutable memtables, which moves
/// all mutable memtables to immutable memtable list.
pub fn freeze_mutable(&self) -> MemtableVersion {
let mut immutables = self.immutables.clone();
immutables.push(Arc::new(self.mutable.clone()));

Ok(MemtableVersion {
MemtableVersion {
mutable: MemtableSet::new(),
immutables,
})
}
}

pub fn mutable_bytes_allocated(&self) -> usize {
Expand Down Expand Up @@ -353,7 +346,7 @@ mod tests {
assert!(v3.memtables_to_flush().1.is_empty());

// Try to freeze s1, s2
let v4 = v3.try_freeze_mutable().unwrap();
let v4 = v3.freeze_mutable();
assert_ne!(v1, v4);
assert_ne!(v2, v4);
assert_ne!(v3, v4);
Expand All @@ -373,7 +366,7 @@ mod tests {
assert_eq!(v4.immutables, v5.immutables);

// Try to freeze s4
let v6 = v5.try_freeze_mutable().unwrap();
let v6 = v5.freeze_mutable();
assert_eq!(v6.immutables.len(), 2);
assert_eq!(v6.immutables[0], Arc::new(s3));
assert_eq!(v6.immutables[1], Arc::new(s4.clone()));
Expand Down
22 changes: 16 additions & 6 deletions src/storage/src/region/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,24 @@ impl WriterInner {
manifest: &RegionManifest,
) -> Result<()> {
let version_control = &shared.version_control;
if version_control.try_freeze_mutable().is_err() {
// TODO(yingwen): [flush] Write stall, wait for last flush.
unimplemented!()
// Freeze all mutable memtables so we can flush them later.
version_control.freeze_mutable();

if let Some(flush_handle) = self.flush_handle.take() {
// Previous flush job is incomplete, wait util it is finished (write stall).
// However the last flush job may fail, in which case, we just return error
// and abort current write request. The flush handle is left empty, so the next
// time we still have chance to trigger a new flush.
flush_handle.join().await.map_err(|e| {
logging::error!(
"Previous flush job failed, region: {}, err: {}",
shared.name,
e
);
e
})?;
}

// TODO(yingwen): [flush] Flush may fail, so we need to flush both old and new immutable memtables.
assert!(self.flush_handle.is_none());

let current_version = version_control.current();
let (max_memtable_id, mem_to_flush) = current_version.memtables().memtables_to_flush();

Expand Down
10 changes: 4 additions & 6 deletions src/storage/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Duration;
use store_api::manifest::ManifestVersion;
use store_api::storage::{SchemaRef, SequenceNumber};

use crate::memtable::{FreezeError, MemtableId, MemtableSchema, MemtableSet, MemtableVersion};
use crate::memtable::{MemtableId, MemtableSchema, MemtableSet, MemtableVersion};
use crate::metadata::{RegionMetadata, RegionMetadataRef};
use crate::sst::LevelMetas;
use crate::sst::{FileHandle, FileMeta};
Expand Down Expand Up @@ -84,17 +84,15 @@ impl VersionControl {
version_to_update.commit();
}

/// Try to freeze mutable memtables.
pub fn try_freeze_mutable(&self) -> Result<(), FreezeError> {
/// Freeze all mutable memtables.
pub fn freeze_mutable(&self) {
let mut version_to_update = self.version.lock();

let memtable_version = version_to_update.memtables();
let freezed = memtable_version.try_freeze_mutable()?;
let freezed = memtable_version.freeze_mutable();
version_to_update.memtables = Arc::new(freezed);

version_to_update.commit();

Ok(())
}

pub fn apply_edit(&self, edit: VersionEdit) {
Expand Down

0 comments on commit 259d20e

Please sign in to comment.