diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index bd61d41da..bbf67fdc8 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -22,9 +22,9 @@ datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c592 datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index be1f8afcc..4d7fcccf6 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -18,9 +18,9 @@ datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c592 datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" } arrow = { version = "53" } arrow-json = { version = "53" } diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index 7aeb176c8..3c024239b 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -28,7 +28,12 @@ use iceberg_rust::spec::namespace::Namespace; use iceberg_rust::spec::schema::Schema; use iceberg_rust::spec::types::StructType; use snafu::ResultExt; -use sqlparser::ast::{MergeAction, MergeClauseKind, MergeInsertKind, Query as AstQuery}; +use sqlparser::ast::helpers::attached_token::AttachedToken; +use sqlparser::ast::{ + BinaryOperator, GroupByExpr, MergeAction, MergeClauseKind, MergeInsertKind, Query as AstQuery, + Select, SelectItem, +}; +use sqlparser::tokenizer::Span; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; @@ -77,11 +82,17 @@ impl SqlExecutor { | Statement::StartTransaction { .. } | Statement::Commit { .. } | Statement::Insert { .. } - | Statement::Query { .. } | Statement::ShowSchemas { .. } | Statement::ShowVariable { .. } => { return Box::pin(self.execute_with_custom_plan(&query, warehouse_name)).await; } + Statement::Query(mut subquery) => { + self.update_qualify_in_query(subquery.as_mut()); + return Box::pin( + self.execute_with_custom_plan(&subquery.to_string(), warehouse_name), + ) + .await; + } Statement::Drop { .. } => { return Box::pin(self.drop_table_query(&query, warehouse_name)).await; } @@ -110,8 +121,9 @@ impl SqlExecutor { pub fn preprocess_query(&self, query: &str) -> String { // Replace field[0].subfield -> json_get(json_get(field, 0), 'subfield') // TODO: This regex should be a static allocation - let re = regex::Regex::new(r"(\w+)\[(\d+)][:\.](\w+)").unwrap(); - let date_add = regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap(); + let re = regex::Regex::new(r"(\w+.\w+)\[(\d+)][:\.](\w+)").unwrap(); + let date_add = + regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap(); let query = re .replace_all(query, "json_get(json_get($1, $2), '$3')") @@ -150,13 +162,19 @@ impl SqlExecutor { // Replace the name of table that needs creation (for ex. "warehouse"."database"."table" -> "table") // And run the query - this will create an InMemory table - let modified_statement = CreateTableStatement { + let mut modified_statement = CreateTableStatement { name: ObjectName(vec![new_table_name.clone()]), transient: false, ..create_table_statement }; + + // Replace qualify with nested select + if let Some(ref mut query) = modified_statement.query { + self.update_qualify_in_query(query); + } // Create InMemory table since external tables with "AS SELECT" are not supported let updated_query = modified_statement.to_string(); + let plan = self .get_custom_logical_plan(&updated_query, warehouse_name) .await?; @@ -481,7 +499,7 @@ impl SqlExecutor { } } } - // println!("Tables: {:?}", ctx_provider.tables.keys()); + let planner = ExtendedSqlToRel::new(&ctx_provider); planner .sql_statement_to_plan(*s) @@ -509,6 +527,97 @@ impl SqlExecutor { .context(super::error::DataFusionSnafu) } + #[allow(clippy::only_used_in_recursion)] + fn update_qualify_in_query(&self, query: &mut Query) { + if let Some(with) = query.with.as_mut() { + for cte in &mut with.cte_tables { + self.update_qualify_in_query(&mut cte.query); + } + } + + match query.body.as_mut() { + sqlparser::ast::SetExpr::Select(select) => { + if let Some(Expr::BinaryOp { left, op, right }) = select.qualify.as_ref() { + if matches!( + op, + BinaryOperator::Eq | BinaryOperator::Lt | BinaryOperator::LtEq + ) { + let mut inner_select = select.clone(); + inner_select.qualify = None; + inner_select.projection.push(SelectItem::ExprWithAlias { + expr: *(left.clone()), + alias: Ident { + value: "qualify_alias".to_string(), + quote_style: None, + span: Span::empty(), + }, + }); + let subquery = Query { + with: None, + body: Box::new(sqlparser::ast::SetExpr::Select(inner_select)), + order_by: None, + limit: None, + limit_by: vec![], + offset: None, + fetch: None, + locks: vec![], + for_clause: None, + settings: None, + format_clause: None, + }; + let outer_select = Select { + select_token: AttachedToken::empty(), + distinct: None, + top: None, + top_before_distinct: false, + projection: vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident { + value: "*".to_string(), + quote_style: None, + span: Span::empty(), + }))], + into: None, + from: vec![TableWithJoins { + relation: TableFactor::Derived { + lateral: false, + subquery: Box::new(subquery), + alias: None, + }, + joins: vec![], + }], + lateral_views: vec![], + prewhere: None, + selection: Some(Expr::BinaryOp { + left: Box::new(Expr::Identifier(Ident { + value: "qualify_alias".to_string(), + quote_style: None, + span: Span::empty(), + })), + op: op.clone(), + right: Box::new(*right.clone()), + }), + group_by: GroupByExpr::Expressions(vec![], vec![]), + cluster_by: vec![], + distribute_by: vec![], + sort_by: vec![], + having: None, + named_window: vec![], + qualify: None, + window_before_qualify: false, + value_table_mode: None, + connect_by: None, + }; + + *query.body = sqlparser::ast::SetExpr::Select(Box::new(outer_select)); + } + } + } + sqlparser::ast::SetExpr::Query(q) => { + self.update_qualify_in_query(q); + } + _ => {} + } + } + #[allow(clippy::only_used_in_recursion)] fn get_expr_where_clause(&self, expr: Expr, target_alias: &str) -> Vec { match expr { diff --git a/crates/runtime/src/datafusion/functions/convert_timezone.rs b/crates/runtime/src/datafusion/functions/convert_timezone.rs index 611a0f7cf..11f521e80 100644 --- a/crates/runtime/src/datafusion/functions/convert_timezone.rs +++ b/crates/runtime/src/datafusion/functions/convert_timezone.rs @@ -130,6 +130,7 @@ impl ScalarUDFImpl for ConvertTimezoneFunc { //should use local session time //TODO: select convert_timezone('America/New_York, 'UTC', v3) with v3 a timestamp with value = '2025-01-06 08:00:00 America/New_York', //should be parsed as the timezone None variant timestamp + #[allow(clippy::too_many_lines)] fn invoke(&self, args: &[ColumnarValue]) -> Result { match args.len() { 2 => { diff --git a/crates/runtime/src/datafusion/functions/date_diff.rs b/crates/runtime/src/datafusion/functions/date_diff.rs index 06b111d9b..2bfddf17a 100644 --- a/crates/runtime/src/datafusion/functions/date_diff.rs +++ b/crates/runtime/src/datafusion/functions/date_diff.rs @@ -1,18 +1,16 @@ use arrow::array::Array; use arrow::compute::{date_part, DatePart}; -use arrow::datatypes::DataType::Int64; use arrow::datatypes::DataType; +use arrow::datatypes::DataType::Int64; use datafusion::common::{plan_err, Result}; use datafusion::logical_expr::TypeSignature::Coercible; use datafusion::logical_expr::TypeSignatureClass; -use datafusion::logical_expr::{ - ColumnarValue, ScalarUDFImpl, Signature, Volatility, -}; +use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use datafusion::scalar::ScalarValue; use datafusion_common::{internal_err, types::logical_string}; use std::any::Any; -use std::vec; use std::sync::Arc; +use std::vec; #[derive(Debug)] pub struct DateDiffFunc { @@ -60,7 +58,11 @@ impl DateDiffFunc { ], } } - fn date_diff_func(date_or_time_expr1: &Arc, date_or_time_expr2: &Arc, unit_type: DatePart) -> Result { + fn date_diff_func( + date_or_time_expr1: &Arc, + date_or_time_expr2: &Arc, + unit_type: DatePart, + ) -> Result { let unit2 = date_part(date_or_time_expr2, unit_type)?; let unit1 = date_part(date_or_time_expr1, unit_type)?; Ok(ColumnarValue::Scalar( @@ -159,14 +161,22 @@ impl ScalarUDFImpl for DateDiffFunc { "second" | "s" | "sec" | "seconds" | "secs" => { Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Second) } - "millisecond" | "ms" | "msec" | "milliseconds" => { - Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Millisecond) - } - "microsecond" | "us" | "usec" | "microseconds" => { - Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Microsecond) - } + "millisecond" | "ms" | "msec" | "milliseconds" => Self::date_diff_func( + &date_or_time_expr1, + &date_or_time_expr2, + DatePart::Millisecond, + ), + "microsecond" | "us" | "usec" | "microseconds" => Self::date_diff_func( + &date_or_time_expr1, + &date_or_time_expr2, + DatePart::Microsecond, + ), "nanosecond" | "ns" | "nsec" | "nanosec" | "nsecond" | "nanoseconds" | "nanosecs" => { - Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Nanosecond) + Self::date_diff_func( + &date_or_time_expr1, + &date_or_time_expr2, + DatePart::Nanosecond, + ) } _ => plan_err!("Invalid date_or_time_part type")?, } @@ -176,4 +186,4 @@ impl ScalarUDFImpl for DateDiffFunc { } } -super::macros::make_udf_function!(DateDiffFunc); \ No newline at end of file +super::macros::make_udf_function!(DateDiffFunc); diff --git a/crates/runtime/src/datafusion/planner.rs b/crates/runtime/src/datafusion/planner.rs index 84b28c02f..9d2044846 100644 --- a/crates/runtime/src/datafusion/planner.rs +++ b/crates/runtime/src/datafusion/planner.rs @@ -231,16 +231,20 @@ where match sql_type { SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean), SQLDataType::TinyInt(_) => Ok(DataType::Int8), - SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16), - SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => { - Ok(DataType::Int32) - } - SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64), - SQLDataType::UnsignedTinyInt(_) => Ok(DataType::UInt8), - SQLDataType::UnsignedSmallInt(_) | SQLDataType::UnsignedInt2(_) => Ok(DataType::UInt16), + SQLDataType::SmallInt(_) | SQLDataType::Int2(_)| SQLDataType::Int16 => Ok(DataType::Int16), + SQLDataType::Int(_) + | SQLDataType::Integer(_) + | SQLDataType::Int4(_) + | SQLDataType::Int32 => Ok(DataType::Int32) , + SQLDataType::BigInt(_) | SQLDataType::Int8(_) | SQLDataType::Int64 => Ok(DataType::Int64), + SQLDataType::UnsignedTinyInt(_) | SQLDataType::UInt8 => Ok(DataType::UInt8), + SQLDataType::UnsignedSmallInt(_) + | SQLDataType::UnsignedInt2(_) + | SQLDataType::UInt16 => Ok(DataType::UInt16), SQLDataType::UnsignedInt(_) | SQLDataType::UnsignedInteger(_) - | SQLDataType::UnsignedInt4(_) => Ok(DataType::UInt32), + | SQLDataType::UnsignedInt4(_) + | SQLDataType::UInt32 => Ok(DataType::UInt32), SQLDataType::Varchar(length) => match (length, true) { (Some(_), false) => plan_err!( "does not support Varchar with length, please set `support_varchar_with_length` to be true" @@ -248,13 +252,17 @@ where _ => Ok(DataType::Utf8), }, SQLDataType::Blob(_) => Ok(DataType::Binary), - SQLDataType::UnsignedBigInt(_) | SQLDataType::UnsignedInt8(_) => Ok(DataType::UInt64), - SQLDataType::Real | SQLDataType::Float4 | SQLDataType::Float(_) => { - Ok(DataType::Float32) - } - SQLDataType::Double | SQLDataType::DoublePrecision | SQLDataType::Float8 => { - Ok(DataType::Float64) - } + SQLDataType::UnsignedBigInt(_) + | SQLDataType::UnsignedInt8(_) + | SQLDataType::UInt64 => Ok(DataType::UInt64), + SQLDataType::Real + | SQLDataType::Float4 + | SQLDataType::Float(_) + | SQLDataType::Float32=> Ok(DataType::Float32), + SQLDataType::Double + | SQLDataType::DoublePrecision + | SQLDataType::Float8 + | SQLDataType::Float64 => Ok(DataType::Float64), SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => Ok(DataType::Utf8), SQLDataType::Timestamp(precision, tz_info) => { let tz = if matches!(tz_info, TimezoneInfo::Tz) @@ -386,19 +394,10 @@ where | SQLDataType::BigDecimal(_) | SQLDataType::Clob(_) | SQLDataType::Bytes(_) - | SQLDataType::Int16 - | SQLDataType::Int32 - | SQLDataType::Int64 | SQLDataType::Int128 | SQLDataType::Int256 - | SQLDataType::UInt8 - | SQLDataType::UInt16 - | SQLDataType::UInt32 - | SQLDataType::UInt64 | SQLDataType::UInt128 | SQLDataType::UInt256 - | SQLDataType::Float32 - | SQLDataType::Float64 | SQLDataType::Date32 | SQLDataType::Datetime64(_, _) | SQLDataType::FixedString(_) diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 32a9f042d..3d93abbde 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -134,9 +134,9 @@ impl Db { /// /// Returns a `DbError` if the database operations fail, or /// `SerializeError`/`DeserializeError` if the value cannot be serialized or deserialized. - pub async fn modify(&self, key: &str, f: impl Fn(&mut T)) -> Result<()> + pub async fn modify(&self, key: &str, f: impl Fn(&mut T) + Send) -> Result<()> where - T: serde::Serialize + DeserializeOwned + Default + Sync, + T: serde::Serialize + DeserializeOwned + Default + Sync + Send, { let mut value: T = self.get(key).await?.unwrap_or_default();