Skip to content

Commit

Permalink
fix: forbid duplicated cte names (#703)
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Jan 21, 2023
1 parent 61c696d commit 02f99a9
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 13 deletions.
32 changes: 28 additions & 4 deletions dozer-sql/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@ use sqlparser::{
dialect::AnsiDialect,
parser::Parser,
};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use super::errors::UnsupportedSqlError;
use super::expression::builder::{fullname_from_ident, normalize_ident};

/// The struct contains some contexts during query to pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
pub cte_names: HashSet<String>,
}

#[derive(Debug, Clone)]
pub struct IndexedTabelWithJoins {
pub relation: (NameOrAlias, TableFactor),
Expand All @@ -32,6 +38,7 @@ pub fn statement_to_pipeline(
sql: &str,
) -> Result<(AppPipeline, (String, PortHandle)), PipelineError> {
let dialect = AnsiDialect {};
let mut ctx = QueryContext::default();

let ast = Parser::parse_sql(&dialect, sql).unwrap();
let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
Expand All @@ -40,7 +47,14 @@ pub fn statement_to_pipeline(
let mut pipeline = AppPipeline::new();
let mut pipeline_map = HashMap::new();
if let Statement::Query(query) = statement {
query_to_pipeline(&query_name, &query, &mut pipeline, &mut pipeline_map, false)?;
query_to_pipeline(
&query_name,
&query,
&mut pipeline,
&mut pipeline_map,
&mut ctx,
false,
)?;
};
let node = pipeline_map
.get(&query_name.0)
Expand All @@ -54,6 +68,7 @@ fn query_to_pipeline(
query: &Query,
pipeline: &mut AppPipeline,
pipeline_map: &mut HashMap<String, (String, PortHandle)>,
query_ctx: &mut QueryContext,
stateful: bool,
) -> Result<(), PipelineError> {
// Attach the first pipeline if there is with clause
Expand All @@ -71,11 +86,18 @@ fn query_to_pipeline(
));
}
let table_name = table.alias.name.to_string();
if query_ctx.cte_names.contains(&table_name) {
return Err(InvalidQuery(format!(
"WITH query name {table_name:?} specified more than once"
)));
}
query_ctx.cte_names.insert(table_name.clone());
query_to_pipeline(
&NameOrAlias(table_name.clone(), Some(table_name)),
&table.query,
pipeline,
pipeline_map,
query_ctx,
false,
)?;
}
Expand All @@ -87,12 +109,13 @@ fn query_to_pipeline(
}
SetExpr::Query(query) => {
let query_name = format!("subquery_{}", uuid::Uuid::new_v4());

let mut ctx = QueryContext::default();
query_to_pipeline(
&NameOrAlias(query_name, None),
&query,
pipeline,
pipeline_map,
&mut ctx,
stateful,
)?
}
Expand Down Expand Up @@ -272,7 +295,8 @@ pub fn get_from_source(
.map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));

let name_or = NameOrAlias(name, alias_name);
query_to_pipeline(&name_or, subquery, pipeline, pipeline_map, false)?;
let mut ctx = QueryContext::default();
query_to_pipeline(&name_or, subquery, pipeline, pipeline_map, &mut ctx, false)?;

Ok(name_or)
}
Expand Down
16 changes: 8 additions & 8 deletions dozer-tests/src/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ fn get_queries() -> Vec<&'static str> {
"select actor_id, TRIM(first_name) from actor where actor_id<=5",
"select actor_id, first_name, last_name,last_update from actor where last_name = 'PIPPO'",
"select actor_id, first_name as fn, last_name as ln,last_update from actor where last_name = 'PIPPO'",
// "select actor_id, first_name, last_name,last_update from actor where last_name IS NULL",
// "select actor_id, first_name, last_name,last_update from actor where last_name IS NULL",
"select count(actor_id) from actor",
// "select actor_id, first_name, last_name,last_update from actor where actor_id in (1,5)",
"select actor_id, first_name, last_name,last_update from actor where first_name='GUINESS'",
"select actor_id, first_name, last_name,last_update from actor where actor_id<5 and actor_id>2",
"select actor_id, first_name, last_name,last_update from actor where (actor_id<5 and actor_id>2) or (actor_id>50)",
"select actor_id from actor order by actor_id",
"select actor_id, count(actor_id) from actor group by actor_id",
"select actor_id, count(actor_id) as counts from actor group by actor_id",
// "select actor_id, first_name, last_name,last_update from actor where actor_id in (1,5)",
"select actor_id, first_name, last_name,last_update from actor where first_name='GUINESS'",
"select actor_id, first_name, last_name,last_update from actor where actor_id<5 and actor_id>2",
"select actor_id, first_name, last_name,last_update from actor where (actor_id<5 and actor_id>2) or (actor_id>50)",
"select actor_id from actor order by actor_id",
"select actor_id, count(actor_id) from actor group by actor_id",
"select actor_id, count(actor_id) as counts from actor group by actor_id",
]
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-types/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub enum IndexDefinition {
pub struct Record {
/// Schema implemented by this Record
pub schema_id: Option<SchemaIdentifier>,
/// List of values, following the definitions of `fields` of the asscoiated schema
/// List of values, following the definitions of `fields` of the associated schema
pub values: Vec<Field>,
pub version: Option<u32>,
}
Expand Down

0 comments on commit 02f99a9

Please sign in to comment.