fix(cubestore): skip WAL, partition data directly during ingestion (#2002)

On my machine, this produces a 1.8x improvement in ingestion speed. Wins in a
prod environment should be higher, since WAL uploads are more expensive.
ilya-biryukov committed Feb 8, 2021
1 parent 90f2365 commit 5442fad
Showing 8 changed files with 266 additions and 104 deletions.
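
At a high level, the change replaces the WAL hand-off with direct partitioning of each buffered data frame, bounding memory with a semaphore so only a few frames are in flight at once. A minimal, self-contained sketch of that pattern (the row type, batch sizes, and the partition_batch helper are illustrative stand-ins, not the actual cubestore API):

    use std::sync::Arc;
    use tokio::sync::Semaphore;
    use tokio::task::JoinHandle;

    // Illustrative stand-in for partitioning one buffered batch into chunks.
    async fn partition_batch(rows: Vec<u64>) -> Result<Vec<u64>, String> {
        Ok(vec![rows.len() as u64])
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // At most 4 data frames may be buffered and partitioned at once.
        let limits = Arc::new(Semaphore::new(4));
        let mut jobs: Vec<JoinHandle<Result<Vec<u64>, String>>> = Vec::new();

        for batch in (0..10u64).map(|i| vec![i; 1000]) {
            // Take a slot before reading the next frame into memory.
            let permit = limits.clone().acquire_owned().await.map_err(|e| e.to_string())?;
            jobs.push(tokio::spawn(async move {
                let chunks = partition_batch(batch).await?;
                drop(permit); // free the slot once the frame is partitioned
                // In the real change, this is where new chunks are activated in the metastore.
                Ok(chunks)
            }));
        }
        for job in jobs {
            job.await.map_err(|e| e.to_string())??;
        }
        Ok(())
    }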
8 changes: 8 additions & 0 deletions rust/cubestore/src/cluster/mod.rs
@@ -815,6 +815,14 @@ mod tests {

    #[async_trait]
    impl ChunkDataStore for MockChunkStore {
        async fn partition_data(
            &self,
            _table_id: u64,
            _data: DataFrame,
        ) -> Result<Vec<u64>, CubeError> {
            unimplemented!()
        }

        async fn partition(&self, _wal_id: u64) -> Result<(), CubeError> {
            unimplemented!()
        }
15 changes: 13 additions & 2 deletions rust/cubestore/src/config/mod.rs
@@ -1,4 +1,5 @@
use crate::cluster::{ClusterImpl, ClusterMetaStoreClient};
use crate::import::limits::ConcurrencyLimits;
use crate::import::ImportServiceImpl;
use crate::metastore::{MetaStore, MetaStoreRpcClient, RocksMetaStore};
use crate::queryplanner::query_executor::{QueryExecutor, QueryExecutorImpl};
@@ -153,6 +154,7 @@ pub struct ConfigObjImpl {
    pub download_concurrency: u64,
    pub connection_timeout: u64,
    pub server_name: String,
    pub max_ingestion_data_frames: usize,
}

impl ConfigObj for ConfigObjImpl {
@@ -304,6 +306,10 @@ impl Config {
            metastore_remote_address: env::var("CUBESTORE_META_ADDR").ok(),
            upload_concurrency: 4,
            download_concurrency: 8,
            max_ingestion_data_frames: env::var("CUBESTORE_MAX_DATA_FRAMES")
                .ok()
                .map(|v| v.parse::<usize>().unwrap())
                .unwrap_or(4),
            wal_split_threshold: env::var("CUBESTORE_WAL_SPLIT_THRESHOLD")
                .ok()
                .map(|v| v.parse::<u64>().unwrap())
@@ -345,6 +351,7 @@ impl Config {
            metastore_remote_address: None,
            upload_concurrency: 4,
            download_concurrency: 8,
            max_ingestion_data_frames: 4,
            wal_split_threshold: 262144,
            connection_timeout: 60,
            server_name: "localhost".to_string(),
@@ -526,10 +533,12 @@ impl Config {
            remote_fs.clone(),
            self.config_obj.clone(),
        );
        let concurrency_limits = ConcurrencyLimits::new(self.config_obj.max_ingestion_data_frames);
        let import_service = ImportServiceImpl::new(
            meta_store.clone(),
            wal_store.clone(),
            chunk_store.clone(),
            self.config_obj.clone(),
            concurrency_limits.clone(),
        );
        let query_planner = QueryPlannerImpl::new(meta_store.clone());
        let query_executor = Arc::new(QueryExecutorImpl);
@@ -548,10 +557,12 @@

        let sql_service = SqlServiceImpl::new(
            meta_store.clone(),
            wal_store.clone(),
            chunk_store.clone(),
            concurrency_limits,
            query_planner.clone(),
            query_executor.clone(),
            cluster.clone(),
            self.config_obj.wal_split_threshold as usize,
        );
        let scheduler = SchedulerImpl::new(
            meta_store.clone(),
39 changes: 39 additions & 0 deletions rust/cubestore/src/import/limits.rs
@@ -0,0 +1,39 @@
use crate::CubeError;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Used to limit memory usage on parallel operations. The object has reference semantics, i.e.
/// cloning will share the same underlying limits.
///
/// Example:
/// ```ignore
/// use cubestore::import::limits::ConcurrencyLimits;
/// async fn do_work(limits: &ConcurrencyLimits) -> Result<(), CubeError> {
///     // Run processing in parallel, but limit the number of active data frames.
///     loop {
///         let permit = limits.acquire_data_frame().await?;
///         // ... read data frame
///         tokio::spawn(async move {
///             // ... process data frame
///             std::mem::drop(permit);
///         });
///     }
/// }
/// ```
#[derive(Clone)]
pub struct ConcurrencyLimits {
    active_data_frames: Arc<Semaphore>,
}

impl ConcurrencyLimits {
    pub fn new(max_data_frames: usize) -> ConcurrencyLimits {
        assert!(max_data_frames >= 1, "max_data_frames must be at least 1");
        ConcurrencyLimits {
            active_data_frames: Arc::new(Semaphore::new(max_data_frames)),
        }
    }

    pub async fn acquire_data_frame(&self) -> Result<OwnedSemaphorePermit, CubeError> {
        Ok(self.active_data_frames.clone().acquire_owned().await?)
    }
}
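
A note on the reference semantics called out in the doc comment: clones contend on the same permits, since the Arc<Semaphore> is shared. A hypothetical sketch (error plumbing elided to the signature):

    use cubestore::import::limits::ConcurrencyLimits;
    use cubestore::CubeError;

    async fn demo() -> Result<(), CubeError> {
        let limits = ConcurrencyLimits::new(1);
        let clone = limits.clone();

        let permit = limits.acquire_data_frame().await?; // takes the only permit
        // `clone.acquire_data_frame().await` would block here until `permit` drops,
        // because both handles share the same underlying semaphore.
        drop(permit);
        let _second = clone.acquire_data_frame().await?; // now succeeds
        Ok(())
    }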
91 changes: 75 additions & 16 deletions rust/cubestore/src/import/mod.rs
@@ -1,8 +1,12 @@
pub mod limits;

use crate::config::ConfigObj;
use crate::metastore::is_valid_hll;
use crate::import::limits::ConcurrencyLimits;
use crate::metastore::table::Table;
use crate::metastore::{is_valid_hll, IdRow};
use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore};
use crate::sql::timestamp_from_string;
use crate::store::{DataFrame, WALDataStore};
use crate::store::{ChunkDataStore, DataFrame};
use crate::sys::malloc::trim_allocs;
use crate::table::{Row, TableValue};
use crate::CubeError;
@@ -22,6 +26,7 @@ use std::sync::Arc;
use tokio::fs::{File, OpenOptions};
use tokio::io;
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::LinesStream;

impl ImportFormat {
@@ -207,20 +212,23 @@ pub trait ImportService: Send + Sync {

pub struct ImportServiceImpl {
    meta_store: Arc<dyn MetaStore>,
    wal_store: Arc<dyn WALDataStore>,
    chunk_store: Arc<dyn ChunkDataStore>,
    config_obj: Arc<dyn ConfigObj>,
    limits: ConcurrencyLimits,
}

impl ImportServiceImpl {
    pub fn new(
        meta_store: Arc<dyn MetaStore>,
        wal_store: Arc<dyn WALDataStore>,
        chunk_store: Arc<dyn ChunkDataStore>,
        config_obj: Arc<dyn ConfigObj>,
        limits: ConcurrencyLimits,
    ) -> Arc<ImportServiceImpl> {
        Arc::new(ImportServiceImpl {
            meta_store,
            wal_store,
            chunk_store,
            config_obj,
            limits,
        })
    }
}
@@ -287,31 +295,82 @@ impl ImportService for ImportServiceImpl {

            defer!(trim_allocs());

            let mut ingestion = Ingestion::new(
                self.meta_store.clone(),
                self.chunk_store.clone(),
                self.limits.clone(),
                table.clone(),
            );
            let mut rows = Vec::new();
            while let Some(row) = row_stream.next().await {
                if let Some(row) = row? {
                    rows.push(row);
                    if rows.len() >= self.config_obj.wal_split_threshold() as usize {
                        let mut to_add = Vec::new();
                        mem::swap(&mut rows, &mut to_add);
                        self.wal_store
                            .add_wal(
                                table.clone(),
                                DataFrame::new(table.get_row().get_columns().clone(), to_add),
                            )
                            .await?;
                        ingestion.queue_data_frame(to_add).await?;
                    }
                }
            }

            mem::drop(temp_files);

            self.wal_store
                .add_wal(
                    table.clone(),
                    DataFrame::new(table.get_row().get_columns().clone(), rows),
                )
            ingestion.queue_data_frame(rows).await?;
            ingestion.wait_completion().await?;
        }

        Ok(())
    }
}

/// Handles row-based data ingestion, e.g. on CSV import and SQL insert.
pub struct Ingestion {
    meta_store: Arc<dyn MetaStore>,
    chunk_store: Arc<dyn ChunkDataStore>,
    limits: ConcurrencyLimits,
    table: IdRow<Table>,

    partition_jobs: Vec<JoinHandle<Result<(), CubeError>>>,
}

impl Ingestion {
    pub fn new(
        meta_store: Arc<dyn MetaStore>,
        chunk_store: Arc<dyn ChunkDataStore>,
        limits: ConcurrencyLimits,
        table: IdRow<Table>,
    ) -> Ingestion {
        Ingestion {
            meta_store,
            chunk_store,
            limits,
            table,
            partition_jobs: Vec::new(),
        }
    }

    pub async fn queue_data_frame(&mut self, rows: Vec<Row>) -> Result<(), CubeError> {
        let active_data_frame = self.limits.acquire_data_frame().await?;

        let meta_store = self.meta_store.clone();
        let chunk_store = self.chunk_store.clone();
        let columns = self.table.get_row().get_columns().clone();
        let table_id = self.table.get_id();
        self.partition_jobs.push(tokio::spawn(async move {
            let new_chunks = chunk_store
                .partition_data(table_id, DataFrame::new(columns, rows))
                .await?;
            // Release the concurrency slot as soon as partitioning finishes,
            // before the metastore write.
            std::mem::drop(active_data_frame);
            meta_store.activate_chunks(table_id, new_chunks).await?;
            Ok(())
        }));

        Ok(())
    }

    pub async fn wait_completion(self) -> Result<(), CubeError> {
        for j in self.partition_jobs {
            j.await??;
        }

        Ok(())
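
Taken together, a caller drives the new Ingestion helper roughly as below (a hedged sketch: the store handles and row batches arrive from elsewhere; only the Ingestion calls mirror the code above):

    use cubestore::import::{limits::ConcurrencyLimits, Ingestion};
    use cubestore::metastore::{table::Table, IdRow, MetaStore};
    use cubestore::store::ChunkDataStore;
    use cubestore::table::Row;
    use cubestore::CubeError;
    use std::sync::Arc;

    async fn ingest_batches(
        meta_store: Arc<dyn MetaStore>,
        chunk_store: Arc<dyn ChunkDataStore>,
        table: IdRow<Table>,
        batches: Vec<Vec<Row>>,
    ) -> Result<(), CubeError> {
        let limits = ConcurrencyLimits::new(4);
        let mut ingestion = Ingestion::new(meta_store, chunk_store, limits, table);
        for rows in batches {
            // Suspends when 4 data frames are already being partitioned.
            ingestion.queue_data_frame(rows).await?;
        }
        // Surfaces any partitioning or chunk-activation errors from the spawned jobs.
        ingestion.wait_completion().await
    }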
6 changes: 6 additions & 0 deletions rust/cubestore/src/lib.rs
@@ -348,3 +348,9 @@ impl From<tempfile::PathPersistError> for CubeError {
        return CubeError::from_error(v);
    }
}

impl From<tokio::sync::AcquireError> for CubeError {
    fn from(v: tokio::sync::AcquireError) -> Self {
        return CubeError::from_error(v);
    }
}
50 changes: 44 additions & 6 deletions rust/cubestore/src/metastore/mod.rs
Expand Up @@ -730,6 +730,11 @@ pub trait MetaStore: Send + Sync {
        uploaded_ids: Vec<u64>,
        index_count: u64,
    ) -> Result<(), CubeError>;
    async fn activate_chunks(
        &self,
        table_id: u64,
        uploaded_chunk_ids: Vec<u64>,
    ) -> Result<(), CubeError>;
    async fn is_chunk_used(&self, chunk_id: u64) -> Result<bool, CubeError>;
    async fn delete_chunk(&self, chunk_id: u64) -> Result<IdRow<Chunk>, CubeError>;
@@ -1979,6 +1984,21 @@ impl RocksMetaStore {

        Ok(chunks)
    }

    // Must be run under write_operation(). Returns activated row count.
    fn activate_chunks_impl(
        db_ref: DbTableRef,
        batch_pipe: &mut BatchPipe,
        uploaded_chunk_ids: &[u64],
    ) -> Result<u64, CubeError> {
        let table = ChunkRocksTable::new(db_ref.clone());
        let mut activated_row_count = 0;
        for id in uploaded_chunk_ids {
            activated_row_count += table.get_row_or_not_found(*id)?.get_row().get_row_count();
            table.update_with_fn(*id, |row| row.set_uploaded(true), batch_pipe)?;
        }
        return Ok(activated_row_count);
    }
}

#[async_trait]
@@ -2656,16 +2676,12 @@ impl MetaStore for RocksMetaStore {
        );
        self.write_operation(move |db_ref, batch_pipe| {
            let wal_table = WALRocksTable::new(db_ref.clone());
            let table = ChunkRocksTable::new(db_ref.clone());
            let mut activated_row_count = 0;

            let deactivated_row_count = wal_table.get_row_or_not_found(wal_id_to_delete)?.get_row().get_row_count();
            wal_table.delete(wal_id_to_delete, batch_pipe)?;

            for id in uploaded_ids.iter() {
                activated_row_count += table.get_row_or_not_found(*id)?.get_row().get_row_count();
                table.update_with_fn(*id, |row| row.set_uploaded(true), batch_pipe)?;
            }
            let activated_row_count = Self::activate_chunks_impl(db_ref, batch_pipe, &uploaded_ids)?;

            if activated_row_count != deactivated_row_count * index_count {
                return Err(CubeError::internal(format!(
                    "Deactivated WAL row count ({}) doesn't match activated row count ({}) during swap of ({}) to ({}) chunks",
@@ -2680,6 +2696,28 @@
        .await
    }

    async fn activate_chunks(
        &self,
        table_id: u64,
        uploaded_chunk_ids: Vec<u64>,
    ) -> Result<(), CubeError> {
        trace!(
            "Activating chunks ({})",
            uploaded_chunk_ids.iter().join(", ")
        );
        self.write_operation(move |db_ref, batch_pipe| {
            // Mark the table as having data, then flag each uploaded chunk as active.
            TableRocksTable::new(db_ref.clone()).update_with_fn(
                table_id,
                |t| t.update_has_data(true),
                batch_pipe,
            )?;
            Self::activate_chunks_impl(db_ref, batch_pipe, &uploaded_chunk_ids)?;
            Ok(())
        })
        .await?;
        Ok(())
    }

    async fn swap_chunks(
        &self,
        deactivate_ids: Vec<u64>,
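
One detail worth spelling out about the row-count check that swap_chunks keeps: each WAL row is materialized once per index, so deactivating a WAL of R rows on a table with N indexes must activate exactly R × N chunk rows. A toy illustration with made-up numbers:

    // Toy illustration of the swap_chunks row-count invariant; numbers are made up.
    fn check_swap_invariant() {
        let deactivated_row_count: u64 = 1_000; // rows in the WAL being deleted
        let index_count: u64 = 3;               // indexes defined on the table
        let activated_row_count: u64 = 3_000;   // rows across all newly active chunks
        assert_eq!(activated_row_count, deactivated_row_count * index_count);
    }

The new activate_chunks path has no WAL to compare against, so it performs no such check and discards the count returned by activate_chunks_impl.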
