diff --git a/Cargo.toml b/Cargo.toml index c25256644..a8f79025a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ datafusion = { version = "43" } snafu = { version = "0.8.5", features = ["futures"] } [patch.crates-io] -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" } [workspace.lints.clippy] diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index bbf67fdc8..28bd545e5 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -18,13 +18,13 @@ flatbuffers = { version = "24.3.25" } #iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" } #datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" } -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" } +datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" } +datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index eca5d8f7c..dfa318630 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -14,13 +14,13 @@ serde = { workspace = true } serde_json = { workspace = true } object_store = { workspace = true } -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" } +datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" } +datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index 3c024239b..a49c84621 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -83,7 +83,8 @@ impl SqlExecutor { | Statement::Commit { .. } | Statement::Insert { .. } | Statement::ShowSchemas { .. } - | Statement::ShowVariable { .. } => { + | Statement::ShowVariable { .. } + | Statement::Update { .. } => { return Box::pin(self.execute_with_custom_plan(&query, warehouse_name)).await; } Statement::Query(mut subquery) => { @@ -291,8 +292,8 @@ impl SqlExecutor { self.update_tables_in_table_factor(&mut table, warehouse_name); self.update_tables_in_table_factor(&mut source, warehouse_name); - let (target_table, target_alias) = self.get_table_with_alias(table); - let (source_table, _source_alias) = self.get_table_with_alias(source.clone()); + let (target_table, target_alias) = Self::get_table_with_alias(table); + let (_source_table, source_alias) = Self::get_table_with_alias(source.clone()); let source_query = if let TableFactor::Derived { subquery, @@ -323,41 +324,37 @@ impl SqlExecutor { }; // Check NOT MATCHED for records to INSERT - let select_query = - format!("SELECT * FROM {source_query} JOIN {target_table} {target_alias} ON {on}{where_clause_str}"); - self.execute_with_custom_plan(&select_query, warehouse_name) - .await?; - // Extract columns and values from clauses - let mut columns = Vec::new(); - let mut values = Vec::new(); + let mut columns = String::new(); + let mut values = String::new(); for clause in clauses { if clause.clause_kind == MergeClauseKind::NotMatched { if let MergeAction::Insert(insert) = clause.action { - columns = insert.columns; + columns = insert + .columns + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "); if let MergeInsertKind::Values(values_insert) = insert.kind { - values = values_insert.rows.into_iter().flatten().collect(); + values = values_insert + .rows + .into_iter() + .flatten() + .collect::>() + .iter() + .map(|v| format!("{source_alias}.{v}")) + .collect::>() + .join(", "); } } } } - // Construct the INSERT statement - let insert_query = format!( - "INSERT INTO {} ({}) SELECT {} FROM {}", - target_table, - columns - .iter() - .map(ToString::to_string) - .collect::>() - .join(", "), - values - .iter() - .map(ToString::to_string) - .collect::>() - .join(", "), - source_table - ); + let select_query = + format!("SELECT {values} FROM {source_query} JOIN {target_table} {target_alias} ON {on}{where_clause_str}"); + // Construct the INSERT statement + let insert_query = format!("INSERT INTO {target_table} ({columns}) {select_query}"); self.execute_with_custom_plan(&insert_query, warehouse_name) .await } else { @@ -643,6 +640,7 @@ impl SqlExecutor { } #[must_use] + #[allow(clippy::too_many_lines)] pub fn update_statement_references( &self, statement: DFStatement, @@ -736,6 +734,28 @@ impl SqlExecutor { statement } } + Statement::Update { + mut table, + assignments, + mut from, + selection, + returning, + or, + } => { + self.update_tables_in_table_with_joins(&mut table, warehouse_name); + if let Some(from) = from.as_mut() { + self.update_tables_in_table_with_joins(from, warehouse_name); + } + let modified_statement = Statement::Update { + table, + assignments, + from, + selection, + returning, + or, + }; + DFStatement::Statement(Box::new(modified_statement)) + } _ => statement, }, _ => statement, @@ -773,21 +793,15 @@ impl SqlExecutor { } #[allow(clippy::only_used_in_recursion)] - fn get_table_with_alias(&self, factor: TableFactor) -> (ObjectName, String) { + fn get_table_with_alias(factor: TableFactor) -> (ObjectName, String) { match factor { TableFactor::Table { name, alias, .. } => { let target_alias = alias.map_or_else(String::new, |alias| alias.to_string()); (name, target_alias) } - TableFactor::Derived { - subquery, alias, .. - } => { + // Return only alias for derived tables + TableFactor::Derived { alias, .. } => { let target_alias = alias.map_or_else(String::new, |alias| alias.to_string()); - if let sqlparser::ast::SetExpr::Select(select) = subquery.body.as_ref() { - if let Some(table_with_joins) = select.from.first() { - return self.get_table_with_alias(table_with_joins.relation.clone()); - } - } (ObjectName(vec![]), target_alias) } _ => (ObjectName(vec![]), String::new()), diff --git a/crates/runtime/src/datafusion/planner.rs b/crates/runtime/src/datafusion/planner.rs index 9d2044846..f25fbad7e 100644 --- a/crates/runtime/src/datafusion/planner.rs +++ b/crates/runtime/src/datafusion/planner.rs @@ -93,7 +93,8 @@ where match statement.clone() { Statement::AlterTable { .. } | Statement::StartTransaction { .. } - | Statement::Commit { .. } => Ok(LogicalPlan::default()), + | Statement::Commit { .. } + | Statement::Update { .. } => Ok(LogicalPlan::default()), Statement::ShowSchemas { .. } => self.show_variable_to_plan(&["schemas".into()]), Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable), Statement::CreateTable(CreateTableStatement {