chore: fix errors
v3g42 committed Jan 19, 2023
1 parent 7d62be2 commit 612564a
Showing 2 changed files with 89 additions and 69 deletions.
13 changes: 13 additions & 0 deletions dozer-sql/src/pipeline/errors.rs
@@ -69,4 +69,17 @@ pub enum PipelineError {
InternalExecutionError(#[from] ExecutionError),
#[error(transparent)]
InternalError(#[from] BoxedError),

#[error(transparent)]
UnsupportedSqlError(#[from] UnsupportedSqlError),
}

#[derive(Error, Debug)]
pub enum UnsupportedSqlError {
#[error("Recursive CTE is not supported. Please refer to the documentation(https://getdozer.io/docs/reference/sql/introduction) for more information. ")]
Recursive,
#[error("Currently this syntax is not supported for CTEs")]
CteFromError,
#[error("Currently only SELECT operations are allowed")]
SelectOnlyError,
}
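
For context, the new variants surface through PipelineError using thiserror's #[error(transparent)] and #[from] attributes, so the inner error converts automatically with the ? operator or .into(), while callers can still match on the concrete variant. Below is a minimal, self-contained sketch of that pattern; the enums are reduced stand-ins for the types above, not the dozer definitions themselves.

use thiserror::Error; // thiserror = "1"

// Stand-ins for the nested error enums in this commit (illustrative only).
#[derive(Error, Debug)]
pub enum UnsupportedSqlError {
    #[error("Recursive CTE is not supported")]
    Recursive,
}

#[derive(Error, Debug)]
pub enum PipelineError {
    // transparent + from: Display and source() delegate to the inner error,
    // and a From impl is generated so `?`/`.into()` lifts it into PipelineError.
    #[error(transparent)]
    UnsupportedSqlError(#[from] UnsupportedSqlError),
}

fn reject_recursive(recursive: bool) -> Result<(), PipelineError> {
    if recursive {
        // The generated From impl converts the inner error here.
        return Err(UnsupportedSqlError::Recursive.into());
    }
    Ok(())
}

fn main() {
    match reject_recursive(true) {
        Err(PipelineError::UnsupportedSqlError(UnsupportedSqlError::Recursive)) => {
            eprintln!("caller gets a typed error instead of a panic")
        }
        other => println!("{:?}", other),
    }
}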
145 changes: 76 additions & 69 deletions dozer-sql/src/pipeline/new_builder.rs
@@ -1,17 +1,13 @@
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
use crate::pipeline::builder::PipelineBuilder;
use crate::pipeline::new_builder::PipelineError::InvalidQuery;
use crate::pipeline::product::factory::get_input_name;
use crate::pipeline::selection::factory::SelectionProcessorFactory;
use crate::pipeline::{errors::PipelineError, product::factory::ProductProcessorFactory};
use dozer_core::dag::app::AppPipeline;
use dozer_core::dag::app::PipelineEntryPoint;
use dozer_core::dag::appsource::AppSourceId;
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
use dozer_core::dag::node::PortHandle;
use dozer_core::dag::{
app::AppPipeline,
dag::{Endpoint, DEFAULT_PORT_HANDLE},
node::NodeHandle,
};
use sqlparser::ast::TableWithJoins;
use sqlparser::{
ast::{Query, Select, SetExpr, Statement},
@@ -21,63 +17,7 @@ use sqlparser::{
use std::collections::HashMap;
use std::sync::Arc;

const SQL_FIRST_JOIN: &str = r#"
SELECT
a.name as "Genre",
SUM(amount) as "Gross Revenue(in $)"
FROM
(
SELECT
c.name, f.title, p.amount
FROM film f
LEFT JOIN film_category fc
ON fc.film_id = f.film_id
LEFT JOIN category c
ON fc.category_id = c.category_id
LEFT JOIN inventory i
ON i.film_id = f.film_id
LEFT JOIN rental r
ON r.inventory_id = i.inventory_id
LEFT JOIN payment p
ON p.rental_id = r.rental_id
WHERE p.amount IS NOT NULL
) a
GROUP BY name
ORDER BY sum(amount) desc
LIMIT 5;
"#;

const SQL_2: &str = r#"
SELECT
c.name, f.title, p.amount
FROM film f
LEFT JOIN film_category fc
"#;

const SQL_3: &str = r#"
WITH tbl as (select id from a)
select id from tbl
"#;

const SQL_4: &str = r#"
WITH tbl as (select id from a),
tbl2 as (select id from tbl)
select id from tbl2
"#;

const SQL_5: &str = r#"
WITH tbl as (select id from (select ttid from a) as a),
tbl2 as (select id from tbl)
select id from tbl2
"#;
#[test]
fn sql_logic_test_1() {
let _pipeline = statement_to_pipeline(SQL_4).unwrap();

println!("sdfs");
}

use super::errors::UnsupportedSqlError;
pub fn statement_to_pipeline(
sql: &str,
) -> Result<(AppPipeline, (String, PortHandle)), PipelineError> {
@@ -118,15 +58,18 @@ fn query_to_pipeline(
// Attach the first pipeline if there is with clause
if let Some(with) = &query.with {
if with.recursive {
panic!("Recursive CTE is not supported");
return Err(PipelineError::UnsupportedSqlError(
UnsupportedSqlError::Recursive,
));
}

for table in &with.cte_tables {
if table.from.is_some() {
panic!("table.from Not supported");
return Err(PipelineError::UnsupportedSqlError(
UnsupportedSqlError::CteFromError,
));
}

println!("query: {:?}", table.query);
query_to_pipeline(
table.alias.name.to_string(),
&table.query,
@@ -139,13 +82,17 @@

match *query.body.clone() {
SetExpr::Select(select) => {
select_to_pipeline(processor_name, *select, pipeline, source_map, pipeline_map)?;
select_to_pipeline(processor_name, *select, pipeline, pipeline_map)?;
}
SetExpr::Query(query) => {
let query_name = format!("subquery_{}", uuid::Uuid::new_v4().to_string());
query_to_pipeline(query_name, &query, pipeline, source_map, pipeline_map)?
}
_ => panic!("Only select queries are supported"),
_ => {
return Err(PipelineError::UnsupportedSqlError(
UnsupportedSqlError::SelectOnlyError,
))
}
};
Ok(())
}
@@ -154,7 +101,6 @@ fn select_to_pipeline(
processor_name: String,
select: Select,
pipeline: &mut AppPipeline,
source_map: &mut HashMap<String, PortHandle>,
pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<(), PipelineError> {
// FROM clause
@@ -258,3 +204,64 @@ pub fn get_entry_points(

Ok(endpoints)
}

#[cfg(test)]
mod tests {
use super::statement_to_pipeline;

#[test]
fn sql_logic_test_1() {
let statements: Vec<&str> = vec![
// r#"
// SELECT
// a.name as "Genre",
// SUM(amount) as "Gross Revenue(in $)"
// FROM
// (
// SELECT
// c.name, f.title, p.amount
// FROM film f
// LEFT JOIN film_category fc
// ON fc.film_id = f.film_id
// LEFT JOIN category c
// ON fc.category_id = c.category_id
// LEFT JOIN inventory i
// ON i.film_id = f.film_id
// LEFT JOIN rental r
// ON r.inventory_id = i.inventory_id
// LEFT JOIN payment p
// ON p.rental_id = r.rental_id
// WHERE p.amount IS NOT NULL
// ) a

// GROUP BY name
// ORDER BY sum(amount) desc
// LIMIT 5;
// "#,
r#"
SELECT
c.name, f.title, p.amount
FROM film f
LEFT JOIN film_category fc
"#,
r#"
WITH tbl as (select id from a)
select id from tbl
"#,
r#"
WITH tbl as (select id from a),
tbl2 as (select id from tbl)
select id from tbl2
"#,
// r#"
// WITH tbl as (select id from (select ttid from a) as a),
// tbl2 as (select id from tbl)
// select id from tbl2
// "#,
];
for sql in statements {
println!("Parsing {:?}", sql);
let _pipeline = statement_to_pipeline(sql).unwrap();
}
}
}
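
The caller-facing effect of the change: inputs that previously hit a panic! inside query_to_pipeline now come back as typed errors that can be matched. A hypothetical caller sketch follows, assuming dozer-sql as a dependency; the module paths are inferred from the file layout in this diff and may not match the crate's public exports.

// Hypothetical caller; paths inferred from dozer-sql/src/pipeline/{errors,new_builder}.rs
// in this diff and may not be publicly re-exported under these names.
use dozer_sql::pipeline::errors::{PipelineError, UnsupportedSqlError};
use dozer_sql::pipeline::new_builder::statement_to_pipeline;

fn main() {
    // Before this commit, a recursive CTE would panic; now it returns Err.
    let sql = "WITH RECURSIVE t AS (SELECT 1 AS id) SELECT id FROM t";
    match statement_to_pipeline(sql) {
        Ok((_pipeline, (node, _port))) => println!("pipeline built, output node: {}", node),
        Err(PipelineError::UnsupportedSqlError(UnsupportedSqlError::Recursive)) => {
            eprintln!("recursive CTEs are not supported")
        }
        Err(e) => eprintln!("failed to build pipeline: {}", e),
    }
}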
