Skip to content

Commit

Permalink
feat(cubestore): Merge join support
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov authored and ovr committed Dec 1, 2020
1 parent 676c2cc commit d08d8e3
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 52 deletions.
60 changes: 30 additions & 30 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions rust/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Cluster for ClusterImpl {
async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result<Vec<RecordBatch>, CubeError> {
if self.server_name == node_name {
// TODO timeout config
timeout(Duration::from_secs(60), self.run_local_select(plan_node)).await?
timeout(Duration::from_secs(600), self.run_local_select(plan_node)).await?
} else {
unimplemented!()
}
Expand Down Expand Up @@ -191,7 +191,7 @@ impl JobRunner {
_ = self.notify.notified().fuse() => {
self.fetch_and_process().await
}
_ = time::delay_for(Duration::from_secs(60)) => {
_ = time::delay_for(Duration::from_secs(5)) => {
self.fetch_and_process().await
}
};
Expand Down Expand Up @@ -322,7 +322,7 @@ impl ClusterImpl {
pub async fn start_processing_loops(&self) {
if self.config_obj.select_worker_pool_size() > 0 {
let mut pool = self.select_process_pool.write().await;
*pool = Some(Arc::new(WorkerPool::new(self.config_obj.select_worker_pool_size(), Duration::from_secs(60))));
*pool = Some(Arc::new(WorkerPool::new(self.config_obj.select_worker_pool_size(), Duration::from_secs(600))));
}
for _ in 0..4 { // TODO number of job event loops
let job_runner = JobRunner {
Expand Down
17 changes: 14 additions & 3 deletions rust/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::merge_join::MergeJoinExec;
use datafusion::physical_plan::merge_sort::MergeSortExec;

#[automock]
#[async_trait]
Expand Down Expand Up @@ -334,9 +336,18 @@ impl CubeTable {
self.schema.clone()
};

let plan = Arc::new(MergeExec::new(Arc::new(
CubeTableExec { schema: projected_schema, partition_execs, index_snapshot: self.index_snapshot.clone() }
)));
let index = self.index_snapshot.index().get_row();
let sort_columns = (0..(index.sort_key_size() as usize)).map(|c| index.columns()[c].get_name().to_string()).take(projected_schema.fields().len()).collect::<Vec<_>>();
let plan: Arc<dyn ExecutionPlan> = if sort_columns.iter().all(|sort_column| projected_schema.index_of(sort_column).is_ok()) {
Arc::new(MergeSortExec::try_new(
Arc::new(CubeTableExec { schema: projected_schema, partition_execs, index_snapshot: self.index_snapshot.clone() }),
sort_columns
)?)
} else {
Arc::new(MergeExec::new(
Arc::new(CubeTableExec { schema: projected_schema, partition_execs, index_snapshot: self.index_snapshot.clone() })
))
};

Ok(plan)
}
Expand Down
6 changes: 3 additions & 3 deletions rust/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@ impl SqlService for SqlServiceImpl {
let res = self.create_schema(name, if_not_exists).await?;
Ok(DataFrame::from(vec![res]))
}
CubeStoreStatement::Statement(Statement::CreateTable { name, columns, external, location, .. }) => {
CubeStoreStatement::CreateTable { create_table: Statement::CreateTable { name, columns, external, location, .. }, indexes } => {
let nv = &name.0;
if nv.len() != 2 {
return Err(CubeError::user(format!("Schema's name should be present in query (boo.table1). Your query was '{}'", q)));
}
let schema_name = &nv[0].value;
let table_name = &nv[1].value;

let res = self.create_table(schema_name.clone(), table_name.clone(), &columns, external, location, vec![]).await?;
let res = self.create_table(schema_name.clone(), table_name.clone(), &columns, external, location, indexes).await?;
Ok(DataFrame::from(vec![res]))
}
CubeStoreStatement::Statement(Statement::Drop { object_type, names, .. }) => {
Expand Down Expand Up @@ -614,7 +614,7 @@ mod tests {
};

let _ = service.exec_query("CREATE SCHEMA IF NOT EXISTS Foo").await.unwrap();
let _ = service.exec_query(&format!("CREATE TABLE Foo.Persons (id int, city text) LOCATION '{}'", path.as_os_str().to_string_lossy())).await.unwrap();
let _ = service.exec_query(&format!("CREATE TABLE Foo.Persons (id int, city text) INDEX persons_city (city, id) LOCATION '{}'", path.as_os_str().to_string_lossy())).await.unwrap();

let result = service.exec_query("SELECT count(*) as cnt from Foo.Persons").await.unwrap();
assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::Int(2)]));
Expand Down
48 changes: 35 additions & 13 deletions rust/cubestore/src/sql/parser.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use sqlparser::dialect::Dialect;
use sqlparser::ast::{ObjectName, Statement as SQLStatement};
use sqlparser::parser::{Parser, ParserError};
use sqlparser::parser::{Parser, ParserError, IsOptional};
use sqlparser::tokenizer::{Tokenizer, Token};
use sqlparser::dialect::keywords::Keyword;

Expand Down Expand Up @@ -28,6 +28,7 @@ impl Dialect for MySqlDialectWithBackTicks {
#[derive(Debug, Clone, PartialEq)]
pub enum Statement {
Statement(SQLStatement),
CreateTable { create_table: SQLStatement, indexes: Vec<SQLStatement> },
CreateSchema { schema_name: ObjectName, if_not_exists: bool },
}

Expand Down Expand Up @@ -81,29 +82,50 @@ impl CubeStoreParser {
without_rowid,
..
} = statement {
let mut indexes = Vec::new();

while self.parser.parse_keyword(Keyword::INDEX) {
indexes.push(self.parse_with_index(name.clone())?);
}

let location = if self.parser.parse_keyword(Keyword::LOCATION) {
Some(self.parser.parse_literal_string()?)
} else {
None
};

Ok(Statement::Statement(SQLStatement::CreateTable {
name,
columns,
constraints,
with_options,
if_not_exists,
external: location.is_some(),
file_format,
location,
query,
without_rowid,
}))
Ok(Statement::CreateTable {
create_table: SQLStatement::CreateTable {
name,
columns,
constraints,
with_options,
if_not_exists,
external: location.is_some(),
file_format,
location,
query,
without_rowid,
},
indexes
})
} else {
Ok(Statement::Statement(statement))
}
}

/// Parses one inline `INDEX index_name (col, ...)` clause that trails a
/// CREATE TABLE statement and lowers it to a `CREATE INDEX` AST node bound
/// to `table_name`. The caller has already consumed the `INDEX` keyword.
pub fn parse_with_index(&mut self, table_name: ObjectName) -> Result<SQLStatement, ParserError> {
    // Grammar: index identifier first, then a mandatory parenthesized column list.
    let name = self.parser.parse_object_name()?;
    let indexed_columns = self
        .parser
        .parse_parenthesized_column_list(IsOptional::Mandatory)?;
    Ok(SQLStatement::CreateIndex {
        name,
        table_name,
        columns: indexed_columns,
        unique: false,       // inline table indexes carry no UNIQUE qualifier
        if_not_exists: false, // IF NOT EXISTS is not part of this inline syntax
    })
}

fn parse_create_schema(&mut self) -> Result<Statement, ParserError> {
let if_not_exists = self
.parser
Expand Down

0 comments on commit d08d8e3

Please sign in to comment.