feat(cubesql): Real connection_id

ovr committed Nov 3, 2021
1 parent 3b8f920 commit 24d9804
Showing 7 changed files with 136 additions and 45 deletions.
4 changes: 2 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion rust/cubeclient/src/models/mod.rs
@@ -31,4 +31,4 @@ pub use self::v1_load_result_annotation::V1LoadResultAnnotation;
pub mod v1_meta_response;
pub use self::v1_meta_response::V1MetaResponse;
pub mod v1_load_continue_wait;
pub use self::v1_load_continue_wait::V1LoadConinueWait;
pub use self::v1_load_continue_wait::V1LoadConinueWait;
2 changes: 1 addition & 1 deletion rust/cubesql/Cargo.toml
@@ -29,7 +29,7 @@ simple_logger = "1.7.0"
async-trait = "0.1.36"
regex = "1.5"
uuid = { version = "0.8", features = ["serde", "v4"] }
msql-srv = { git = 'https://github.com/cube-js/msql-srv', rev = '21158a8b10747f9e2f06824310b4472fe3f48cc0' }
msql-srv = { git = 'https://github.com/cube-js/msql-srv', rev = '744e47e4cf6ca52eaa79c07428ef2e81fabc7071' }
bincode = "1.3.1"
chrono = "0.4.15"
mockall = "0.8.1"
33 changes: 29 additions & 4 deletions rust/cubesql/src/compile/engine/udf.rs
@@ -2,7 +2,7 @@ use std::sync::Arc;

use datafusion::{
arrow::{
array::{ArrayRef, StringBuilder},
array::{ArrayRef, StringBuilder, UInt32Builder},
datatypes::DataType,
},
logical_plan::create_udf,
@@ -12,6 +12,8 @@ use datafusion::{
},
};

use crate::compile::QueryPlannerExecutionProps;

pub fn create_version_udf() -> ScalarUDF {
let version = make_scalar_function(|_args: &[ArrayRef]| {
let mut builder = StringBuilder::new(1);
@@ -29,10 +31,13 @@ pub fn create_version_udf() -> ScalarUDF {
)
}

pub fn create_db_udf() -> ScalarUDF {
let version = make_scalar_function(|_args: &[ArrayRef]| {
pub fn create_db_udf(props: &QueryPlannerExecutionProps) -> ScalarUDF {
// Due to our requirements it's easier to clone this variable than to use an Arc
let fixed_state = props.database.clone().unwrap_or("db".to_string());

let version = make_scalar_function(move |_args: &[ArrayRef]| {
let mut builder = StringBuilder::new(1);
builder.append_value("database").unwrap();
builder.append_value(fixed_state.clone()).unwrap();

Ok(Arc::new(builder.finish()) as ArrayRef)
});
@@ -45,3 +50,23 @@
version,
)
}

pub fn create_connection_id_udf(props: &QueryPlannerExecutionProps) -> ScalarUDF {
// Due to our requirements it's easier to clone this variable than to use an Arc
let fixed_connection_id = props.connection_id;

let version = make_scalar_function(move |_args: &[ArrayRef]| {
let mut builder = UInt32Builder::new(1);
builder.append_value(fixed_connection_id).unwrap();

Ok(Arc::new(builder.finish()) as ArrayRef)
});

create_udf(
"connection_id",
vec![],
Arc::new(DataType::UInt32),
Volatility::Immutable,
version,
)
}
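Both new UDFs use the same capture pattern: the per-session value is cloned out of the props once at registration time and moved into the closure, so the resulting ScalarUDF owns its state outright. Below is a minimal sketch of that pattern in plain Rust, independent of DataFusion's types; `make_db_fn` is a hypothetical name, not part of this commit:

```rust
// Sketch of the move-closure capture used by create_db_udf and
// create_connection_id_udf: clone the per-session value once when the
// function is built, then let the closure own it for every invocation.
fn make_db_fn(database: Option<String>) -> impl Fn() -> String {
    // Same fallback as create_db_udf above.
    let fixed_state = database.unwrap_or_else(|| "db".to_string());
    move || fixed_state.clone()
}

fn main() {
    let db = make_db_fn(Some("analytics".to_string()));
    assert_eq!(db(), "analytics");
    assert_eq!(db(), "analytics"); // reusable; no Arc or locking required
}
```

Registering connection_id() as Volatility::Immutable is safe here even though the value differs per session, because (as the compile/mod.rs changes below show) a fresh ExecutionContext with freshly built UDFs is created for each statement.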
94 changes: 74 additions & 20 deletions rust/cubesql/src/compile/mod.rs
@@ -3,6 +3,7 @@ use std::{backtrace::Backtrace, fmt};

use chrono::{DateTime, Duration, TimeZone, Utc};


use datafusion::sql::parser::Statement as DFStatement;
use datafusion::sql::planner::SqlToRel;
use datafusion::variable::VarType;
@@ -25,12 +26,12 @@ use crate::{
compile::builder::QueryBuilder,
schema::{ctx, V1CubeMetaDimensionExt, V1CubeMetaMeasureExt, V1CubeMetaSegmentExt},
};
use msql_srv::ColumnType;
use msql_srv::{AsyncMysqlShim, ColumnType};

use self::builder::*;
use self::context::*;
use self::engine::context::SystemVar;
use self::engine::udf::{create_db_udf, create_version_udf};
use self::engine::udf::{create_connection_id_udf, create_db_udf, create_version_udf};

pub mod builder;
pub mod context;
@@ -1033,6 +1034,27 @@ fn compile_select(expr: &ast::Select, ctx: &mut QueryContext) -> CompilationResu
Ok(builder)
}

#[derive(Debug)]
pub struct QueryPlannerExecutionProps {
connection_id: u32,
database: Option<String>,
}

impl QueryPlannerExecutionProps {
pub fn new(connection_id: u32, database: Option<String>) -> Self {
Self {
connection_id,
database,
}
}
}

impl QueryPlannerExecutionProps {
pub fn connection_id(&self) -> u32 {
self.connection_id
}
}
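A hypothetical call site for the new type (the helper and its arguments are assumptions, not part of this diff): one props value is built per MySQL session and threaded through every planning call, as the updated signatures below require.

```rust
// Hypothetical handler-side helper (not in this commit): one props value
// per MySQL connection, created when the client logs in.
fn props_for_session(connection_id: u32, current_db: Option<String>) -> QueryPlannerExecutionProps {
    // While `current_db` is None, db() falls back to the "db" default
    // inside create_db_udf.
    QueryPlannerExecutionProps::new(connection_id, current_db)
}

// let props = props_for_session(8, None);
// let plan = convert_sql_to_cube_query(&query, tenant_ctx, &props)?;
```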

struct QueryPlanner {
context: Arc<ctx::TenantContext>,
}
@@ -1042,7 +1064,11 @@ impl QueryPlanner {
Self { context }
}

pub fn plan(&self, stmt: &ast::Statement) -> CompilationResult<QueryPlan> {
pub fn plan(
&self,
stmt: &ast::Statement,
props: &QueryPlannerExecutionProps,
) -> CompilationResult<QueryPlan> {
let (query, select) = match stmt {
ast::Statement::Query(q) => {
if q.with.is_some() {
@@ -1067,11 +1093,11 @@ impl QueryPlanner {
))));
}
ast::Statement::ShowVariable { variable } => {
return self.show_variable_to_plan(&variable);
return self.show_variable_to_plan(variable, props);
}
// Proxy some queries to DF
ast::Statement::ShowColumns { .. } | ast::Statement::ShowVariable { .. } => {
return self.create_df_logical_plan(stmt.clone());
return self.create_df_logical_plan(stmt.clone(), props);
}
_ => {
return Err(CompilationError::Unsupported(
@@ -1107,7 +1133,7 @@ impl QueryPlanner {

&select.from[0]
} else {
return self.create_df_logical_plan(stmt.clone());
return self.create_df_logical_plan(stmt.clone(), props);
};

let (schema_name, table_name) = match &from_table.relation {
@@ -1184,10 +1210,14 @@ impl QueryPlanner {
}
}

fn show_variable_to_plan(&self, variable: &Vec<Ident>) -> CompilationResult<QueryPlan> {
fn show_variable_to_plan(
&self,
variable: &Vec<Ident>,
props: &QueryPlannerExecutionProps,
) -> CompilationResult<QueryPlan> {
let name = ObjectName(variable.to_vec()).to_string();
if name.eq_ignore_ascii_case("databases") || name.eq_ignore_ascii_case("schemas") {
return Ok(QueryPlan::Meta(Arc::new(dataframe::DataFrame::new(
Ok(QueryPlan::Meta(Arc::new(dataframe::DataFrame::new(
vec![dataframe::Column::new(
"Database".to_string(),
ColumnType::MYSQL_TYPE_STRING,
Expand All @@ -1203,22 +1233,30 @@ impl QueryPlanner {
)]),
dataframe::Row::new(vec![dataframe::TableValue::String("sys".to_string())]),
],
))));
))))
} else {
return self.create_df_logical_plan(ast::Statement::ShowVariable {
variable: variable.clone(),
});
self.create_df_logical_plan(
ast::Statement::ShowVariable {
variable: variable.clone(),
},
props,
)
}
}
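The routing above reduces to a case-insensitive check of the SHOW target: DATABASES and SCHEMAS are answered from a static meta frame, while everything else is proxied to DataFusion. A sketch of that predicate (`answered_from_meta` is a hypothetical name):

```rust
// Sketch of the SHOW dispatch in show_variable_to_plan.
fn answered_from_meta(name: &str) -> bool {
    name.eq_ignore_ascii_case("databases") || name.eq_ignore_ascii_case("schemas")
}

fn main() {
    assert!(answered_from_meta("DATABASES")); // SHOW DATABASES
    assert!(answered_from_meta("schemas")); // SHOW SCHEMAS
    assert!(!answered_from_meta("variables")); // proxied to create_df_logical_plan
}
```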

fn create_df_logical_plan(&self, stmt: ast::Statement) -> CompilationResult<QueryPlan> {
fn create_df_logical_plan(
&self,
stmt: ast::Statement,
props: &QueryPlannerExecutionProps,
) -> CompilationResult<QueryPlan> {
let mut ctx = ExecutionContext::new();

let variable_provider = SystemVar::new();
ctx.register_variable(VarType::System, Arc::new(variable_provider));

ctx.register_udf(create_version_udf());
ctx.register_udf(create_db_udf());
ctx.register_udf(create_db_udf(props));
ctx.register_udf(create_connection_id_udf(props));

let state = ctx.state.lock().unwrap().clone();
let df_query_planner = SqlToRel::new(&state);
@@ -1239,10 +1277,11 @@

pub fn convert_statement_to_cube_query(
stmt: &ast::Statement,
tenant: Arc<ctx::TenantContext>,
tenant_ctx: Arc<ctx::TenantContext>,
props: &QueryPlannerExecutionProps,
) -> CompilationResult<QueryPlan> {
let planner = QueryPlanner::new(tenant);
planner.plan(stmt)
let planner = QueryPlanner::new(tenant_ctx);
planner.plan(stmt, props)
}

#[derive(Debug, PartialEq, Serialize)]
@@ -1288,6 +1327,7 @@ impl QueryPlan {
pub fn convert_sql_to_cube_query(
query: &String,
tenant: Arc<ctx::TenantContext>,
props: &QueryPlannerExecutionProps,
) -> CompilationResult<QueryPlan> {
let dialect = MySqlDialectWithBackTicks {};
let parse_result = Parser::parse_sql(&dialect, query);
@@ -1300,7 +1340,7 @@ pub fn convert_sql_to_cube_query(
Ok(stmts) => {
let stmt = &stmts[0];

convert_statement_to_cube_query(stmt, tenant)
convert_statement_to_cube_query(stmt, tenant, props)
}
}
}
@@ -1402,7 +1442,14 @@ mod tests {
}

fn convert_simple_select(query: String) -> CompiledQuery {
let query = convert_sql_to_cube_query(&query, get_test_tenant_ctx());
let query = convert_sql_to_cube_query(
&query,
get_test_tenant_ctx(),
&QueryPlannerExecutionProps {
connection_id: 8,
database: None,
},
);
match query.unwrap() {
QueryPlan::CubeSelect(query) => query,
_ => panic!("Must return CubeSelect instead of DF plan"),
@@ -1863,7 +1910,14 @@ mod tests {
];

for (input_query, expected_error) in variants.iter() {
let query = convert_sql_to_cube_query(&input_query, get_test_tenant_ctx());
let query = convert_sql_to_cube_query(
&input_query,
get_test_tenant_ctx(),
&QueryPlannerExecutionProps {
connection_id: 8,
database: None,
},
);

match &query {
Ok(_) => panic!("Query ({}) should return error", input_query),
5 changes: 4 additions & 1 deletion rust/cubesql/src/mysql/dataframe.rs
@@ -2,7 +2,8 @@ use std::fmt::{self, Debug, Formatter};

use chrono::{SecondsFormat, TimeZone, Utc};
use datafusion::arrow::array::{
Array, Float64Array, Int64Array, StringArray, TimestampMicrosecondArray,
Array, Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
UInt32Array,
};
use datafusion::arrow::{
array::{BooleanArray, TimestampNanosecondArray, UInt64Array},
@@ -308,6 +309,8 @@ pub fn batch_to_dataframe(batches: &Vec<RecordBatch>) -> Result<DataFrame, CubeE
let array = batch.column(column_index);
let num_rows = batch.num_rows();
match array.data_type() {
DataType::Int32 => convert_array!(array, num_rows, rows, Int32Array, Int64, i64),
DataType::UInt32 => convert_array!(array, num_rows, rows, UInt32Array, Int64, i64),
DataType::UInt64 => convert_array!(array, num_rows, rows, UInt64Array, Int64, i64),
DataType::Int64 => convert_array!(array, num_rows, rows, Int64Array, Int64, i64),
DataType::Float64 => {
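The two new match arms widen 32-bit Arrow columns into the dataframe's existing Int64 representation, which is what lets a UInt32 column such as the output of connection_id() reach the MySQL wire code. The promotion is lossless for both i32 and u32, as this small sketch (hypothetical helper, plain Rust) illustrates:

```rust
// The widening performed by the new convert_array! arms: every 32-bit
// value, signed or unsigned, fits in i64 without overflow checks.
fn widen_u32(values: &[u32]) -> Vec<i64> {
    values.iter().map(|&v| i64::from(v)).collect()
}

fn main() {
    assert_eq!(widen_u32(&[0, u32::MAX]), vec![0, 4_294_967_295]);
}
```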
