Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqlparser: update code to conform to upstream sqlparser-rs changes #2510

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"job": {
"facets": {
"sql": {
"query": "CREATE TYPE myrowtype AS (f1 int, f2 text, f3 numeric)"
"query": "DROP POLICY IF EXISTS name ON table_name"
}
},
"name": "failed_sql_extraction.fail"
Expand All @@ -15,8 +15,8 @@
"totalTasks": 1,
"failedTasks": 1,
"errors": [{
"errorMessage": "Expected an object type after CREATE, found: TYPE",
"task": "CREATE TYPE myrowtype AS (f1 int, f2 text, f3 numeric)",
"errorMessage": "Expected TABLE, VIEW, INDEX, ROLE, SCHEMA, FUNCTION, STAGE or SEQUENCE after DROP, found: POLICY at Line: 1, Column 6",
"task": "DROP POLICY IF EXISTS name ON table_name",
"taskNumber": 0
}]
}
Expand All @@ -27,7 +27,7 @@
"job": {
"facets": {
"sql": {
"query": "CREATE TYPE myrowtype AS (f1 int, f2 text, f3 numeric)"
"query": "DROP POLICY IF EXISTS name ON table_name"
}
},
"name": "failed_sql_extraction.fail"
Expand All @@ -38,8 +38,8 @@
"totalTasks": 1,
"failedTasks": 1,
"errors": [{
"errorMessage": "Expected an object type after CREATE, found: TYPE",
"task": "CREATE TYPE myrowtype AS (f1 int, f2 text, f3 numeric)",
"errorMessage": "Expected TABLE, VIEW, INDEX, ROLE, SCHEMA, FUNCTION, STAGE or SEQUENCE after DROP, found: POLICY at Line: 1, Column 6",
"task": "DROP POLICY IF EXISTS name ON table_name",
"taskNumber": 0
}]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@


# Some sql-parser unsupported syntax
sql = "CREATE TYPE myrowtype AS (f1 int, f2 text, f3 numeric)"
sql = "DROP POLICY IF EXISTS name ON table_name"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the advantage of replacing one test scenario with another?



if parse_version(AIRFLOW_VERSION) < parse_version("2.5.0"):
Expand Down
2 changes: 1 addition & 1 deletion integration/common/tests/sql/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def test_parse_bugged_cte():
FROM sum_trans
WHERE count > 1000 OR balance > 100000;
"""
assert parse(sql).errors == [ExtractionError(0, "Expected ), found: user_id", sql)]
assert parse(sql).errors == [ExtractionError(0, "Expected ), found: user_id at Line: 3, Column 20", sql)]


def test_parse_recursive_cte():
Expand Down
2 changes: 1 addition & 1 deletion integration/sql/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ crate-type = ["rlib"]
[dependencies]
anyhow = {workspace = true}

sqlparser = {git = "https://github.com/OpenLineage/sqlparser-rs/", branch = "release"}
sqlparser = {git = "https://github.com/OpenLineage/sqlparser-rs/", branch = "fix_snowflake_stage_no_parens"}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to rely on feature branch? If there is any particular reason to do that, pls specify PR that we need to merge in another repo. Otherwise it may be difficult to change this in future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because if we merge the sqlparser-rs changes first here: OpenLineage/sqlparser-rs#5 it would break the main: so the algorithm is

  • do the sqlparser change on a branch
  • make changes in the OpenLineage repo, pointing to the branch
  • merge sqlparser changes to our fork
  • move the changes back to the release branch on OpenLineage repo

23 changes: 0 additions & 23 deletions integration/sql/impl/src/bigquery.rs

This file was deleted.

3 changes: 1 addition & 2 deletions integration/sql/impl/src/dialect.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright 2018-2023 contributors to the OpenLineage project
// SPDX-License-Identifier: Apache-2.0

use crate::bigquery::BigQueryDialect;
use sqlparser::dialect::{
AnsiDialect, Dialect, GenericDialect, HiveDialect, MsSqlDialect, MySqlDialect,
AnsiDialect, BigQueryDialect, Dialect, GenericDialect, HiveDialect, MsSqlDialect, MySqlDialect,
PostgreSqlDialect, RedshiftSqlDialect, SQLiteDialect, SnowflakeDialect,
};

Expand Down
2 changes: 0 additions & 2 deletions integration/sql/impl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
// Copyright 2018-2023 contributors to the OpenLineage project
// SPDX-License-Identifier: Apache-2.0

mod bigquery;
mod context;
mod dialect;
mod lineage;
mod visitor;

use std::collections::HashSet;

pub use bigquery::BigQueryDialect;
use context::Context;
pub use dialect::*;
pub use lineage::*;
Expand Down
126 changes: 90 additions & 36 deletions integration/sql/impl/src/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::lineage::*;

use anyhow::{anyhow, Result};
use sqlparser::ast::{
AlterTableOperation, Expr, Function, FunctionArg, FunctionArgExpr, Ident, ListAggOnOverflow,
Query, Select, SelectItem, SetExpr, Statement, Table, TableFactor, WindowSpec, With,
AlterTableOperation, Expr, FromTable, Function, FunctionArg, FunctionArgExpr, Ident,
ListAggOnOverflow, Query, Select, SelectItem, SetExpr, Statement, Table, TableFactor,
WindowSpec, WindowType, With,
};

pub trait Visit {
Expand Down Expand Up @@ -52,18 +53,19 @@ impl Visit for TableFactor {
context.add_input(name.clone().0);
Ok(())
}
TableFactor::Pivot {
name, pivot_alias, ..
} => {
let table = DbTableMeta::new(
name.clone().0,
context.dialect(),
context.default_schema().clone(),
);
if let Some(pivot_alias) = pivot_alias {
context.add_table_alias(table, vec![pivot_alias.clone().name]);
TableFactor::Pivot { table, alias, .. } => {
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
let ident = get_table_name_from_table_factor(table)?;
if let Some(pivot_alias) = alias {
context.add_table_alias(
DbTableMeta::new(
ident.clone(),
context.dialect(),
context.default_schema().clone(),
),
vec![pivot_alias.clone().name],
);
}
context.add_input(name.clone().0);
context.add_input(ident);
Ok(())
}
TableFactor::Derived {
Expand Down Expand Up @@ -181,11 +183,22 @@ impl Visit for Expr {
| Expr::IsNull(expr)
| Expr::IsNotNull(expr)
| Expr::IsUnknown(expr)
| Expr::IsNotUnknown(expr)
| Expr::AnyOp(expr)
| Expr::AllOp(expr) => {
| Expr::IsNotUnknown(expr) => {
expr.visit(context)?;
}
Expr::AnyOp {
left,
compare_op: _,
right,
}
| Expr::AllOp {
left,
compare_op: _,
right,
} => {
left.visit(context)?;
right.visit(context)?;
}
Expr::InList { expr, list, .. } => {
expr.visit(context)?;
for e in list {
Expand Down Expand Up @@ -240,6 +253,7 @@ impl Visit for Expr {
expr,
substring_from,
substring_for,
..
} => {
expr.visit(context)?;
if let Some(e) = substring_from {
Expand All @@ -253,6 +267,7 @@ impl Visit for Expr {
expr,
trim_where: _,
trim_what,
..
} => {
expr.visit(context)?;
if let Some(e) = trim_what {
Expand Down Expand Up @@ -369,7 +384,11 @@ impl Visit for Function {
impl Visit for FunctionArg {
fn visit(&self, context: &mut Context) -> Result<()> {
match self {
FunctionArg::Named { name: _, arg } => arg.visit(context),
FunctionArg::Named {
name: _,
arg,
operator: _,
} => arg.visit(context),
FunctionArg::Unnamed(arg) => arg.visit(context),
}
}
Expand All @@ -384,6 +403,14 @@ impl Visit for FunctionArgExpr {
}
}

impl Visit for WindowType {
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
fn visit(&self, context: &mut Context) -> Result<()> {
match self {
WindowType::WindowSpec(spec) => spec.visit(context),
WindowType::NamedWindow(..) => Ok(()),
}
}
}
impl Visit for WindowSpec {
fn visit(&self, context: &mut Context) -> Result<()> {
for expr in &self.partition_by {
Expand Down Expand Up @@ -489,6 +516,7 @@ impl Visit for SetExpr {
right.visit(context)
}
SetExpr::Table(table) => table.visit(context),
SetExpr::Update(stmt) => stmt.visit(context),
}
}
}
Expand Down Expand Up @@ -524,7 +552,9 @@ impl Visit for Statement {
Statement::Insert {
table_name, source, ..
} => {
source.visit(context)?;
if let Some(src) = source {
src.visit(context)?;
}
context.add_output(table_name.clone().0);
}
Statement::Merge { table, source, .. } => {
Expand Down Expand Up @@ -587,34 +617,58 @@ impl Visit for Statement {
expr.visit(context)?;
}
}
Statement::AlterTable { name, operation } => {
match operation {
AlterTableOperation::SwapWith { table_name } => {
// both table names are inputs and outputs of the swap operation
context.add_input(table_name.clone().0);
context.add_input(name.clone().0);

context.add_output(table_name.clone().0);
context.add_output(name.clone().0);
}
AlterTableOperation::RenameTable { table_name } => {
context.add_input(name.clone().0);
context.add_output(table_name.clone().0);
Statement::AlterTable {
name,
if_exists: _,
only: _,
operations,
location: _,
} => {
for operation in operations {
match operation {
AlterTableOperation::SwapWith { table_name } => {
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
// both table names are inputs and outputs of the swap operation
context.add_input(table_name.clone().0);
context.add_input(name.clone().0);

context.add_output(table_name.clone().0);
context.add_output(name.clone().0);
}
AlterTableOperation::RenameTable { table_name } => {
context.add_input(name.clone().0);
context.add_output(table_name.clone().0);
}
_ => context.add_output(name.clone().0),
}
_ => context.add_output(name.clone().0),
}
}
Statement::Delete {
table_name,
tables: _,
from,
using,
selection,
..
} => {
let table_name = get_table_name_from_table_factor(table_name)?;
context.add_output(table_name);
match from {
FromTable::WithFromKeyword(tables) | FromTable::WithoutKeyword(tables) => {
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
for table in tables {
let output = get_table_name_from_table_factor(&table.relation)?;
context.add_output(output);
for join in &table.joins {
let join_output = get_table_name_from_table_factor(&join.relation)?;
context.add_output(join_output);
}
}
}
}

if let Some(using) = using {
using.visit(context)?;
for table in using {
table.relation.visit(context)?;
for join in &table.joins {
join.relation.visit(context)?;
}
}
}

if let Some(expr) = selection {
Expand Down
13 changes: 13 additions & 0 deletions integration/sql/impl/tests/table_lineage/tests_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,16 @@ fn alter_snowflake_swap_with() {
}
)
}

#[test]
fn alter_multiple_operations() {
assert_eq!(
test_sql("ALTER TABLE IF EXISTS ONLY tab ADD COLUMN a TEXT, ADD COLUMN b INT")
.unwrap()
.table_lineage,
TableLineage {
in_tables: vec![],
out_tables: vec![table("tab")],
}
)
}
2 changes: 1 addition & 1 deletion integration/sql/impl/tests/table_lineage/tests_cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn parse_bugged_cte() {
meta.errors.first().unwrap(),
&ExtractionError {
index: 0,
message: "Expected ), found: user_id".to_string(),
message: "Expected ), found: user_id at Line: 3, Column 20".to_string(),
origin_statement: sql.to_string(),
}
);
Expand Down
14 changes: 14 additions & 0 deletions integration/sql/impl/tests/table_lineage/tests_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ fn delete_from() {
);
}

#[test]
fn delete_without_from_bigquery() {
assert_eq!(
test_sql_dialect("DELETE \"project.dataset.table\" WHERE 1", "bigquery")
.unwrap()
.table_lineage
.out_tables
.first()
.unwrap()
.qualified_name(),
"project.dataset.table"
)
}

#[test]
fn delete_from_using() {
assert_eq!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ fn test_failing_statement_with_insert() {
errors: vec![
ExtractionError {
index: 0,
message: "Expected an SQL statement, found: FAILING".to_string(),
message: "Expected an SQL statement, found: FAILING at Line: 1, Column 1".to_string(),
origin_statement: "FAILING STATEMENT;".to_string(),
},
ExtractionError {
index: 2,
message:
"Expected SELECT, VALUES, or a subquery in the query body, found: FAILING"
"Expected SELECT, VALUES, or a subquery in the query body, found: FAILING at Line: 1, Column 13"
.to_string(),
origin_statement: "INSERT ALSO FAILING".to_string(),
}
Expand Down