Skip to content
6 changes: 3 additions & 3 deletions crates/control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c592
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }

iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }

arrow = { version = "53" }
arrow-json = { version = "53" }
Expand Down
6 changes: 3 additions & 3 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c592
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }

iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }

arrow = { version = "53" }
arrow-json = { version = "53" }
Expand Down
121 changes: 115 additions & 6 deletions crates/runtime/src/datafusion/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ use iceberg_rust::spec::namespace::Namespace;
use iceberg_rust::spec::schema::Schema;
use iceberg_rust::spec::types::StructType;
use snafu::ResultExt;
use sqlparser::ast::{MergeAction, MergeClauseKind, MergeInsertKind, Query as AstQuery};
use sqlparser::ast::helpers::attached_token::AttachedToken;
use sqlparser::ast::{
BinaryOperator, GroupByExpr, MergeAction, MergeClauseKind, MergeInsertKind, Query as AstQuery,
Select, SelectItem,
};
use sqlparser::tokenizer::Span;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -77,11 +82,17 @@ impl SqlExecutor {
| Statement::StartTransaction { .. }
| Statement::Commit { .. }
| Statement::Insert { .. }
| Statement::Query { .. }
| Statement::ShowSchemas { .. }
| Statement::ShowVariable { .. } => {
return Box::pin(self.execute_with_custom_plan(&query, warehouse_name)).await;
}
Statement::Query(mut subquery) => {
self.update_qualify_in_query(subquery.as_mut());
return Box::pin(
self.execute_with_custom_plan(&subquery.to_string(), warehouse_name),
)
.await;
}
Statement::Drop { .. } => {
return Box::pin(self.drop_table_query(&query, warehouse_name)).await;
}
Expand Down Expand Up @@ -110,8 +121,9 @@ impl SqlExecutor {
pub fn preprocess_query(&self, query: &str) -> String {
// Replace field[0].subfield -> json_get(json_get(field, 0), 'subfield')
// TODO: This regex should be a static allocation
let re = regex::Regex::new(r"(\w+)\[(\d+)][:\.](\w+)").unwrap();
let date_add = regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap();
let re = regex::Regex::new(r"(\w+.\w+)\[(\d+)][:\.](\w+)").unwrap();
let date_add =
regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap();

let query = re
.replace_all(query, "json_get(json_get($1, $2), '$3')")
Expand Down Expand Up @@ -150,13 +162,19 @@ impl SqlExecutor {

// Replace the name of table that needs creation (for ex. "warehouse"."database"."table" -> "table")
// And run the query - this will create an InMemory table
let modified_statement = CreateTableStatement {
let mut modified_statement = CreateTableStatement {
name: ObjectName(vec![new_table_name.clone()]),
transient: false,
..create_table_statement
};

// Replace qualify with nested select
if let Some(ref mut query) = modified_statement.query {
self.update_qualify_in_query(query);
}
// Create InMemory table since external tables with "AS SELECT" are not supported
let updated_query = modified_statement.to_string();

let plan = self
.get_custom_logical_plan(&updated_query, warehouse_name)
.await?;
Expand Down Expand Up @@ -481,7 +499,7 @@ impl SqlExecutor {
}
}
}
// println!("Tables: {:?}", ctx_provider.tables.keys());

let planner = ExtendedSqlToRel::new(&ctx_provider);
planner
.sql_statement_to_plan(*s)
Expand Down Expand Up @@ -509,6 +527,97 @@ impl SqlExecutor {
.context(super::error::DataFusionSnafu)
}

/// Rewrites a supported `QUALIFY` clause into a filtered derived table, in place.
///
/// Every CTE attached to `query` is processed first. Then, when the body is a plain
/// `SELECT` whose `QUALIFY` is a binary expression using `=`, `<` or `<=`, the body is
/// replaced with the equivalent of:
///
/// ```text
/// SELECT * FROM (SELECT <projection>, <lhs> AS qualify_alias ...) WHERE qualify_alias <op> <rhs>
/// ```
///
/// The inner `SELECT` is a copy of the original one with `QUALIFY` removed and the
/// left-hand side of the comparison appended to its projection as `qualify_alias`.
/// A nested-query body (`SetExpr::Query`) is handled by recursing into it. Any other
/// body kind, and any `QUALIFY` of a different shape, is left untouched.
///
/// NOTE(review): the outer projection is `*`, so the helper column `qualify_alias`
/// presumably shows up in the result set as well — confirm this is acceptable.
// `self` is only threaded through the recursive calls, hence the lint exemption.
#[allow(clippy::only_used_in_recursion)]
fn update_qualify_in_query(&self, query: &mut Query) {
    // CTE definitions may carry their own QUALIFY clauses; rewrite those first.
    if let Some(with) = &mut query.with {
        with.cte_tables
            .iter_mut()
            .for_each(|cte| self.update_qualify_in_query(&mut cte.query));
    }

    let wrapper = match query.body.as_mut() {
        sqlparser::ast::SetExpr::Query(nested) => {
            self.update_qualify_in_query(nested);
            return;
        }
        sqlparser::ast::SetExpr::Select(select) => {
            // Only `QUALIFY <lhs> <op> <rhs>` is understood.
            let Some(Expr::BinaryOp { left, op, right }) = select.qualify.as_ref() else {
                return;
            };
            let supported = matches!(
                op,
                BinaryOperator::Eq | BinaryOperator::Lt | BinaryOperator::LtEq
            );
            if !supported {
                return;
            }

            // Unquoted identifier without source location information.
            let bare_ident = |name: &str| Ident {
                value: name.to_string(),
                quote_style: None,
                span: Span::empty(),
            };

            // Inner SELECT: the original one minus QUALIFY, plus the aliased left-hand side.
            let mut inner = select.clone();
            inner.qualify = None;
            inner.projection.push(SelectItem::ExprWithAlias {
                expr: (**left).clone(),
                alias: bare_ident("qualify_alias"),
            });

            let derived = TableFactor::Derived {
                lateral: false,
                subquery: Box::new(Query {
                    with: None,
                    body: Box::new(sqlparser::ast::SetExpr::Select(inner)),
                    order_by: None,
                    limit: None,
                    limit_by: vec![],
                    offset: None,
                    fetch: None,
                    locks: vec![],
                    for_clause: None,
                    settings: None,
                    format_clause: None,
                }),
                alias: None,
            };

            // The former QUALIFY condition, now expressed against the alias.
            let filter = Expr::BinaryOp {
                left: Box::new(Expr::Identifier(bare_ident("qualify_alias"))),
                op: op.clone(),
                right: right.clone(),
            };

            Select {
                select_token: AttachedToken::empty(),
                distinct: None,
                top: None,
                top_before_distinct: false,
                // The wildcard is spelled as an unquoted identifier named `*`.
                projection: vec![SelectItem::UnnamedExpr(Expr::Identifier(bare_ident("*")))],
                into: None,
                from: vec![TableWithJoins {
                    relation: derived,
                    joins: vec![],
                }],
                lateral_views: vec![],
                prewhere: None,
                selection: Some(filter),
                group_by: GroupByExpr::Expressions(vec![], vec![]),
                cluster_by: vec![],
                distribute_by: vec![],
                sort_by: vec![],
                having: None,
                named_window: vec![],
                qualify: None,
                window_before_qualify: false,
                value_table_mode: None,
                connect_by: None,
            }
        }
        _ => return,
    };

    *query.body = sqlparser::ast::SetExpr::Select(Box::new(wrapper));
}

#[allow(clippy::only_used_in_recursion)]
fn get_expr_where_clause(&self, expr: Expr, target_alias: &str) -> Vec<String> {
match expr {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl ScalarUDFImpl for ConvertTimezoneFunc {
//should use local session time
//TODO: select convert_timezone('America/New_York', 'UTC', v3) with v3 a timestamp with value = '2025-01-06 08:00:00 America/New_York',
//should be parsed as the timezone None variant timestamp
#[allow(clippy::too_many_lines)]
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
match args.len() {
2 => {
Expand Down
38 changes: 24 additions & 14 deletions crates/runtime/src/datafusion/functions/date_diff.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use arrow::array::Array;
use arrow::compute::{date_part, DatePart};
use arrow::datatypes::DataType::Int64;
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Int64;
use datafusion::common::{plan_err, Result};
use datafusion::logical_expr::TypeSignature::Coercible;
use datafusion::logical_expr::TypeSignatureClass;
use datafusion::logical_expr::{
ColumnarValue, ScalarUDFImpl, Signature, Volatility,
};
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use datafusion::scalar::ScalarValue;
use datafusion_common::{internal_err, types::logical_string};
use std::any::Any;
use std::vec;
use std::sync::Arc;
use std::vec;

#[derive(Debug)]
pub struct DateDiffFunc {
Expand Down Expand Up @@ -60,7 +58,11 @@ impl DateDiffFunc {
],
}
}
fn date_diff_func(date_or_time_expr1: &Arc<dyn Array>, date_or_time_expr2: &Arc<dyn Array>, unit_type: DatePart) -> Result<ColumnarValue> {
fn date_diff_func(
date_or_time_expr1: &Arc<dyn Array>,
date_or_time_expr2: &Arc<dyn Array>,
unit_type: DatePart,
) -> Result<ColumnarValue> {
let unit2 = date_part(date_or_time_expr2, unit_type)?;
let unit1 = date_part(date_or_time_expr1, unit_type)?;
Ok(ColumnarValue::Scalar(
Expand Down Expand Up @@ -159,14 +161,22 @@ impl ScalarUDFImpl for DateDiffFunc {
"second" | "s" | "sec" | "seconds" | "secs" => {
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Second)
}
"millisecond" | "ms" | "msec" | "milliseconds" => {
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Millisecond)
}
"microsecond" | "us" | "usec" | "microseconds" => {
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Microsecond)
}
"millisecond" | "ms" | "msec" | "milliseconds" => Self::date_diff_func(
&date_or_time_expr1,
&date_or_time_expr2,
DatePart::Millisecond,
),
"microsecond" | "us" | "usec" | "microseconds" => Self::date_diff_func(
&date_or_time_expr1,
&date_or_time_expr2,
DatePart::Microsecond,
),
"nanosecond" | "ns" | "nsec" | "nanosec" | "nsecond" | "nanoseconds" | "nanosecs" => {
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Nanosecond)
Self::date_diff_func(
&date_or_time_expr1,
&date_or_time_expr2,
DatePart::Nanosecond,
)
}
_ => plan_err!("Invalid date_or_time_part type")?,
}
Expand All @@ -176,4 +186,4 @@ impl ScalarUDFImpl for DateDiffFunc {
}
}

super::macros::make_udf_function!(DateDiffFunc);
super::macros::make_udf_function!(DateDiffFunc);
47 changes: 23 additions & 24 deletions crates/runtime/src/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,30 +231,38 @@ where
match sql_type {
SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
SQLDataType::TinyInt(_) => Ok(DataType::Int8),
SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
Ok(DataType::Int32)
}
SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
SQLDataType::UnsignedTinyInt(_) => Ok(DataType::UInt8),
SQLDataType::UnsignedSmallInt(_) | SQLDataType::UnsignedInt2(_) => Ok(DataType::UInt16),
SQLDataType::SmallInt(_) | SQLDataType::Int2(_)| SQLDataType::Int16 => Ok(DataType::Int16),
SQLDataType::Int(_)
| SQLDataType::Integer(_)
| SQLDataType::Int4(_)
| SQLDataType::Int32 => Ok(DataType::Int32) ,
SQLDataType::BigInt(_) | SQLDataType::Int8(_) | SQLDataType::Int64 => Ok(DataType::Int64),
SQLDataType::UnsignedTinyInt(_) | SQLDataType::UInt8 => Ok(DataType::UInt8),
SQLDataType::UnsignedSmallInt(_)
| SQLDataType::UnsignedInt2(_)
| SQLDataType::UInt16 => Ok(DataType::UInt16),
SQLDataType::UnsignedInt(_)
| SQLDataType::UnsignedInteger(_)
| SQLDataType::UnsignedInt4(_) => Ok(DataType::UInt32),
| SQLDataType::UnsignedInt4(_)
| SQLDataType::UInt32 => Ok(DataType::UInt32),
SQLDataType::Varchar(length) => match (length, true) {
(Some(_), false) => plan_err!(
"does not support Varchar with length, please set `support_varchar_with_length` to be true"
),
_ => Ok(DataType::Utf8),
},
SQLDataType::Blob(_) => Ok(DataType::Binary),
SQLDataType::UnsignedBigInt(_) | SQLDataType::UnsignedInt8(_) => Ok(DataType::UInt64),
SQLDataType::Real | SQLDataType::Float4 | SQLDataType::Float(_) => {
Ok(DataType::Float32)
}
SQLDataType::Double | SQLDataType::DoublePrecision | SQLDataType::Float8 => {
Ok(DataType::Float64)
}
SQLDataType::UnsignedBigInt(_)
| SQLDataType::UnsignedInt8(_)
| SQLDataType::UInt64 => Ok(DataType::UInt64),
SQLDataType::Real
| SQLDataType::Float4
| SQLDataType::Float(_)
| SQLDataType::Float32=> Ok(DataType::Float32),
SQLDataType::Double
| SQLDataType::DoublePrecision
| SQLDataType::Float8
| SQLDataType::Float64 => Ok(DataType::Float64),
SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => Ok(DataType::Utf8),
SQLDataType::Timestamp(precision, tz_info) => {
let tz = if matches!(tz_info, TimezoneInfo::Tz)
Expand Down Expand Up @@ -386,19 +394,10 @@ where
| SQLDataType::BigDecimal(_)
| SQLDataType::Clob(_)
| SQLDataType::Bytes(_)
| SQLDataType::Int16
| SQLDataType::Int32
| SQLDataType::Int64
| SQLDataType::Int128
| SQLDataType::Int256
| SQLDataType::UInt8
| SQLDataType::UInt16
| SQLDataType::UInt32
| SQLDataType::UInt64
| SQLDataType::UInt128
| SQLDataType::UInt256
| SQLDataType::Float32
| SQLDataType::Float64
| SQLDataType::Date32
| SQLDataType::Datetime64(_, _)
| SQLDataType::FixedString(_)
Expand Down
4 changes: 2 additions & 2 deletions crates/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ impl Db {
///
/// Returns a `DbError` if the database operations fail, or
/// `SerializeError`/`DeserializeError` if the value cannot be serialized or deserialized.
pub async fn modify<T>(&self, key: &str, f: impl Fn(&mut T)) -> Result<()>
pub async fn modify<T>(&self, key: &str, f: impl Fn(&mut T) + Send) -> Result<()>
where
T: serde::Serialize + DeserializeOwned + Default + Sync,
T: serde::Serialize + DeserializeOwned + Default + Sync + Send,
{
let mut value: T = self.get(key).await?.unwrap_or_default();

Expand Down
Loading