From 8c0979b73b63b6e14c8efcfe3b82fcaf88744eb0 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Tue, 14 Jan 2025 17:33:25 +0300 Subject: [PATCH 1/7] # This is a combination of 14 commits. # This is the 1st commit message: Upload CSV to table # This is the commit message #2: Upload CSV to table # This is the commit message #3: Fix linter # This is the commit message #4: Remove unused error # This is the commit message #5: Fix clippy # This is the commit message #6: Fix clippy # This is the commit message #7: Fix linter # This is the commit message #8: Fix clippy # This is the commit message #9: Fix clippy # This is the commit message #10: Fix clippy # This is the commit message #11: Fix fmt # This is the commit message #12: Add pre-commit # This is the commit message #13: Parse merge into # This is the commit message #14: Parse merge into --- .pre-commit-config.yaml | 10 + Cargo.toml | 2 +- crates/control_plane/Cargo.toml | 12 +- crates/control_plane/src/service.rs | 156 +++++---- .../src/http/control/schemas/warehouses.rs | 1 - crates/nexus/src/http/dbt/handlers.rs | 2 +- crates/nexus/src/main.rs | 1 + crates/runtime/Cargo.toml | 12 +- crates/runtime/src/datafusion/error.rs | 3 + crates/runtime/src/datafusion/execution.rs | 316 +++++++++++++----- .../src/datafusion/functions/greatest.rs | 164 --------- .../functions/greatest_least_utils.rs | 126 ------- .../runtime/src/datafusion/functions/least.rs | 175 ---------- .../runtime/src/datafusion/functions/mod.rs | 5 - crates/runtime/src/datafusion/planner.rs | 2 + crates/utils/src/lib.rs | 1 + 16 files changed, 346 insertions(+), 642 deletions(-) create mode 100644 .pre-commit-config.yaml delete mode 100644 crates/runtime/src/datafusion/functions/greatest.rs delete mode 100644 crates/runtime/src/datafusion/functions/greatest_least_utils.rs delete mode 100644 crates/runtime/src/datafusion/functions/least.rs diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 000000000..b773e8e91 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,10 @@ +repos: + - repo: https://github.com/doublify/pre-commit-rust + rev: v1.0 + hooks: + - id: cargo-check + args: [ "--workspace" ] + - id: fmt + args: [ "--", "--check" ] + - id: clippy + args: [ "--", "-D", "warnings" ] diff --git a/Cargo.toml b/Cargo.toml index 6775ccf2f..a8bd8b86b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ datafusion = { version = "43" } snafu = { version = "0.8.5", features = ["futures"] } [patch.crates-io] -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "073a3b110852f97ccb7085ce4bfd19473b8a3f4f" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } [workspace.lints.clippy] diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index 0b7bcdef5..baf7af042 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -18,13 +18,13 @@ flatbuffers = { version = "24.3.25" } #iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" } #datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" } -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "073a3b110852f97ccb7085ce4bfd19473b8a3f4f" } -datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "073a3b110852f97ccb7085ce4bfd19473b8a3f4f" } -datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = 
"073a3b110852f97ccb7085ce4bfd19473b8a3f4f" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } +datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } +datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "be8d668450f2f940a0bc8f428b6338644b3f4ef3" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "be8d668450f2f940a0bc8f428b6338644b3f4ef3" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "be8d668450f2f940a0bc8f428b6338644b3f4ef3" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs index 8e07c4931..dfc8a2313 100644 --- a/crates/control_plane/src/service.rs +++ b/crates/control_plane/src/service.rs @@ -15,7 +15,6 @@ use datafusion_iceberg::catalog::catalog::IcebergCatalog; use datafusion_iceberg::planner::IcebergQueryPlanner; use iceberg_rest_catalog::apis::configuration::Configuration; use iceberg_rest_catalog::catalog::RestCatalog; -use icelake::TableIdentifier; use object_store::path::Path; use object_store::{ObjectStore, PutPayload}; use runtime::datafusion::execution::SqlExecutor; @@ -23,7 +22,6 @@ use rusoto_core::{HttpClient, Region}; use rusoto_credential::StaticProvider; use rusoto_s3::{GetBucketAclRequest, S3Client, S3}; use snafu::ResultExt; -use std::collections::HashMap; use std::sync::Arc; use url::Url; use uuid::Uuid; @@ -355,9 +353,8 @@ impl ControlService for ControlServiceImpl { data: Bytes, file_name: String, ) -> ControlPlaneResult<()> { - //println!("{:?}", warehouse_id); - let warehouse = self.get_warehouse(*warehouse_id).await?; + let warehouse_name = warehouse.name.clone(); let storage_profile = self.get_profile(warehouse.storage_profile_id).await?; let object_store = storage_profile .get_object_store() @@ -375,8 +372,25 @@ impl ControlService for ControlServiceImpl { .await .context(crate::error::ObjectStoreSnafu)?; + // Create table from CSV + let config = { + let mut config = Configuration::new(); + config.base_path = "http://0.0.0.0:3000/catalog".to_string(); + config + }; + let object_store_builder = storage_profile + .get_object_store_builder() + .context(crate::error::InvalidStorageProfileSnafu)?; + let rest_client = RestCatalog::new( + Some(warehouse_id.to_string().as_str()), + config, + object_store_builder, + ); + let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?; let ctx = SessionContext::new(); + ctx.register_catalog(warehouse_name.clone(), Arc::new(catalog)); + // Register CSV file as a table let storage_endpoint_url = storage_profile .endpoint .as_ref() @@ -385,7 +399,7 @@ impl ControlService for ControlServiceImpl { let path_string = match &storage_profile.credentials { Credentials::AccessKey(_) => { // If the storage profile is AWS S3, modify the path_string with the S3 prefix - 
format!("{storage_endpoint_url}/{path_string}",) + format!("{storage_endpoint_url}/{path_string}") } Credentials::Role(_) => path_string, }; @@ -395,11 +409,17 @@ impl ControlService for ControlServiceImpl { }, )?; ctx.register_object_store(&endpoint_url, Arc::from(object_store)); + ctx.register_csv(table_name, path_string, CsvReadOptions::new()) + .await?; - // println!("{:?}", data); - // Commented code is writing with iceberg-rust-jankaul - // Let it sit here just in case - ////////////////////////////////////// + let insert_query = format!( + "INSERT INTO {warehouse_name}.{database_name}.{table_name} SELECT * FROM {table_name}" + ); + let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?; + executor + .execute_with_custom_plan(&insert_query, warehouse_name.as_str()) + .await + .context(crate::error::ExecutionSnafu)?; // let config = { // let mut config = Configuration::new(); @@ -438,65 +458,65 @@ impl ControlService for ControlServiceImpl { ////////////////////////////////////// - let config = { - HashMap::from([ - ("iceberg.catalog.type".to_string(), "rest".to_string()), - ( - "iceberg.catalog.demo.warehouse".to_string(), - warehouse_id.to_string(), - ), - ("iceberg.catalog.name".to_string(), "demo".to_string()), - ( - "iceberg.catalog.demo.uri".to_string(), - "http://0.0.0.0:3000/catalog".to_string(), - ), - ( - "iceberg.table.io.region".to_string(), - storage_profile.region.to_string(), - ), - ( - "iceberg.table.io.endpoint".to_string(), - storage_endpoint_url.to_string(), - ), - // ( - // "iceberg.table.io.bucket".to_string(), - // "examples".to_string(), - // ), - // ( - // "iceberg.table.io.access_key_id".to_string(), - // "minioadmin".to_string(), - // ), - // ( - // "iceberg.table.io.secret_access_key".to_string(), - // "minioadmin".to_string(), - // ), - ]) - }; - let catalog = icelake::catalog::load_catalog(&config).await?; - let table_ident = TableIdentifier::new(vec![database_name, table_name])?; - let mut table = catalog.load_table(&table_ident).await?; - let table_schema = table.current_arrow_schema()?; - println!("{:?}", table.table_name()); - - let df = ctx - .read_csv(path_string, CsvReadOptions::new().schema(&table_schema)) - .await?; - let data = df.collect().await?; - - let builder = table.writer_builder()?.rolling_writer_builder(None)?; - let mut writer = table - .writer_builder()? 
- .build_append_only_writer(builder) - .await?; - - for r in data { - writer.write(&r).await?; - } - - let res: Vec = writer.close().await?; - let mut txn = icelake::transaction::Transaction::new(&mut table); - txn.append_data_file(res); - txn.commit().await?; + // let config = { + // HashMap::from([ + // ("iceberg.catalog.type".to_string(), "rest".to_string()), + // ( + // "iceberg.catalog.demo.warehouse".to_string(), + // warehouse_id.to_string(), + // ), + // ("iceberg.catalog.name".to_string(), "demo".to_string()), + // ( + // "iceberg.catalog.demo.uri".to_string(), + // "http://0.0.0.0:3000/catalog".to_string(), + // ), + // ( + // "iceberg.table.io.region".to_string(), + // storage_profile.region.to_string(), + // ), + // ( + // "iceberg.table.io.endpoint".to_string(), + // storage_endpoint_url.to_string(), + // ), + // // ( + // // "iceberg.table.io.bucket".to_string(), + // // "examples".to_string(), + // // ), + // // ( + // // "iceberg.table.io.access_key_id".to_string(), + // // "minioadmin".to_string(), + // // ), + // // ( + // // "iceberg.table.io.secret_access_key".to_string(), + // // "minioadmin".to_string(), + // // ), + // ]) + // }; + // let catalog = icelake::catalog::load_catalog(&config).await?; + // let table_ident = TableIdentifier::new(vec![database_name, table_name])?; + // let mut table = catalog.load_table(&table_ident).await?; + // let table_schema = table.current_arrow_schema()?; + // println!("{:?}", table.table_name()); + // + // let df = ctx + // .read_csv(path_string, CsvReadOptions::new().schema(&table_schema)) + // .await?; + // let data = df.collect().await?; + // + // let builder = table.writer_builder()?.rolling_writer_builder(None)?; + // let mut writer = table + // .writer_builder()? + // .build_append_only_writer(builder) + // .await?; + // + // for r in data { + // writer.write(&r).await?; + // } + // + // let res: Vec = writer.close().await?; + // let mut txn = icelake::transaction::Transaction::new(&mut table); + // txn.append_data_file(res); + // txn.commit().await?; Ok(()) } diff --git a/crates/nexus/src/http/control/schemas/warehouses.rs b/crates/nexus/src/http/control/schemas/warehouses.rs index d3522e3c9..04806c82b 100644 --- a/crates/nexus/src/http/control/schemas/warehouses.rs +++ b/crates/nexus/src/http/control/schemas/warehouses.rs @@ -12,7 +12,6 @@ pub struct CreateWarehouseRequest { } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] - pub struct Warehouse { pub id: Uuid, pub prefix: String, diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs index b753aa870..425daa65c 100644 --- a/crates/nexus/src/http/dbt/handlers.rs +++ b/crates/nexus/src/http/dbt/handlers.rs @@ -110,7 +110,7 @@ pub async fn query( // row_set_base_64: Option::from(result.clone()), row_set_base_64: None, #[allow(clippy::unwrap_used)] - row_set: serde_json::from_str(&result).unwrap(), + row_set: ResponseData::rows_to_vec(result.as_str())?, total: Some(1), query_result_format: Option::from("json".to_string()), error_code: None, diff --git a/crates/nexus/src/main.rs b/crates/nexus/src/main.rs index 80d323292..ee7876b76 100644 --- a/crates/nexus/src/main.rs +++ b/crates/nexus/src/main.rs @@ -192,6 +192,7 @@ async fn print_request_response( Ok(res) } +#[allow(clippy::future_not_send)] async fn buffer_and_print( direction: &str, method: &String, diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index be1df6b82..1367e355e 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -14,13 +14,13 
@@ serde = { workspace = true } serde_json = { workspace = true } object_store = { workspace = true } -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "073a3b110852f97ccb7085ce4bfd19473b8a3f4f" } -datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "073a3b110852f97ccb7085ce4bfd19473b8a3f4f" } -datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "073a3b110852f97ccb7085ce4bfd19473b8a3f4f" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } +datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } +datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "be8d668450f2f940a0bc8f428b6338644b3f4ef3" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "be8d668450f2f940a0bc8f428b6338644b3f4ef3" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "be8d668450f2f940a0bc8f428b6338644b3f4ef3" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/runtime/src/datafusion/error.rs b/crates/runtime/src/datafusion/error.rs index 9040a11bd..c79609b11 100644 --- a/crates/runtime/src/datafusion/error.rs +++ b/crates/runtime/src/datafusion/error.rs @@ -35,6 +35,9 @@ pub enum IcehutSQLError { #[snafu(display("Invalid scale: {scale}"))] InvalidScale { scale: String }, + #[snafu(display("Invalid table identifier: {ident}"))] + InvalidIdentifier { ident: String }, + #[snafu(display("Not implemented: {message}"))] NotImplemented { message: String }, } diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index d0c6885a1..87de2c8d8 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -7,27 +7,28 @@ use crate::datafusion::functions::register_udfs; use crate::datafusion::planner::ExtendedSqlToRel; use arrow::array::{RecordBatch, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; -use datafusion::catalog::SchemaProvider; -use datafusion::catalog_common::information_schema::InformationSchemaProvider; -use datafusion::catalog_common::{ResolvedTableReference, TableReference}; -use datafusion::common::plan_datafusion_err; use datafusion::common::tree_node::{TransformedResult, TreeNode}; use datafusion::datasource::default_table_source::provider_as_source; use datafusion::execution::context::SessionContext; use datafusion::logical_expr::sqlparser::ast::Insert; -use datafusion::logical_expr::{ - CreateExternalTable as PlanCreateExternalTable, DdlStatement, LogicalPlan, -}; +use datafusion::logical_expr::LogicalPlan; use datafusion::sql::parser::{CreateExternalTable, Statement as DFStatement}; use datafusion::sql::sqlparser::ast::{ CreateTable as CreateTableStatement, Expr, Ident, ObjectName, Query, SchemaName, Statement, TableFactor, TableWithJoins, }; +use 
datafusion_common::DataFusionError; use datafusion_functions_json::register_all; use datafusion_iceberg::catalog::catalog::IcebergCatalog; use datafusion_iceberg::planner::iceberg_transform; +use iceberg_rust::catalog::create::CreateTable as CreateTableCatalog; +use iceberg_rust::spec::arrow::schema::new_fields_with_ids; +use iceberg_rust::spec::identifier::Identifier; use iceberg_rust::spec::namespace::Namespace; +use iceberg_rust::spec::schema::Schema; +use iceberg_rust::spec::types::StructType; use snafu::ResultExt; +use sqlparser::ast::{MergeAction, MergeClauseKind, MergeInsertKind, Query as AstQuery}; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; @@ -72,12 +73,18 @@ impl SqlExecutor { Statement::CreateSchema { schema_name, .. } => { return self.create_schema(schema_name, warehouse_name).await; } - Statement::ShowVariable { .. } | Statement::Query { .. } => { + Statement::AlterTable { .. } + | Statement::Query { .. } + | Statement::ShowSchemas { .. } + | Statement::ShowVariable { .. } => { return Box::pin(self.execute_with_custom_plan(&query, warehouse_name)).await; } Statement::Drop { .. } => { return Box::pin(self.drop_table_query(&query, warehouse_name)).await; } + Statement::Merge { .. } => { + return Box::pin(self.merge_query(*s, warehouse_name)).await; + } _ => {} } } @@ -119,7 +126,7 @@ impl SqlExecutor { &self, statement: Statement, warehouse_name: &str, - warehouse_location: &str, + _warehouse_location: &str, ) -> IcehutSQLResult> { if let Statement::CreateTable(create_table_statement) = statement { let mut new_table_full_name = create_table_statement.name.to_string(); @@ -129,18 +136,14 @@ impl SqlExecutor { ident.insert(0, Ident::new(warehouse_name)); } let _new_table_wh_id = ident[0].clone(); - let new_table_db = &ident[1..ident.len() - 1] - .iter() - .map(|v| v.value.clone()) - .collect::>() - .join("."); - #[allow(clippy::unwrap_used)] - let new_table_name = ident.last().unwrap(); - let new_table_ref = TableReference::full( - warehouse_name, - new_table_db.to_string(), - new_table_name.value.clone(), - ); + let new_table_db = &ident[1..ident.len() - 1]; + let new_table_name = ident + .last() + .ok_or(ih_error::IcehutSQLError::InvalidIdentifier { + ident: new_table_full_name.clone(), + })? + .clone(); + let location = create_table_statement.location.clone(); // Replace the name of table that needs creation (for ex. 
"warehouse"."database"."table" -> "table") // And run the query - this will create an InMemory table @@ -162,38 +165,70 @@ impl SqlExecutor { .await .context(super::error::DataFusionSnafu)?; - // Create External table - if let LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(table)) = plan { - let external_table_plan = - LogicalPlan::Ddl(DdlStatement::CreateExternalTable(PlanCreateExternalTable { - schema: table.input.schema().clone(), - name: new_table_ref, - location: warehouse_location.to_string(), // Specify the external location - file_type: "ICEBERG".to_string(), // Specify the file type - table_partition_cols: vec![], // Specify partition columns if any - if_not_exists: table.if_not_exists, - temporary: table.temporary, - definition: None, // Specify the SQL definition if available - order_exprs: vec![], // Specify order expressions if any - unbounded: false, // Specify if the table is unbounded - options: HashMap::new(), // Specify table-specific options if any - constraints: table.constraints.clone(), - column_defaults: HashMap::new(), - })); - let transformed = external_table_plan - .transform(iceberg_transform) - .data() - .context(ih_error::DataFusionSnafu)?; - self.ctx - .execute_logical_plan(transformed) - .await - .context(ih_error::DataFusionSnafu)? - .collect() + let fields_with_ids = StructType::try_from(&new_fields_with_ids( + plan.schema().as_arrow().fields(), + &mut 0, + )) + .map_err(|err| DataFusionError::External(Box::new(err))) + .context(super::error::DataFusionSnafu)?; + let schema = Schema::builder() + .with_schema_id(0) + .with_identifier_field_ids(vec![]) + .with_fields(fields_with_ids) + .build() + .map_err(|err| DataFusionError::External(Box::new(err))) + .context(super::error::DataFusionSnafu)?; + + // Check if it already exists, if it is - drop it + // For now we behave as CREATE OR REPLACE + // TODO support CREATE without REPLACE + let catalog = self.ctx.catalog(warehouse_name).ok_or( + ih_error::IcehutSQLError::WarehouseNotFound { + name: warehouse_name.to_string(), + }, + )?; + let iceberg_catalog = catalog.as_any().downcast_ref::().ok_or( + ih_error::IcehutSQLError::IcebergCatalogNotFound { + warehouse_name: warehouse_name.to_string(), + }, + )?; + let rest_catalog = iceberg_catalog.catalog(); + let new_table_ident = Identifier::new( + &new_table_db + .iter() + .map(|v| v.value.clone()) + .collect::>(), + &new_table_name.value, + ); + if matches!( + rest_catalog.tabular_exists(&new_table_ident).await, + Ok(true) + ) { + rest_catalog + .drop_table(&new_table_ident) .await - .context(ih_error::DataFusionSnafu)?; - } + .context(ih_error::IcebergSnafu)?; + }; - // Insert data to External table + // Create new table + rest_catalog + .create_table( + new_table_ident.clone(), + CreateTableCatalog { + name: new_table_name.value.clone(), + location, + schema, + partition_spec: None, + write_order: None, + stage_create: None, + properties: None, + }, + ) + .await + .map_err(|err| DataFusionError::External(Box::new(err))) + .context(super::error::DataFusionSnafu)?; + + // Insert data to new table let insert_query = format!("INSERT INTO {new_table_full_name} SELECT * FROM {new_table_name}"); self.execute_with_custom_plan(&insert_query, warehouse_name) @@ -219,6 +254,94 @@ impl SqlExecutor { } } + pub async fn merge_query( + &self, + statement: Statement, + warehouse_name: &str, + ) -> IcehutSQLResult> { + if let Statement::Merge { + mut table, + mut source, + on, + clauses, + .. 
+ } = statement + { + self.update_tables_in_table_factor(&mut table, warehouse_name); + self.update_tables_in_table_factor(&mut source, warehouse_name); + + let (target_table, target_alias) = self.get_table_with_alias(table); + let (source_table, _source_alias) = self.get_table_with_alias(source.clone()); + + let source_query = if let TableFactor::Derived { + subquery, + lateral, + alias, + } = source + { + source = TableFactor::Derived { + lateral, + subquery, + alias: None, + }; + alias.map_or_else(|| source.to_string(), |alias| format!("{source} {alias}")) + } else { + source.to_string() + }; + + // Construct the SELECT statement with JOIN ON + let select_query = + format!("SELECT * FROM {source_query} JOIN {target_table} {target_alias} ON {on}"); + let source_result = self + .execute_with_custom_plan(&select_query, warehouse_name) + .await?; + + // Check NOT MATCHED part with the fallback INSERT + let insert_query = if source_result.is_empty() { + // Extract columns and values from clauses + let mut columns = Vec::new(); + let mut values = Vec::new(); + for clause in clauses { + if clause.clause_kind == MergeClauseKind::NotMatched { + if let MergeAction::Insert(insert) = clause.action { + columns = insert.columns; + if let MergeInsertKind::Values(values_insert) = insert.kind { + values = values_insert.rows.into_iter().flatten().collect(); + } + } + } + } + // Construct the INSERT statement + format!( + "INSERT INTO {} ({}) SELECT {} FROM {}", + target_table, + columns + .iter() + .map(std::string::ToString::to_string) + .collect::>() + .join(", "), + values + .iter() + .map(std::string::ToString::to_string) + .collect::>() + .join(", "), + source_table + ) + } else { + format!("INSERT INTO {warehouse_name} {select_query}") + }; + + self.execute_with_custom_plan(&insert_query, warehouse_name) + .await + } else { + Err(super::error::IcehutSQLError::DataFusion { + source: DataFusionError::NotImplemented( + "Only CREATE TABLE statements are supported".to_string(), + ), + }) + } + } + pub async fn drop_table_query( &self, query: &str, @@ -314,9 +437,9 @@ impl SqlExecutor { .context(super::error::DataFusionSnafu)?; //println!("References: {:?}", references); for reference in references { - let resolved = self.resolve_table_ref(reference); + let resolved = state.resolve_table_ref(reference); if let Entry::Vacant(v) = ctx_provider.tables.entry(resolved.to_string()) { - if let Ok(schema) = self.schema_for_ref(resolved.clone()) { + if let Ok(schema) = state.schema_for_ref(resolved.clone()) { if let Some(table) = schema .table(&resolved.table) .await @@ -362,42 +485,6 @@ impl SqlExecutor { }) } } - - pub fn resolve_table_ref( - &self, - table_ref: impl Into, - ) -> ResolvedTableReference { - let catalog = &self.ctx.state().config_options().catalog.clone(); - table_ref - .into() - .resolve(&catalog.default_catalog, &catalog.default_schema) - } - - pub fn schema_for_ref( - &self, - table_ref: impl Into, - ) -> IcehutSQLResult> { - let state = self.ctx.state(); - let resolved_ref = self.resolve_table_ref(table_ref); - if state.config().information_schema() && *resolved_ref.schema == *"information_schema" { - return Ok(Arc::new(InformationSchemaProvider::new( - state.catalog_list().clone(), - ))); - } - - // Need better error handling here instead of just DF errors - state - .catalog_list() - .catalog(&resolved_ref.catalog) - .ok_or_else(|| super::error::IcehutSQLError::DataFusion { - source: plan_datafusion_err!("failed to resolve catalog: {}", resolved_ref.catalog), - })? 
- .schema(&resolved_ref.schema) - .ok_or_else(|| super::error::IcehutSQLError::DataFusion { - source: plan_datafusion_err!("failed to resolve schema: {}", resolved_ref.schema), - }) - } - pub async fn execute_with_custom_plan( &self, query: &str, @@ -430,11 +517,40 @@ impl SqlExecutor { DFStatement::CreateExternalTable(modified_statement) } DFStatement::Statement(s) => match *s { + Statement::AlterTable { + name, + if_exists, + only, + operations, + location, + on_cluster, + } => { + let name = self.compress_database_name(name.0, warehouse_name); + let modified_statement = Statement::AlterTable { + name: ObjectName(name), + if_exists, + only, + operations, + location, + on_cluster, + }; + DFStatement::Statement(Box::new(modified_statement)) + } Statement::Insert(insert_statement) => { let table_name = self.compress_database_name(insert_statement.table_name.0, warehouse_name); + + let source = insert_statement.source.map_or_else( + || None, + |mut query| { + self.update_tables_in_query(query.as_mut(), warehouse_name); + Some(Box::new(AstQuery { ..*query })) + }, + ); + let modified_statement = Insert { table_name: ObjectName(table_name), + source, ..insert_statement }; DFStatement::Statement(Box::new(Statement::Insert(modified_statement))) @@ -514,6 +630,28 @@ impl SqlExecutor { table_name } + #[allow(clippy::only_used_in_recursion)] + fn get_table_with_alias(&self, factor: TableFactor) -> (ObjectName, String) { + match factor { + TableFactor::Table { name, alias, .. } => { + let target_alias = alias.map_or_else(String::new, |alias| alias.to_string()); + (name, target_alias) + } + TableFactor::Derived { + subquery, alias, .. + } => { + let target_alias = alias.map_or_else(String::new, |alias| alias.to_string()); + if let sqlparser::ast::SetExpr::Select(select) = subquery.body.as_ref() { + if let Some(table_with_joins) = select.from.first() { + return self.get_table_with_alias(table_with_joins.relation.clone()); + } + } + (ObjectName(vec![]), target_alias) + } + _ => (ObjectName(vec![]), String::new()), + } + } + fn update_tables_in_query(&self, query: &mut Query, warehouse_name: &str) { if let Some(with) = query.with.as_mut() { for cte in &mut with.cte_tables { diff --git a/crates/runtime/src/datafusion/functions/greatest.rs b/crates/runtime/src/datafusion/functions/greatest.rs deleted file mode 100644 index 13746a7e4..000000000 --- a/crates/runtime/src/datafusion/functions/greatest.rs +++ /dev/null @@ -1,164 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use crate::datafusion::functions::greatest_least_utils::GreatestLeastOperator; -use arrow::array::{make_comparator, Array, BooleanArray}; -use arrow::buffer::BooleanBuffer; -use arrow::compute::kernels::cmp; -use arrow::compute::SortOptions; -use arrow::datatypes::DataType; -use datafusion::common::{internal_err, Result, ScalarValue}; -use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_CONDITIONAL; -use datafusion::logical_expr::{ColumnarValue, Documentation}; -use datafusion::logical_expr::{ScalarUDFImpl, Signature, Volatility}; -use std::any::Any; -use std::sync::OnceLock; - -use super::macros::make_udf_function; - -const SORT_OPTIONS: SortOptions = SortOptions { - // We want greatest first - descending: false, - - // NULL will be less than any other value - nulls_first: true, -}; - -#[derive(Debug)] -pub struct GreatestFunc { - signature: Signature, -} - -impl Default for GreatestFunc { - fn default() -> Self { - Self::new() - } -} - -impl GreatestFunc { - #[must_use] - pub fn new() -> Self { - Self { - signature: Signature::user_defined(Volatility::Immutable), - } - } -} - -impl GreatestLeastOperator for GreatestFunc { - const NAME: &'static str = "greatest"; - - fn keep_scalar<'a>(lhs: &'a ScalarValue, rhs: &'a ScalarValue) -> Result<&'a ScalarValue> { - if !lhs.data_type().is_nested() { - return if lhs >= rhs { Ok(lhs) } else { Ok(rhs) }; - } - - // If complex type we can't compare directly as we want null values to be smaller - let cmp = make_comparator( - lhs.to_array()?.as_ref(), - rhs.to_array()?.as_ref(), - SORT_OPTIONS, - )?; - - if cmp(0, 0).is_ge() { - Ok(lhs) - } else { - Ok(rhs) - } - } - - /// Return boolean array where `arr[i] = lhs[i] >= rhs[i]` for all i, where `arr` is the result array - /// Nulls are always considered smaller than any other value - fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result { - // Fast path: - // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorized kernel - // - If both arrays are not nested: Nested types, such as lists, are not supported as the null semantics are not well-defined. 
- // - both array does not have any nulls: cmp::gt_eq will return null if any of the input is null while we want to return false in that case - if !lhs.data_type().is_nested() - && lhs.logical_null_count() == 0 - && rhs.logical_null_count() == 0 - { - return cmp::gt_eq(&lhs, &rhs).map_err(Into::into); - } - - let cmp = make_comparator(lhs, rhs, SORT_OPTIONS)?; - - if lhs.len() != rhs.len() { - return internal_err!("All arrays should have the same length for greatest comparison"); - } - - let values = BooleanBuffer::collect_bool(lhs.len(), |i| cmp(i, i).is_ge()); - - // No nulls as we only want to keep the values that are larger, its either true or false - Ok(BooleanArray::new(values, None)) - } -} - -impl ScalarUDFImpl for GreatestFunc { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &'static str { - "greatest" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(arg_types[0].clone()) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - super::greatest_least_utils::execute_conditional::(args) - } - - fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - let coerced_type = super::greatest_least_utils::find_coerced_type::(arg_types)?; - - Ok(vec![coerced_type; arg_types.len()]) - } - - fn documentation(&self) -> Option<&Documentation> { - Some(get_greatest_doc()) - } -} -static DOCUMENTATION: OnceLock = OnceLock::new(); - -#[allow(clippy::unwrap_used)] -fn get_greatest_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder(DOC_SECTION_CONDITIONAL, "return the expression with the greatest value", "greatest(4, 7, 5)") - .with_sql_example("```sql -> select greatest(4, 7, 5); -+---------------------------+ -| greatest(4,7,5) | -+---------------------------+ -| 7 | -+---------------------------+ -```", - ) - .with_argument( - "expression1, expression_n", - "Expressions to compare and return the greatest value.. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary.", - ) - .build() - }) -} - -make_udf_function!(GreatestFunc); diff --git a/crates/runtime/src/datafusion/functions/greatest_least_utils.rs b/crates/runtime/src/datafusion/functions/greatest_least_utils.rs deleted file mode 100644 index b7f985404..000000000 --- a/crates/runtime/src/datafusion/functions/greatest_least_utils.rs +++ /dev/null @@ -1,126 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use arrow::array::{Array, ArrayRef, BooleanArray}; -use arrow::compute::kernels::zip::zip; -use arrow::datatypes::DataType; -use datafusion::common::{internal_err, plan_err, Result, ScalarValue}; -use datafusion::logical_expr::type_coercion::binary::type_union_resolution; -use datafusion::logical_expr::ColumnarValue; -use std::sync::Arc; - -pub(super) trait GreatestLeastOperator { - const NAME: &'static str; - - fn keep_scalar<'a>(lhs: &'a ScalarValue, rhs: &'a ScalarValue) -> Result<&'a ScalarValue>; - - /// Return array with true for values that we should keep from the lhs array - fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result; -} - -fn keep_array(lhs: &ArrayRef, rhs: &ArrayRef) -> Result { - // True for values that we should keep from the left array - let keep_lhs = Op::get_indexes_to_keep(lhs.as_ref(), rhs.as_ref())?; - - let result = zip(&keep_lhs, &lhs, &rhs)?; - - Ok(result) -} - -pub(super) fn execute_conditional( - args: &[ColumnarValue], -) -> Result { - if args.is_empty() { - return internal_err!( - "{} was called with no arguments. It requires at least 1.", - Op::NAME - ); - } - - // Some engines (e.g. SQL Server) allow greatest/least with single arg, it's a noop - if args.len() == 1 { - return Ok(args[0].clone()); - } - - // Split to scalars and arrays for later optimization - let (scalars, arrays): (Vec<_>, Vec<_>) = args.iter().partition(|x| match x { - ColumnarValue::Scalar(_) => true, - ColumnarValue::Array(_) => false, - }); - - let mut arrays_iter = arrays.iter().map(|x| match x { - ColumnarValue::Array(a) => a, - ColumnarValue::Scalar(_) => unreachable!(), - }); - - //let first_array = arrays_iter.next(); - if let Some(first_array) = arrays_iter.next() { - let mut result: ArrayRef = if scalars.is_empty() { - // If we only have arrays, start with the first array - // (We must have at least one array) - Arc::clone(first_array) - } else { - let mut scalars_iter = scalars.iter().map(|x| match x { - ColumnarValue::Scalar(s) => s, - ColumnarValue::Array(_) => unreachable!(), - }); - - // We have at least one scalar - #[allow(clippy::unwrap_used)] - let mut result_scalar = scalars_iter.next().unwrap(); - - for scalar in scalars_iter { - result_scalar = Op::keep_scalar(result_scalar, scalar)?; - } - - // If we only have scalars, return the one that we should keep (largest/least) - if arrays.is_empty() { - return Ok(ColumnarValue::Scalar(result_scalar.clone())); - } - - // We have at least one array - - // Start with the result value - keep_array::( - &Arc::clone(first_array), - &result_scalar.to_array_of_size(first_array.len())?, - )? - }; - - for array in arrays_iter { - result = keep_array::(&Arc::clone(array), &result)?; - } - Ok(ColumnarValue::Array(result)) - } else { - internal_err!("Expected at least one array argument") - } -} - -pub(super) fn find_coerced_type( - data_types: &[DataType], -) -> Result { - if data_types.is_empty() { - plan_err!( - "{} was called without any arguments. It requires at least 1.", - Op::NAME - ) - } else if let Some(coerced_type) = type_union_resolution(data_types) { - Ok(coerced_type) - } else { - plan_err!("Cannot find a common type for arguments") - } -} diff --git a/crates/runtime/src/datafusion/functions/least.rs b/crates/runtime/src/datafusion/functions/least.rs deleted file mode 100644 index 6db089fa7..000000000 --- a/crates/runtime/src/datafusion/functions/least.rs +++ /dev/null @@ -1,175 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::datafusion::functions::greatest_least_utils::GreatestLeastOperator; -use arrow::array::{make_comparator, Array, BooleanArray}; -use arrow::buffer::BooleanBuffer; -use arrow::compute::kernels::cmp; -use arrow::compute::SortOptions; -use arrow::datatypes::DataType; -use datafusion::common::{internal_err, Result, ScalarValue}; -use datafusion::logical_expr::scalar_doc_sections::DOC_SECTION_CONDITIONAL; -use datafusion::logical_expr::{ColumnarValue, Documentation}; -use datafusion::logical_expr::{ScalarUDFImpl, Signature, Volatility}; -use std::any::Any; -use std::sync::OnceLock; - -const SORT_OPTIONS: SortOptions = SortOptions { - // Having the smallest result first - descending: false, - - // NULL will be greater than any other value - nulls_first: false, -}; - -#[derive(Debug)] -pub struct LeastFunc { - signature: Signature, -} - -impl Default for LeastFunc { - fn default() -> Self { - Self::new() - } -} - -impl LeastFunc { - #[must_use] - pub fn new() -> Self { - Self { - signature: Signature::user_defined(Volatility::Immutable), - } - } -} - -impl GreatestLeastOperator for LeastFunc { - const NAME: &'static str = "least"; - - fn keep_scalar<'a>(lhs: &'a ScalarValue, rhs: &'a ScalarValue) -> Result<&'a ScalarValue> { - // Manual checking for nulls as: - // 1. If we're going to use <=, in Rust None is smaller than Some(T), which we don't want - // 2. And we can't use make_comparator as it has no natural order (Arrow error) - if lhs.is_null() { - return Ok(rhs); - } - - if rhs.is_null() { - return Ok(lhs); - } - - if !lhs.data_type().is_nested() { - return if lhs <= rhs { Ok(lhs) } else { Ok(rhs) }; - } - - // Not using <= as in Rust None is smaller than Some(T) - - // If complex type we can't compare directly as we want null values to be larger - let cmp = make_comparator( - lhs.to_array()?.as_ref(), - rhs.to_array()?.as_ref(), - SORT_OPTIONS, - )?; - - if cmp(0, 0).is_le() { - Ok(lhs) - } else { - Ok(rhs) - } - } - - /// Return boolean array where `arr[i] = lhs[i] <= rhs[i]` for all i, where `arr` is the result array - /// Nulls are always considered larger than any other value - fn get_indexes_to_keep(lhs: &dyn Array, rhs: &dyn Array) -> Result { - // Fast path: - // If both arrays are not nested, have the same length and no nulls, we can use the faster vectorized kernel - // - If both arrays are not nested: Nested types, such as lists, are not supported as the null semantics are not well-defined. 
- // - both array does not have any nulls: cmp::lt_eq will return null if any of the input is null while we want to return false in that case - if !lhs.data_type().is_nested() - && lhs.logical_null_count() == 0 - && rhs.logical_null_count() == 0 - { - return cmp::lt_eq(&lhs, &rhs).map_err(Into::into); - } - - let cmp = make_comparator(lhs, rhs, SORT_OPTIONS)?; - - if lhs.len() != rhs.len() { - return internal_err!("All arrays should have the same length for least comparison"); - } - - let values = BooleanBuffer::collect_bool(lhs.len(), |i| cmp(i, i).is_le()); - - // No nulls as we only want to keep the values that are smaller, its either true or false - Ok(BooleanArray::new(values, None)) - } -} - -impl ScalarUDFImpl for LeastFunc { - fn as_any(&self) -> &dyn Any { - self - } - - fn name(&self) -> &'static str { - "least" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(arg_types[0].clone()) - } - - fn invoke(&self, args: &[ColumnarValue]) -> Result { - super::greatest_least_utils::execute_conditional::(args) - } - - fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - let coerced_type = super::greatest_least_utils::find_coerced_type::(arg_types)?; - - Ok(vec![coerced_type; arg_types.len()]) - } - - fn documentation(&self) -> Option<&Documentation> { - Some(get_smallest_doc()) - } -} -static DOCUMENTATION: OnceLock = OnceLock::new(); - -#[allow(clippy::unwrap_used)] -fn get_smallest_doc() -> &'static Documentation { - DOCUMENTATION.get_or_init(|| { - Documentation::builder(DOC_SECTION_CONDITIONAL, "return the expression with the smallest value", "least(4,7,5)") - .with_sql_example(r"```sql -> select least(4, 7, 5); -+---------------------------+ -| least(4,7,5) | -+---------------------------+ -| 4 | -+---------------------------+ -```", - ) - .with_argument( - "expression1, expression_n", - "Expressions to compare and return the smallest value. Can be a constant, column, or function, and any combination of arithmetic operators. Pass as many expression arguments as necessary.", - ) - .build() - }) -} - -super::macros::make_udf_function!(LeastFunc); diff --git a/crates/runtime/src/datafusion/functions/mod.rs b/crates/runtime/src/datafusion/functions/mod.rs index d6871d6e8..e383f8242 100644 --- a/crates/runtime/src/datafusion/functions/mod.rs +++ b/crates/runtime/src/datafusion/functions/mod.rs @@ -4,17 +4,12 @@ use datafusion::{common::Result, execution::FunctionRegistry, logical_expr::Scal mod convert_timezone; mod date_add; -mod greatest; -mod greatest_least_utils; -mod least; mod parse_json; pub(crate) fn register_udfs(registry: &mut dyn FunctionRegistry) -> Result<()> { let functions: Vec> = vec![ convert_timezone::get_udf(), date_add::get_udf(), - greatest::get_udf(), - least::get_udf(), parse_json::get_udf(), ]; diff --git a/crates/runtime/src/datafusion/planner.rs b/crates/runtime/src/datafusion/planner.rs index 564a228a1..ce4bb142b 100644 --- a/crates/runtime/src/datafusion/planner.rs +++ b/crates/runtime/src/datafusion/planner.rs @@ -91,6 +91,8 @@ where let planner_context: &mut PlannerContext = &mut PlannerContext::new(); // Example: Custom handling for a specific statement match statement.clone() { + Statement::AlterTable { .. } => Ok(LogicalPlan::default()), + Statement::ShowSchemas { .. 
} => self.show_variable_to_plan(&["schemas".into()]), Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable), Statement::CreateTable(CreateTableStatement { query, diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 32a9f042d..0f08a6b93 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -134,6 +134,7 @@ impl Db { /// /// Returns a `DbError` if the database operations fail, or /// `SerializeError`/`DeserializeError` if the value cannot be serialized or deserialized. + #[allow(clippy::future_not_send)] pub async fn modify(&self, key: &str, f: impl Fn(&mut T)) -> Result<()> where T: serde::Serialize + DeserializeOwned + Default + Sync, From 42a9005e3d2899b2ff2b52265571c6cb97e9188d Mon Sep 17 00:00:00 2001 From: osipovartem Date: Thu, 16 Jan 2025 13:52:00 +0300 Subject: [PATCH 2/7] Merge --- crates/runtime/src/datafusion/planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/runtime/src/datafusion/planner.rs b/crates/runtime/src/datafusion/planner.rs index ce4bb142b..9329f5edf 100644 --- a/crates/runtime/src/datafusion/planner.rs +++ b/crates/runtime/src/datafusion/planner.rs @@ -306,7 +306,7 @@ where let data_type = self.convert_data_type(&field.field_type)?; let field_name = field.field_name.as_ref().map_or_else( || Ident::new(format!("c{idx}")), - std::clone::Clone::clone, + Clone::clone, ); Ok(Arc::new(Field::new( self.ident_normalizer.normalize(field_name), From ddf10ae902ac0772eb32afb2434a6ab0deaa1800 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Thu, 16 Jan 2025 15:36:29 +0300 Subject: [PATCH 3/7] Add full message text into the error --- Cargo.toml | 2 +- crates/control_plane/Cargo.toml | 12 ++++++------ crates/nexus/src/http/dbt/error.rs | 17 ++++++++++++++++- crates/nexus/src/http/dbt/handlers.rs | 2 +- crates/runtime/Cargo.toml | 12 ++++++------ crates/runtime/src/datafusion/execution.rs | 2 +- 6 files changed, 31 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a8bd8b86b..c25256644 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ datafusion = { version = "43" } snafu = { version = "0.8.5", features = ["futures"] } [patch.crates-io] -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } [workspace.lints.clippy] diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index baf7af042..bd61d41da 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -18,13 +18,13 @@ flatbuffers = { version = "24.3.25" } #iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" } #datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" } -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } -datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } -datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } +datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } +datafusion-expr = { 
git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/nexus/src/http/dbt/error.rs b/crates/nexus/src/http/dbt/error.rs index dfdce9475..9e4b881ab 100644 --- a/crates/nexus/src/http/dbt/error.rs +++ b/crates/nexus/src/http/dbt/error.rs @@ -57,9 +57,24 @@ impl IntoResponse for DbtError { Self::NotImplemented => http::StatusCode::NOT_IMPLEMENTED, }; + let message = match &self { + Self::GZipDecompress { source } => format!("failed to decompress GZip body: {source}"), + Self::LoginRequestParse { source } => { + format!("failed to parse login request: {source}") + } + Self::QueryBodyParse { source } => format!("failed to parse query body: {source}"), + Self::InvalidWarehouseIdFormat { source } => format!("invalid warehouse_id: {source}"), + Self::ControlService { source } => source.to_string(), + Self::RowParse { source } => format!("failed to parse row JSON: {source}"), + Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => { + "session error".to_string() + } + Self::NotImplemented => "feature not implemented".to_string(), + }; + let body = Json(JsonResponse { success: false, - message: Some(self.to_string()), + message: Some(message), data: None, code: Some(status_code.as_u16().to_string()), }); diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs index 425daa65c..e5b84d39c 100644 --- a/crates/nexus/src/http/dbt/handlers.rs +++ b/crates/nexus/src/http/dbt/handlers.rs @@ -32,7 +32,7 @@ pub async fn login( //println!("Received login request: {:?}", query); //println!("Body data parameters: {:?}", body_json); - let token = uuid::Uuid::new_v4().to_string(); + let token = Uuid::new_v4().to_string(); let warehouses = state .control_svc diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 1367e355e..be1f8afcc 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -14,13 +14,13 @@ serde = { workspace = true } serde_json = { workspace = true } object_store = { workspace = true } -datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } -datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } -datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "bc5a978a4102391b2b7427dfdf94dd4e2667be49" } +datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } +datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } +datafusion-expr = { 
git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "5d4211521085722de35b14c444da087c52309771" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index 87de2c8d8..fac0a91cb 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -107,7 +107,7 @@ impl SqlExecutor { pub fn preprocess_query(&self, query: &str) -> String { // Replace field[0].subfield -> json_get(json_get(field, 0), 'subfield') // TODO: This regex should be a static allocation - let re = regex::Regex::new(r"(\w+)\[(\d+)]\.(\w+)").unwrap(); + let re = regex::Regex::new(r"(\w+)\[(\d+)][:\.](\w+)").unwrap(); let date_add = regex::Regex::new(r"(date|time|timestamp)(_?add)\(\s*([a-zA-Z]+),").unwrap(); let query = re From 325eb5b43f85a0049c24ab791d434f9e648c3819 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 17 Jan 2025 11:23:06 +0300 Subject: [PATCH 4/7] Fix --- crates/nexus/src/main.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/nexus/src/main.rs b/crates/nexus/src/main.rs index ee7876b76..80d323292 100644 --- a/crates/nexus/src/main.rs +++ b/crates/nexus/src/main.rs @@ -192,7 +192,6 @@ async fn print_request_response( Ok(res) } -#[allow(clippy::future_not_send)] async fn buffer_and_print( direction: &str, method: &String, From 2eb24aea292aa5af3dc7dba83a967acc50720d26 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 17 Jan 2025 17:23:44 +0300 Subject: [PATCH 5/7] Update MERGE to insert only not matched records --- crates/nexus/src/main.rs | 2 +- crates/runtime/src/datafusion/execution.rs | 114 +++++++++++++-------- 2 files changed, 74 insertions(+), 42 deletions(-) diff --git a/crates/nexus/src/main.rs b/crates/nexus/src/main.rs index 80d323292..142d19b52 100644 --- a/crates/nexus/src/main.rs +++ b/crates/nexus/src/main.rs @@ -199,7 +199,7 @@ async fn buffer_and_print( body: B, ) -> Result where - B: axum::body::HttpBody, + B: axum::body::HttpBody + Send, B::Error: std::fmt::Display, { let bytes = match body.collect().await { diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index fac0a91cb..2f29d760b 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -247,7 +247,7 @@ impl SqlExecutor { created_entity_response().context(ih_error::ArrowSnafu) } else { Err(super::error::IcehutSQLError::DataFusion { - source: datafusion::error::DataFusionError::NotImplemented( + source: DataFusionError::NotImplemented( "Only CREATE TABLE statements are supported".to_string(), ), }) @@ -289,54 +289,49 @@ impl SqlExecutor { source.to_string() }; - // Construct the SELECT statement with JOIN ON + // Check NOT 
MATCHED for records to INSERT + let where_clause = self.get_join_on_where_clause(*on.clone(), target_alias.as_str()); let select_query = - format!("SELECT * FROM {source_query} JOIN {target_table} {target_alias} ON {on}"); - let source_result = self - .execute_with_custom_plan(&select_query, warehouse_name) + format!("SELECT * FROM {source_query} JOIN {target_table} {target_alias} ON {on}{where_clause}"); + self.execute_with_custom_plan(&select_query, warehouse_name) .await?; - // Check NOT MATCHED part with the fallback INSERT - let insert_query = if source_result.is_empty() { - // Extract columns and values from clauses - let mut columns = Vec::new(); - let mut values = Vec::new(); - for clause in clauses { - if clause.clause_kind == MergeClauseKind::NotMatched { - if let MergeAction::Insert(insert) = clause.action { - columns = insert.columns; - if let MergeInsertKind::Values(values_insert) = insert.kind { - values = values_insert.rows.into_iter().flatten().collect(); - } + // Extract columns and values from clauses + let mut columns = Vec::new(); + let mut values = Vec::new(); + for clause in clauses { + if clause.clause_kind == MergeClauseKind::NotMatched { + if let MergeAction::Insert(insert) = clause.action { + columns = insert.columns; + if let MergeInsertKind::Values(values_insert) = insert.kind { + values = values_insert.rows.into_iter().flatten().collect(); } } } - // Construct the INSERT statement - format!( - "INSERT INTO {} ({}) SELECT {} FROM {}", - target_table, - columns - .iter() - .map(std::string::ToString::to_string) - .collect::>() - .join(", "), - values - .iter() - .map(std::string::ToString::to_string) - .collect::>() - .join(", "), - source_table - ) - } else { - format!("INSERT INTO {warehouse_name} {select_query}") - }; + } + // Construct the INSERT statement + let insert_query = format!( + "INSERT INTO {} ({}) SELECT {} FROM {}", + target_table, + columns + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "), + values + .iter() + .map(ToString::to_string) + .collect::>() + .join(", "), + source_table + ); self.execute_with_custom_plan(&insert_query, warehouse_name) .await } else { Err(super::error::IcehutSQLError::DataFusion { source: DataFusionError::NotImplemented( - "Only CREATE TABLE statements are supported".to_string(), + "Only MERGE statements are supported".to_string(), ), }) } @@ -404,7 +399,7 @@ impl SqlExecutor { } _ => { return Err(super::error::IcehutSQLError::DataFusion { - source: datafusion::error::DataFusionError::NotImplemented( + source: DataFusionError::NotImplemented( "Only simple schema names are supported".to_string(), ), }); @@ -479,7 +474,7 @@ impl SqlExecutor { .context(super::error::DataFusionSnafu) } else { Err(super::error::IcehutSQLError::DataFusion { - source: datafusion::error::DataFusionError::NotImplemented( + source: DataFusionError::NotImplemented( "Only SQL statements are supported".to_string(), ), }) @@ -500,6 +495,43 @@ impl SqlExecutor { .context(super::error::DataFusionSnafu) } + fn get_join_on_where_clause(&self, on: Expr, target_alias: &str) -> String { + if let Expr::BinaryOp { left, right, .. 
From d497a6da31fd40dc3b76554c4f30d97f0a4a7d14 Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Fri, 17 Jan 2025 17:26:14 +0300
Subject: [PATCH 6/7] Remove unused future_not_send allow

---
 crates/utils/src/lib.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs
index 0f08a6b93..32a9f042d 100644
--- a/crates/utils/src/lib.rs
+++ b/crates/utils/src/lib.rs
@@ -134,7 +134,6 @@ impl Db {
     ///
     /// Returns a `DbError` if the database operations fail, or
     /// `SerializeError`/`DeserializeError` if the value cannot be serialized or deserialized.
-    #[allow(clippy::future_not_send)]
     pub async fn modify<T>(&self, key: &str, f: impl Fn(&mut T)) -> Result<()>
     where
         T: serde::Serialize + DeserializeOwned + Default + Sync,
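For context on the two `#[allow(clippy::future_not_send)]` removals (here and in PATCH 4/7): the lint flags async fns whose futures are not `Send` and therefore cannot be moved between threads by a work-stealing executor; the `B: ... + Send` bound added in PATCH 5/7 serves the same goal. A toy illustration of what the lint checks, unrelated to the real `Db::modify` (which satisfies it through its own bounds):

    // The future of `modify_like` holds `f` and `value` across the await, so
    // it is Send exactly when both are Send; with these bounds the lint is quiet.
    async fn modify_like<T: Default + Send>(f: impl Fn(&mut T) + Send) -> T {
        let mut value = T::default();
        f(&mut value);
        std::future::ready(()).await; // stand-in for a real async call
        value
    }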
From 5496ed16371490b23cbd00e607da3195c8650b0e Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Fri, 17 Jan 2025 18:23:55 +0300
Subject: [PATCH 7/7] Update MERGE to insert only not matched records

---
 crates/runtime/src/datafusion/execution.rs | 45 +++++++++++-----------
 crates/runtime/src/datafusion/planner.rs   |  4 +-
 2 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs
index 2f29d760b..adf72cbfd 100644
--- a/crates/runtime/src/datafusion/execution.rs
+++ b/crates/runtime/src/datafusion/execution.rs
@@ -74,6 +74,9 @@ impl SqlExecutor {
                 return self.create_schema(schema_name, warehouse_name).await;
             }
             Statement::AlterTable { .. }
+            | Statement::StartTransaction { .. }
+            | Statement::Commit { .. }
+            | Statement::Insert { .. }
             | Statement::Query { .. }
             | Statement::ShowSchemas { .. }
             | Statement::ShowVariable { .. } => {
@@ -289,10 +292,21 @@ impl SqlExecutor {
             source.to_string()
         };

+        // Prepare WHERE clause to filter unmatched records
+        let where_clause = self
+            .get_expr_where_clause(*on.clone(), target_alias.as_str())
+            .iter()
+            .map(|v| format!("{v} IS NULL"))
+            .collect::<Vec<String>>();
+        let where_clause_str = if where_clause.is_empty() {
+            String::new()
+        } else {
+            format!(" WHERE {}", where_clause.join(" AND "))
+        };
+
         // Check NOT MATCHED for records to INSERT
-        let where_clause = self.get_join_on_where_clause(*on.clone(), target_alias.as_str());
         let select_query =
-            format!("SELECT * FROM {source_query} JOIN {target_table} {target_alias} ON {on}{where_clause}");
+            format!("SELECT * FROM {source_query} JOIN {target_table} {target_alias} ON {on}{where_clause_str}");
         self.execute_with_custom_plan(&select_query, warehouse_name)
             .await?;

@@ -495,20 +509,8 @@ impl SqlExecutor {
             .context(super::error::DataFusionSnafu)
     }

-    fn get_join_on_where_clause(&self, on: Expr, target_alias: &str) -> String {
-        if let Expr::BinaryOp { left, right, .. } = on {
-            let left_expr = self.get_expr_where_clause(*left, target_alias);
-            if left_expr.is_empty() {
-                return self.get_expr_where_clause(*right, target_alias);
-            }
-            return left_expr;
-        }
-        String::new()
-    }
-
     #[allow(clippy::only_used_in_recursion)]
-    fn get_expr_where_clause(&self, expr: Expr, target_alias: &str) -> String {
-        let where_clause = String::new();
+    fn get_expr_where_clause(&self, expr: Expr, target_alias: &str) -> Vec<String> {
         match expr {
             Expr::CompoundIdentifier(ident) => {
                 if ident.len() > 1 && ident[0].value == target_alias {
@@ -517,18 +519,17 @@ impl SqlExecutor {
                         .map(|v| v.value.clone())
                         .collect::<Vec<String>>()
                         .join(".");
-                    return format!(" WHERE {ident_str} IS NULL");
+                    return vec![ident_str];
                 }
-                where_clause
+                vec![]
             }
             Expr::BinaryOp { left, right, .. } => {
-                let left_expr = self.get_expr_where_clause(*left, target_alias);
-                if left_expr.is_empty() {
-                    return self.get_expr_where_clause(*right, target_alias);
-                }
+                let mut left_expr = self.get_expr_where_clause(*left, target_alias);
+                let right_expr = self.get_expr_where_clause(*right, target_alias);
+                left_expr.extend(right_expr);
                 left_expr
             }
-            _ => where_clause,
+            _ => vec![],
         }
     }

diff --git a/crates/runtime/src/datafusion/planner.rs b/crates/runtime/src/datafusion/planner.rs
index 9329f5edf..84b28c02f 100644
--- a/crates/runtime/src/datafusion/planner.rs
+++ b/crates/runtime/src/datafusion/planner.rs
@@ -91,7 +91,9 @@ where
         let planner_context: &mut PlannerContext = &mut PlannerContext::new();
         // Example: Custom handling for a specific statement
         match statement.clone() {
-            Statement::AlterTable { .. } => Ok(LogicalPlan::default()),
+            Statement::AlterTable { .. }
+            | Statement::StartTransaction { .. }
+            | Statement::Commit { .. } => Ok(LogicalPlan::default()),
             Statement::ShowSchemas { .. } => self.show_variable_to_plan(&["schemas".into()]),
             Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
             Statement::CreateTable(CreateTableStatement {
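The net effect of this final revision: `get_expr_where_clause` now returns every target-alias column referenced anywhere in the `ON` expression instead of stopping at the first hit, so compound conditions produce one `IS NULL` check per column. A condensed, self-contained version of the traversal over sqlparser's AST (borrowing the expression rather than consuming it; the free function and its name are this note's invention):

    use sqlparser::ast::Expr;

    fn target_columns(expr: &Expr, target_alias: &str) -> Vec<String> {
        match expr {
            // `t.id` parses as a compound identifier rooted at the target alias.
            Expr::CompoundIdentifier(parts)
                if parts.len() > 1 && parts[0].value == target_alias =>
            {
                vec![parts
                    .iter()
                    .map(|p| p.value.clone())
                    .collect::<Vec<String>>()
                    .join(".")]
            }
            // `a = b AND c = d`: walk both sides and keep every hit.
            Expr::BinaryOp { left, right, .. } => {
                let mut cols = target_columns(left, target_alias);
                cols.extend(target_columns(right, target_alias));
                cols
            }
            _ => Vec::new(),
        }
    }

    // ON `s.id = t.id AND s.name = t.name` with alias `t` yields
    // ["t.id", "t.name"], which the caller renders as
    // ` WHERE t.id IS NULL AND t.name IS NULL`.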