fix(cubestore): allow to disable top-k with env var
Also adds more verbose logs for a panic that we hit in practice.
This also reverts commits:
  - ca668ea: Revert "chore(cubestore): temporarily disable top-k (#2559)"
  - eebb2fa: Revert "chore(cubestore): revert streaming once again: ignore topk tests"
ilya-biryukov committed Apr 15, 2021
1 parent 47394c1 commit 9c2838a
Showing 5 changed files with 55 additions and 12 deletions.
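
A quick usage sketch (not part of the commit): the variable is read from the process environment when the config is constructed, and only "0" and "1" are accepted.

// Illustrative only: disable the top-k rewrite for this process before the
// config is built. Leaving CUBESTORE_ENABLE_TOPK unset keeps the default
// (enabled); any value other than "0" or "1" makes startup panic.
std::env::set_var("CUBESTORE_ENABLE_TOPK", "0");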
22 changes: 21 additions & 1 deletion rust/cubestore/src/config/mod.rs
@@ -210,6 +210,8 @@ pub trait ConfigObj: DIService {
fn max_ingestion_data_frames(&self) -> usize;

fn upload_to_remote(&self) -> bool;

fn enable_topk(&self) -> bool;
}

#[derive(Debug, Clone)]
@@ -237,6 +239,7 @@ pub struct ConfigObjImpl {
pub server_name: String,
pub max_ingestion_data_frames: usize,
pub upload_to_remote: bool,
pub enable_topk: bool,
}

crate::di_service!(ConfigObjImpl, [ConfigObj]);
@@ -326,6 +329,10 @@ impl ConfigObj for ConfigObjImpl {
fn upload_to_remote(&self) -> bool {
self.upload_to_remote
}

fn enable_topk(&self) -> bool {
self.enable_topk
}
}

lazy_static! {
@@ -338,6 +345,17 @@ lazy_static! {
tokio::sync::RwLock::new(false);
}

fn env_bool(name: &str, default: bool) -> bool {
env::var(name)
.ok()
.map(|x| match x.as_str() {
"0" => false,
"1" => true,
_ => panic!("expected '0' or '1' for '{}', found '{}'", name, &x),
})
.unwrap_or(default)
}

impl Config {
pub fn default() -> Config {
let query_timeout = env::var("CUBESTORE_QUERY_TIMEOUT")
@@ -422,6 +440,7 @@ impl Config {
.ok()
.unwrap_or("localhost".to_string()),
upload_to_remote: !env::var("CUBESTORE_NO_UPLOAD").ok().is_some(),
enable_topk: env_bool("CUBESTORE_ENABLE_TOPK", true),
}),
}
}
@@ -461,6 +480,7 @@ impl Config {
connection_timeout: 60,
server_name: "localhost".to_string(),
upload_to_remote: true,
enable_topk: true,
}),
}
}
@@ -753,7 +773,7 @@ impl Config {

self.injector
.register_typed::<dyn QueryPlanner, _, _, _>(async move |i| {
QueryPlannerImpl::new(i.get_service_typed().await)
QueryPlannerImpl::new(i.get_service_typed().await, i.get_service_typed().await)
})
.await;

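For reference, a minimal sketch of how the new env_bool helper behaves (the assertions are illustrative and not part of the commit):

// Unset variable: fall back to the supplied default.
std::env::remove_var("CUBESTORE_ENABLE_TOPK");
assert_eq!(env_bool("CUBESTORE_ENABLE_TOPK", true), true);

// "0" disables, "1" enables; any other value panics with the variable name
// and the offending value in the message.
std::env::set_var("CUBESTORE_ENABLE_TOPK", "0");
assert_eq!(env_bool("CUBESTORE_ENABLE_TOPK", true), false);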
19 changes: 14 additions & 5 deletions rust/cubestore/src/queryplanner/mod.rs
@@ -10,9 +10,10 @@ pub use topk::MIN_TOPK_STREAM_ROWS;
pub mod udfs;

use crate::config::injection::DIService;
use crate::config::ConfigObj;
use crate::metastore::table::TablePath;
use crate::metastore::{MetaStore, MetaStoreTable};
use crate::queryplanner::planning::choose_index;
use crate::queryplanner::planning::choose_index_ext;
use crate::queryplanner::query_executor::batch_to_dataframe;
use crate::queryplanner::serialized_plan::SerializedPlan;
use crate::queryplanner::udfs::aggregate_udf_by_kind;
@@ -55,6 +56,7 @@ crate::di_service!(MockQueryPlanner, [QueryPlanner]);

pub struct QueryPlannerImpl {
meta_store: Arc<dyn MetaStore>,
config: Arc<dyn ConfigObj>,
}

crate::di_service!(QueryPlannerImpl, [QueryPlanner]);
@@ -81,8 +83,12 @@ impl QueryPlanner for QueryPlannerImpl {
trace!("Logical Plan: {:#?}", &logical_plan);

let plan = if SerializedPlan::is_data_select_query(&logical_plan) {
let (logical_plan, index_snapshots) =
choose_index(&logical_plan, &self.meta_store.as_ref()).await?;
let (logical_plan, index_snapshots) = choose_index_ext(
&logical_plan,
&self.meta_store.as_ref(),
self.config.enable_topk(),
)
.await?;
QueryPlan::Select(SerializedPlan::try_new(logical_plan, index_snapshots).await?)
} else {
QueryPlan::Meta(logical_plan)
@@ -113,8 +119,11 @@ impl QueryPlanner for QueryPlannerImpl {
}

impl QueryPlannerImpl {
pub fn new(meta_store: Arc<dyn MetaStore>) -> Arc<QueryPlannerImpl> {
Arc::new(QueryPlannerImpl { meta_store })
pub fn new(
meta_store: Arc<dyn MetaStore>,
config: Arc<dyn ConfigObj>,
) -> Arc<QueryPlannerImpl> {
Arc::new(QueryPlannerImpl { meta_store, config })
}
}

19 changes: 15 additions & 4 deletions rust/cubestore/src/queryplanner/planning.rs
@@ -45,9 +45,18 @@ use crate::queryplanner::topk::{materialize_topk, plan_topk, ClusterAggregateTop
use crate::queryplanner::CubeTableLogical;
use crate::CubeError;

#[cfg(test)]
pub async fn choose_index(
p: &LogicalPlan,
metastore: &dyn PlanIndexStore,
) -> Result<(LogicalPlan, Vec<IndexSnapshot>), DataFusionError> {
choose_index_ext(p, metastore, true).await
}

pub async fn choose_index_ext(
p: &LogicalPlan,
metastore: &dyn PlanIndexStore,
enable_topk: bool,
) -> Result<(LogicalPlan, Vec<IndexSnapshot>), DataFusionError> {
// Prepare information to choose the index.
let mut collector = CollectConstraints::default();
@@ -91,6 +100,7 @@ pub async fn choose_index(
let mut r = ChooseIndex {
chosen_indices: &indices,
next_index: 0,
enable_topk,
};
let plan = rewrite_plan(p, &(), &mut r)?;
assert_eq!(r.next_index, indices.len());
@@ -246,6 +256,7 @@ impl PlanRewriter for CollectConstraints {
struct ChooseIndex<'a> {
next_index: usize,
chosen_indices: &'a [IndexSnapshot],
enable_topk: bool,
}

impl PlanRewriter for ChooseIndex<'_> {
@@ -257,9 +268,10 @@ impl PlanRewriter for ChooseIndex<'_> {
_: &Self::Context,
) -> Result<LogicalPlan, DataFusionError> {
let p = self.choose_table_index(n)?;
let p = pull_up_cluster_send(p)?;
// TODO: fix and re-enable.
// let p = materialize_topk(p)?;
let mut p = pull_up_cluster_send(p)?;
if self.enable_topk {
p = materialize_topk(p)?;
}
Ok(p)
}
}
@@ -802,7 +814,6 @@ pub mod tests {
}

#[tokio::test]
#[ignore]
pub async fn test_materialize_topk() {
let indices = default_indices();
let plan = initial_plan(
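Read together with the queryplanner/mod.rs hunk, the flag's path through planning is roughly the following (a condensed restatement of the diff, not a verbatim excerpt):

// queryplanner/mod.rs: the planner forwards the config flag into index selection.
//   choose_index_ext(&logical_plan, &meta_store, self.config.enable_topk())
// planning.rs: ChooseIndex only attempts the top-k rewrite when the flag is on.
//   let mut p = pull_up_cluster_send(self.choose_table_index(n)?)?;
//   if self.enable_topk { p = materialize_topk(p)?; }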
6 changes: 5 additions & 1 deletion rust/cubestore/src/queryplanner/topk/execute.rs
@@ -650,7 +650,11 @@ fn cmp_same_types(l: &ScalarValue, r: &ScalarValue, nulls_first: bool, asc: bool
(ScalarValue::List(_, _), ScalarValue::List(_, _)) => {
panic!("list as accumulator result is not supported")
}
(_, _) => panic!("unhandled types in comparison"),
(l, r) => panic!(
"unhandled types in comparison: {} and {}",
l.get_datatype(),
r.get_datatype()
),
};
if asc {
o
1 change: 0 additions & 1 deletion rust/cubestore/src/sql/mod.rs
@@ -3000,7 +3000,6 @@ mod tests {
}

#[tokio::test]
#[ignore]
async fn planning_inplace_aggregate2() {
Config::run_test("planning_inplace_aggregate2", async move |services| {
let service = services.sql_service;
