Skip to content

Commit

Permalink
fix(cubesql): Reuse query params in push down
Browse files Browse the repository at this point in the history
  • Loading branch information
MazterQyou committed May 8, 2024
1 parent 0bea221 commit b849f34
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 6 deletions.
7 changes: 6 additions & 1 deletion packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -593,13 +593,18 @@ export class BaseQuery {
['buildSqlAndParams', exportAnnotatedSql],
() => this.paramAllocator.buildSqlAndParams(
this.buildParamAnnotatedSql(),
exportAnnotatedSql
exportAnnotatedSql,
this.shouldReuseParams
),
{ cache: this.queryCache }
)
);
}

get shouldReuseParams() {
return false;
}

/**
* Returns a dictionary mapping each preagregation to its corresponding query fragment.
* @returns {Record<string, Array<string>>}
Expand Down
18 changes: 17 additions & 1 deletion packages/cubejs-schema-compiler/src/adapter/ParamAllocator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@ export class ParamAllocator {
return sql.match(PARAMS_MATCH_REGEXP) !== null;
}

public buildSqlAndParams(annotatedSql: string, exportAnnotatedSql?: boolean): [string, unknown[]] {
public buildSqlAndParams(annotatedSql: string, exportAnnotatedSql?: boolean, shouldReuseParams?: boolean): [string, unknown[]] {
const paramsInSqlOrder: unknown[] = [];
const paramIndexMap: Record<string, number> = {};

if (shouldReuseParams) {
return [
annotatedSql.replace(PARAMS_MATCH_REGEXP, (match, paramIndex) => {
let newIndex = paramIndexMap[paramIndex];
if (newIndex == null) {
newIndex = paramsInSqlOrder.length;
paramIndexMap[paramIndex] = newIndex;
paramsInSqlOrder.push(this.params[paramIndex]);
}
return exportAnnotatedSql ? `$${newIndex}$` : this.paramPlaceHolder(newIndex);
}),
paramsInSqlOrder
];
}

return [
annotatedSql.replace(PARAMS_MATCH_REGEXP, (match, paramIndex) => {
Expand Down
4 changes: 4 additions & 0 deletions packages/cubejs-schema-compiler/src/adapter/PostgresQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,8 @@ export class PostgresQuery extends BaseQuery {

return templates;
}

public get shouldReuseParams() {
return true;
}
}
20 changes: 16 additions & 4 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ impl SqlQuery {
}

pub fn add_value(&mut self, value: Option<String>) -> usize {
if let Some(index) = self.values.iter().position(|v| v == &value) {
return index;
}
let index = self.values.len();
self.values.push(value);
index
Expand All @@ -59,30 +62,39 @@ impl SqlQuery {
&self,
sql_templates: Arc<SqlTemplates>,
param_index: Option<&str>,
rendered_params: &HashMap<usize, String>,
new_param_index: usize,
) -> Result<(usize, String)> {
) -> Result<(usize, String, bool)> {
let param = param_index
.ok_or_else(|| DataFusionError::Execution("Missing param match".to_string()))?
.parse::<usize>()
.map_err(|e| DataFusionError::Execution(format!("Can't parse param index: {}", e)))?;
if let Some(rendered_param) = rendered_params.get(&param) {
return Ok((param, rendered_param.clone(), false));
}
Ok((
param,
sql_templates
.param(new_param_index)
.map_err(|e| DataFusionError::Execution(format!("Can't render param: {}", e)))?,
true,
))
}

pub fn finalize_query(&mut self, sql_templates: Arc<SqlTemplates>) -> Result<()> {
let mut params = Vec::new();
let mut rendered_params = HashMap::new();
let regex = Regex::new(r"\$(\d+)\$")
.map_err(|e| DataFusionError::Execution(format!("Can't parse regex: {}", e)))?;
let mut res = Ok(());
let replaced_sql = regex.replace_all(self.sql.as_str(), |c: &Captures<'_>| {
let param = c.get(1).map(|x| x.as_str());
match self.render_param(sql_templates.clone(), param, params.len()) {
Ok((param_index, param)) => {
params.push(self.values[param_index].clone());
match self.render_param(sql_templates.clone(), param, &rendered_params, params.len()) {
Ok((param_index, param, push_param)) => {
if push_param {
params.push(self.values[param_index].clone());
rendered_params.insert(param_index, param.clone());
}
param
}
Err(e) => {
Expand Down

0 comments on commit b849f34

Please sign in to comment.