Skip to content

Commit

Permalink
fix(cubestore): Fix write metastore locking
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jan 2, 2021
1 parent 139c8f6 commit cbbacce
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions rust/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::CubeError;
use arrow::datatypes::TimeUnit::Microsecond;
use arrow::datatypes::{DataType, Field};
use chunks::ChunkRocksTable;
use core::fmt;
use core::{fmt, mem};
use futures::future::join_all;
use index::{IndexRocksIndex, IndexRocksTable};
use itertools::Itertools;
Expand Down Expand Up @@ -1110,6 +1110,7 @@ trait RocksTable: Debug + Send + Sync {
.get(seq_key.to_bytes())?
.map(|v| Cursor::new(v).read_u64::<BigEndian>().unwrap());

// TODO revert back merge operator if locking works
let next_seq = self
.mem_seq()
.next_seq(self.table_id(), before_merge.unwrap_or(0))?;
Expand Down Expand Up @@ -1520,17 +1521,18 @@ impl RocksMetaStore {
+ 'static,
R: Send + 'static,
{
let db = self.db.write().await.clone();
let db = self.db.write().await;
let mem_seq = MemorySequence {
seq_store: self.seq_store.clone(),
};
let db_to_send = db.clone();
let (spawn_res, events) =
tokio::task::spawn_blocking(move || -> Result<(R, Vec<MetaStoreEvent>), CubeError> {
let mut batch = BatchPipe::new(db.as_ref());
let snapshot = db.snapshot();
let mut batch = BatchPipe::new(db_to_send.as_ref());
let snapshot = db_to_send.snapshot();
let res = f(
DbTableRef {
db: db.as_ref(),
db: db_to_send.as_ref(),
snapshot: &snapshot,
mem_seq,
},
Expand All @@ -1541,6 +1543,8 @@ impl RocksMetaStore {
})
.await??;

mem::drop(db);

self.write_notify.notify();

for listener in self.listeners.read().await.clone().iter_mut() {
Expand Down Expand Up @@ -1733,20 +1737,23 @@ impl RocksMetaStore {
F: for<'a> FnOnce(DbTableRef<'a>) -> R + Send + 'static,
R: Send + 'static,
{
let db = self.db.read().await.clone();
let db = self.db.read().await;
let mem_seq = MemorySequence {
seq_store: self.seq_store.clone(),
};
tokio::task::spawn_blocking(move || {
let snapshot = db.snapshot();
let db_to_send = db.clone();
let res = tokio::task::spawn_blocking(move || {
let snapshot = db_to_send.snapshot();
f(DbTableRef {
db: db.as_ref(),
db: db_to_send.as_ref(),
snapshot: &snapshot,
mem_seq,
})
})
.await
.unwrap()
}).await.unwrap();

mem::drop(db);

res
}

fn check_if_exists(name: &String, existing_keys_len: usize) -> Result<(), CubeError> {
Expand Down

0 comments on commit cbbacce

Please sign in to comment.