feat(cubestore): Decimal partition pruning (#4089)
* feat(cubestore): Decimal partition pruning support

* chore(cubestore): CUBESTORE_DUMP_DIR env variable for debugging query dumps

* chore(cubestore): Do not fail on unsupported plan nodes
paveltiunov committed Feb 15, 2022
1 parent 0ba18b7 commit c00efad
Showing 6 changed files with 227 additions and 26 deletions.
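The core idea of the change: when a filter compares a decimal column against a literal, the literal is rescaled to the column's decimal scale and checked against each partition's min/max statistics, so partitions that cannot match are skipped entirely. A minimal standalone sketch of that idea follows (`PartitionStats` and `prune` are hypothetical names for this example, not CubeStore's API); the actual rescaling logic lands in partition_filter.rs below.

    // Sketch only: illustrates the idea behind decimal partition pruning,
    // not CubeStore's actual pruning code.
    struct PartitionStats {
        // Decimal min/max stored as scaled i64 (e.g. 1.25 at scale 2 -> 125).
        min: i64,
        max: i64,
    }

    // Keep only partitions whose [min, max] range can contain `literal`,
    // where `literal` has already been rescaled to the column's scale.
    fn prune(partitions: &[PartitionStats], literal: i64) -> Vec<usize> {
        partitions
            .iter()
            .enumerate()
            .filter(|(_, p)| p.min <= literal && literal <= p.max)
            .map(|(i, _)| i)
            .collect()
    }

    fn main() {
        // Column scale 2: the filter `x = 1.25` becomes the scaled literal 125.
        let parts = [
            PartitionStats { min: 0, max: 100 },   // pruned
            PartitionStats { min: 100, max: 200 }, // kept
        ];
        assert_eq!(prune(&parts, 125), vec![1]);
    }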
36 changes: 28 additions & 8 deletions rust/cubestore/cubestore/src/config/mod.rs
@@ -335,6 +335,8 @@ pub trait ConfigObj: DIService {
fn malloc_trim_every_secs(&self) -> u64;

fn max_cached_queries(&self) -> usize;

fn dump_dir(&self) -> &Option<PathBuf>;
}

#[derive(Debug, Clone)]
@@ -345,6 +347,7 @@ pub struct ConfigObjImpl {
pub compaction_chunks_count_threshold: u64,
pub wal_split_threshold: u64,
pub data_dir: PathBuf,
pub dump_dir: Option<PathBuf>,
pub store_provider: FileStoreProvider,
pub select_worker_pool_size: usize,
pub job_runners_count: usize,
@@ -489,6 +492,10 @@ impl ConfigObj for ConfigObjImpl {
fn max_cached_queries(&self) -> usize {
self.max_cached_queries
}

fn dump_dir(&self) -> &Option<PathBuf> {
&self.dump_dir
}
}

lazy_static! {
@@ -544,6 +551,9 @@ impl Config {
.ok()
.map(|v| PathBuf::from(v))
.unwrap_or(env::current_dir().unwrap().join(".cubestore").join("data")),
dump_dir: env::var("CUBESTORE_DUMP_DIR")
.ok()
.map(|v| PathBuf::from(v)),
partition_split_threshold: env_parse(
"CUBESTORE_PARTITION_SPLIT_THRESHOLD",
1048576 * 2,
@@ -637,6 +647,7 @@ impl Config {
data_dir: env::current_dir()
.unwrap()
.join(format!("{}-local-store", name)),
dump_dir: None,
partition_split_threshold: 20,
max_partition_split_threshold: 20,
compaction_chunks_count_threshold: 1,
@@ -896,14 +907,23 @@ impl Config {
self.injector
.register_typed_with_default::<dyn MetaStore, RocksMetaStore, _, _>(
async move |i| {
let meta_store = RocksMetaStore::load_from_remote(
&path,
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
i.get_service("original_remote_fs").await,
i.get_service_typed::<dyn ConfigObj>().await,
)
.await
.unwrap();
let config = i.get_service_typed::<dyn ConfigObj>().await;
// TODO metastore works with non queue remote fs as it requires loops to be started prior to load_from_remote call
let original_remote_fs = i.get_service("original_remote_fs").await;
let meta_store = if let Some(dump_dir) = config.clone().dump_dir() {
RocksMetaStore::load_from_dump(
&path,
dump_dir,
original_remote_fs,
config,
)
.await
.unwrap()
} else {
RocksMetaStore::load_from_remote(&path, original_remote_fs, config)
.await
.unwrap()
};
meta_store.add_listener(event_sender).await;
meta_store
},
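Usage note: with CUBESTORE_DUMP_DIR set, startup restores the metastore from the dump directory instead of loading it from remote storage, which is what the injector wiring above selects between. The env-to-Option<PathBuf> pattern added here, in isolation:

    use std::env;
    use std::path::PathBuf;

    // Minimal sketch of the pattern added above: an optional directory taken
    // from the environment, falling back to the normal load path when unset.
    // The variable name matches the commit; the rest is illustrative.
    fn dump_dir() -> Option<PathBuf> {
        env::var("CUBESTORE_DUMP_DIR").ok().map(PathBuf::from)
    }

    fn main() {
        match dump_dir() {
            Some(dir) => println!("restoring metastore from dump at {}", dir.display()),
            None => println!("loading metastore from remote storage"),
        }
    }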
28 changes: 28 additions & 0 deletions rust/cubestore/cubestore/src/metastore/mod.rs
@@ -2033,6 +2033,34 @@ impl RocksMetaStore {
Self::with_listener(path, vec![], remote_fs, config)
}

pub async fn load_from_dump(
path: impl AsRef<Path>,
dump_path: impl AsRef<Path>,
remote_fs: Arc<dyn RemoteFs>,
config: Arc<dyn ConfigObj>,
) -> Result<Arc<RocksMetaStore>, CubeError> {
if !fs::metadata(path.as_ref()).await.is_ok() {
let mut backup =
rocksdb::backup::BackupEngine::open(&BackupEngineOptions::default(), dump_path)?;
backup.restore_from_latest_backup(
&path,
&path,
&rocksdb::backup::RestoreOptions::default(),
)?;
} else {
info!(
"Using existing metastore in {}",
path.as_ref().as_os_str().to_string_lossy()
);
}

let meta_store = Self::new(path, remote_fs, config);

RocksMetaStore::check_all_indexes(&meta_store).await?;

Ok(meta_store)
}

pub async fn load_from_remote(
path: impl AsRef<Path>,
remote_fs: Arc<dyn RemoteFs>,
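For reference, load_from_dump leans on RocksDB's backup engine: if no local metastore exists yet, it restores the latest backup from the dump directory. A minimal sketch mirroring those restore calls (note the backup API changed in later `rocksdb` crate versions, which take BackupEngineOptions::new plus an Env; this matches the older API used by the commit):

    use rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions};

    // Mirrors the restore path in load_from_dump above; illustrative only.
    fn restore_metastore(dump_dir: &str, db_dir: &str) -> Result<(), rocksdb::Error> {
        let mut engine = BackupEngine::open(&BackupEngineOptions::default(), dump_dir)?;
        // Restore both the DB files and the WAL into the metastore directory.
        engine.restore_from_latest_backup(db_dir, db_dir, &RestoreOptions::default())?;
        Ok(())
    }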
43 changes: 43 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/partition_filter.rs
@@ -1,4 +1,5 @@
use crate::table::{cmp_same_types, TableValue};
use crate::util::decimal::Decimal;
use arrow::datatypes::{DataType, Schema};
use datafusion::logical_plan::{Column, Expr, Operator};
use datafusion::scalar::ScalarValue;
@@ -377,6 +378,7 @@ impl Builder<'_> {
}
match t {
t if Self::is_signed_int(t) => Self::extract_signed_int(v),
DataType::Int64Decimal(scale) => Self::extract_decimal(v, *scale),
DataType::Boolean => Self::extract_bool(v),
DataType::Utf8 => Self::extract_string(v),
_ => None,
@@ -418,6 +420,47 @@ impl Builder<'_> {
Some(TableValue::String(s.unwrap()))
}

fn extract_decimal(v: &ScalarValue, scale: usize) -> Option<TableValue> {
let decimal_value = match v {
ScalarValue::Int64Decimal(v, input_scale) => {
Builder::int_to_decimal_value(v.unwrap(), scale as i64 - (*input_scale as i64))
}
ScalarValue::Int16(v) => Builder::int_to_decimal_value(v.unwrap() as i64, scale as i64),
ScalarValue::Int32(v) => Builder::int_to_decimal_value(v.unwrap() as i64, scale as i64),
ScalarValue::Int64(v) => Builder::int_to_decimal_value(v.unwrap() as i64, scale as i64),
ScalarValue::Float64(v) => {
Builder::int_to_decimal_value(v.unwrap() as i64, scale as i64)
}
ScalarValue::Float32(v) => {
Builder::int_to_decimal_value(v.unwrap() as i64, scale as i64)
}
ScalarValue::Utf8(s) | ScalarValue::LargeUtf8(s) => {
match s.as_ref().unwrap().parse::<i64>() {
Ok(v) => Builder::int_to_decimal_value(v, scale as i64),
Err(_) => {
log::error!("could not convert string to int: {}", s.as_ref().unwrap());
return None;
}
}
}
_ => return None, // TODO: casts.
};
Some(decimal_value)
}

fn int_to_decimal_value(mut value: i64, diff_scale: i64) -> TableValue {
if diff_scale > 0 {
for _ in 0..diff_scale {
value *= 10;
}
} else if diff_scale < 0 {
for _ in 0..-diff_scale {
value /= 10;
}
}
TableValue::Decimal(Decimal::new(value))
}

fn extract_signed_int(v: &ScalarValue) -> Option<TableValue> {
let ival = match v {
ScalarValue::Int8(v) => v.unwrap() as i64,
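extract_decimal computes diff_scale as the column's scale minus the literal's scale; int_to_decimal_value then multiplies or divides by powers of ten (division truncates) to bring the raw i64 onto the column's scale. A worked example, as a standalone copy of that loop (illustrative only):

    // Standalone copy of the scale-adjustment loop above, for illustration.
    fn rescale(mut value: i64, diff_scale: i64) -> i64 {
        if diff_scale > 0 {
            for _ in 0..diff_scale {
                value *= 10;
            }
        } else {
            for _ in 0..-diff_scale {
                value /= 10;
            }
        }
        value
    }

    fn main() {
        // Plain Int64 literal 5 against a column of scale 2: 5 -> 500.
        assert_eq!(rescale(5, 2), 500);
        // Int64Decimal(1250, scale 3) against a column of scale 2:
        // diff_scale = 2 - 3 = -1, so 1250 -> 125 (truncating division).
        assert_eq!(rescale(1250, -1), 125);
    }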
31 changes: 29 additions & 2 deletions rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs
@@ -9,11 +9,14 @@ use datafusion::physical_plan::hash_aggregate::{
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::merge_join::MergeJoinExec;
use datafusion::physical_plan::merge_sort::{MergeReSortExec, MergeSortExec};
use datafusion::physical_plan::merge_sort::{
LastRowByUniqueKeyExec, MergeReSortExec, MergeSortExec,
};
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use itertools::{repeat_n, Itertools};

use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec;
use crate::queryplanner::planning::{ClusterSendNode, WorkerExec};
use crate::queryplanner::query_executor::{ClusterSendExec, CubeTable, CubeTableExec};
use crate::queryplanner::serialized_plan::{IndexSnapshot, RowRange};
@@ -22,10 +25,14 @@ use crate::queryplanner::topk::{AggregateTopKExec, SortColumn};
use crate::queryplanner::CubeTableLogical;
use datafusion::cube_ext::join::CrossJoinExec;
use datafusion::cube_ext::joinagg::CrossJoinAggExec;
use datafusion::cube_ext::rolling::RollingWindowAggExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::skip::SkipExec;
use datafusion::physical_plan::union::UnionExec;

#[derive(Default, Clone, Copy)]
@@ -385,8 +392,28 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
}
} else if let Some(_) = a.downcast_ref::<UnionExec>() {
*out += "Union";
} else if let Some(_) = a.downcast_ref::<FilterByKeyRangeExec>() {
*out += "FilterByKeyRange";
} else if let Some(p) = a.downcast_ref::<ParquetExec>() {
*out += &format!(
"ParquetScan, files: {}",
p.partitions()
.iter()
.map(|p| p.filenames.iter())
.flatten()
.join(",")
);
} else if let Some(_) = a.downcast_ref::<SkipExec>() {
*out += "SkipRows";
} else if let Some(_) = a.downcast_ref::<RollingWindowAggExec>() {
*out += "RollingWindowAgg";
} else if let Some(_) = a.downcast_ref::<LastRowByUniqueKeyExec>() {
*out += "LastRowByUniqueKey";
} else if let Some(_) = a.downcast_ref::<MemoryExec>() {
*out += "MemoryScan";
} else {
panic!("unhandled ExecutionPlan: {:?}", p);
let to_string = format!("{:?}", p);
*out += &to_string.split(" ").next().unwrap_or(&to_string);
}

if o.show_output_hints {
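The final arm replaces the old panic!, so an unknown plan node now prints the first token of its Debug output instead of crashing the printer, per the "Do not fail on unsupported plan nodes" commit item. A self-contained sketch of that downcast-plus-fallback pattern (`Node` stands in for datafusion's ExecutionPlan here):

    use std::any::Any;
    use std::fmt::Debug;

    // Sketch of the dispatch used above: try known concrete types first,
    // then fall back to the first token of the Debug representation.
    trait Node: Debug {
        fn as_any(&self) -> &dyn Any;
    }

    #[derive(Debug)]
    struct UnionExec;
    impl Node for UnionExec {
        fn as_any(&self) -> &dyn Any { self }
    }

    #[derive(Debug)]
    struct SomeNewExec { _cols: usize }
    impl Node for SomeNewExec {
        fn as_any(&self) -> &dyn Any { self }
    }

    fn pp(node: &dyn Node) -> String {
        if node.as_any().downcast_ref::<UnionExec>().is_some() {
            "Union".to_string()
        } else {
            // Fallback: "SomeNewExec { _cols: 3 }" -> "SomeNewExec".
            let dbg = format!("{:?}", node);
            dbg.split(' ').next().unwrap_or(&dbg).to_string()
        }
    }

    fn main() {
        assert_eq!(pp(&UnionExec), "Union");
        assert_eq!(pp(&SomeNewExec { _cols: 3 }), "SomeNewExec");
    }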
49 changes: 33 additions & 16 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
@@ -7,6 +7,7 @@ use crate::metastore::{Column, ColumnType, IdRow, Index, Partition};
use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec;
use crate::queryplanner::optimizations::CubeQueryPlanner;
use crate::queryplanner::planning::get_worker_plan;
use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan};
use crate::queryplanner::serialized_plan::{IndexSnapshot, RowFilter, RowRange, SerializedPlan};
use crate::store::DataFrame;
use crate::table::{Row, TableValue, TimestampValue};
@@ -100,7 +101,10 @@ impl QueryExecutor for QueryExecutorImpl {
let (physical_plan, logical_plan) = self.router_plan(plan, cluster).await?;
let split_plan = physical_plan;

trace!("Router Query Physical Plan: {:#?}", &split_plan);
trace!(
"Router Query Physical Plan: {}",
pp_phys_plan(split_plan.as_ref())
);

let execution_time = SystemTime::now();

@@ -109,17 +113,27 @@
debug!("Query data processing time: {:?}", execution_time,);
app_metrics::DATA_QUERY_TIME_MS.report(execution_time.as_millis() as i64);
if execution_time.as_millis() > 200 {
warn!("Slow Query ({:?}):\n{:#?}", execution_time, logical_plan);
warn!(
"Slow Query ({:?}):\n{}",
execution_time,
pp_plan(&logical_plan)
);
debug!(
"Slow Query Physical Plan ({:?}): {:#?}",
execution_time, &split_plan
"Slow Query Physical Plan ({:?}): {}",
execution_time,
pp_phys_plan(split_plan.as_ref())
);
}
if results.is_err() {
error!("Error Query ({:?}):\n{:#?}", execution_time, logical_plan);
error!(
"Error Query Physical Plan ({:?}): {:#?}",
execution_time, &split_plan
"Error Query ({:?}):\n{}",
execution_time,
pp_plan(&logical_plan)
);
error!(
"Error Query Physical Plan ({:?}): {}",
execution_time,
pp_phys_plan(split_plan.as_ref())
);
}
Ok((split_plan.schema(), results?))
@@ -148,7 +162,10 @@ impl QueryExecutor for QueryExecutorImpl {
));
}

trace!("Partition Query Physical Plan: {:#?}", &worker_plan);
trace!(
"Partition Query Physical Plan: {}",
pp_phys_plan(worker_plan.as_ref())
);

let execution_time = SystemTime::now();
let results = collect(worker_plan.clone())
@@ -163,26 +180,26 @@
);
if execution_time.elapsed()?.as_millis() > 200 || results.is_err() {
warn!(
"Slow Partition Query ({:?}):\n{:#?}",
"Slow Partition Query ({:?}):\n{}",
execution_time.elapsed()?,
logical_plan
pp_plan(&logical_plan)
);
debug!(
"Slow Partition Query Physical Plan ({:?}): {:#?}",
"Slow Partition Query Physical Plan ({:?}): {}",
execution_time.elapsed()?,
&worker_plan
pp_phys_plan(worker_plan.as_ref())
);
}
if results.is_err() {
error!(
"Error Partition Query ({:?}):\n{:#?}",
"Error Partition Query ({:?}):\n{}",
execution_time.elapsed()?,
logical_plan
pp_plan(&logical_plan)
);
error!(
"Error Partition Query Physical Plan ({:?}): {:#?}",
"Error Partition Query Physical Plan ({:?}): {}",
execution_time.elapsed()?,
&worker_plan
pp_phys_plan(worker_plan.as_ref())
);
}
// TODO: stream results as they become available.
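All of these call sites follow the same policy: trace the compact pretty-printed plan on every query, and escalate to warn/error with the same compact form when a query is slow or fails, instead of dumping the verbose {:#?} Debug output. A minimal sketch of that policy (`pp_plan` here is a stand-in for the pretty printer used above):

    use std::time::{Duration, Instant};

    // Stand-in for the compact pretty printer; the real one walks the plan tree.
    fn pp_plan(plan: &str) -> String {
        plan.to_string()
    }

    fn run_and_log(plan: &str, work: impl FnOnce()) {
        const SLOW: Duration = Duration::from_millis(200);
        let start = Instant::now();
        work();
        let elapsed = start.elapsed();
        if elapsed > SLOW {
            // Compact form keeps slow-query logs readable, as in the diff.
            eprintln!("Slow Query ({:?}):\n{}", elapsed, pp_plan(plan));
        }
    }

    fn main() {
        run_and_log("ClusterSend, indices: [[1]]", || {
            std::thread::sleep(Duration::from_millis(250));
        });
    }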
