3 changes: 2 additions & 1 deletion .gitignore
@@ -34,4 +34,5 @@ slatedb-prefix/
warehouse-prefix/
**/*.parquet
crates/catalog/prefix/**
embucket-warehouse-test/
embucket-warehouse-test/
dummyprefix/
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -36,7 +36,7 @@ snafu = { version = "0.8.5", features = ["futures"] }
tracing = { version = "0.1" }

[patch.crates-io]
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "818728a8aeea971546241aa19af35c74bc361d50" }

[workspace.lints.clippy]
all={ level="deny", priority=-1 }
12 changes: 6 additions & 6 deletions crates/control_plane/Cargo.toml
@@ -18,13 +18,13 @@ flatbuffers = { version = "24.3.25" }
#iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" }
#datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" }

datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "818728a8aeea971546241aa19af35c74bc361d50" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "818728a8aeea971546241aa19af35c74bc361d50" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "818728a8aeea971546241aa19af35c74bc361d50" }

iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "294df8b4a21bbee99e7d335c9d8f5bbcb40f500f" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "294df8b4a21bbee99e7d335c9d8f5bbcb40f500f" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "294df8b4a21bbee99e7d335c9d8f5bbcb40f500f" }

arrow = { version = "53" }
arrow-json = { version = "53" }
6 changes: 6 additions & 0 deletions crates/control_plane/src/models/error.rs
@@ -10,6 +10,12 @@ pub enum ControlPlaneModelError {
#[snafu(display("Invalid directory `{directory}`"))]
InvalidDirectory { directory: String },

#[snafu(display("Invalid endpoint url `{url}`"))]
InvalidEndpointUrl {
url: String,
source: url::ParseError,
},

#[snafu(display("Cloud providerNot implemented"))]
CloudProviderNotImplemented { provider: String },

8 changes: 8 additions & 0 deletions crates/control_plane/src/models/mod.rs
@@ -12,6 +12,7 @@ use std::collections::HashMap;
use std::convert::TryFrom;
use std::env;
use std::sync::Arc;
use url::Url;
use uuid::Uuid;

pub mod error;
@@ -211,6 +212,13 @@ impl StorageProfile {
}
}

pub fn get_object_store_endpoint_url(&self) -> ControlPlaneModelResult<Url> {
let storage_endpoint_url = self.endpoint.clone().unwrap_or_default();
Url::parse(storage_endpoint_url.as_str()).context(error::InvalidEndpointUrlSnafu {
url: storage_endpoint_url,
})
}

// This is needed to initialize the catalog used in JanKaul's iceberg-rust code
pub fn get_object_store_builder(&self) -> ControlPlaneModelResult<ObjectStoreBuilder> {
// TODO remove duplicated code
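For context, a minimal sketch of how the new accessor behaves, assuming `StorageProfile` keeps its endpoint as an `Option<String>` and derives `Default` (both are assumptions; only the method itself appears in this diff):

    // Hypothetical usage; the StorageProfile construction is illustrative only.
    let profile = StorageProfile {
        endpoint: Some("http://localhost:9000".to_string()),
        ..StorageProfile::default()
    };
    let url = profile.get_object_store_endpoint_url()?; // Ok: parses into a Url

    // With endpoint = None, unwrap_or_default() yields "", Url::parse fails,
    // and the InvalidEndpointUrl variant added above is returned instead.
    assert!(StorageProfile::default().get_object_store_endpoint_url().is_err());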
10 changes: 10 additions & 0 deletions crates/control_plane/src/service.rs
@@ -252,6 +252,16 @@ impl ControlService for ControlServiceImpl {
executor
.ctx
.register_catalog(warehouse.name.clone(), Arc::new(catalog));

let object_store = storage_profile
.get_object_store()
.context(crate::error::InvalidStorageProfileSnafu)?;
let endpoint_url = storage_profile
.get_object_store_endpoint_url()
.map_err(|_| ControlPlaneError::MissingStorageEndpointURL)?;
executor
.ctx
.register_object_store(&endpoint_url, Arc::from(object_store));
}
(warehouse.name, warehouse.location)
};
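A note on the `register_object_store` call above: DataFusion keys registered stores by the URL's scheme and authority, so registering the profile's endpoint URL makes any path under that endpoint resolvable from the same session. A minimal sketch of the consumer side, with a hypothetical bucket and file name:

    use datafusion::prelude::ParquetReadOptions;

    // Reads that share the endpoint's scheme://authority resolve to the
    // store registered above.
    let df = executor
        .ctx
        .read_parquet("s3://my-bucket/table/data.parquet", ParquetReadOptions::default())
        .await?;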
13 changes: 7 additions & 6 deletions crates/runtime/Cargo.toml
@@ -15,13 +15,13 @@ serde_json = { workspace = true }
object_store = { workspace = true }
tracing = { workspace = true}

datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "818728a8aeea971546241aa19af35c74bc361d50" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "818728a8aeea971546241aa19af35c74bc361d50" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "818728a8aeea971546241aa19af35c74bc361d50" }

iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "294df8b4a21bbee99e7d335c9d8f5bbcb40f500f" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "294df8b4a21bbee99e7d335c9d8f5bbcb40f500f" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "294df8b4a21bbee99e7d335c9d8f5bbcb40f500f" }

arrow = { version = "53" }
arrow-json = { version = "53" }
@@ -33,6 +33,7 @@ regex = { version = "1.11.0" }

paste = "1"
snafu = { workspace = true }
url = { version = "2.5.4" }

[lints]
workspace = true
176 changes: 163 additions & 13 deletions crates/runtime/src/datafusion/execution.rs
@@ -12,6 +12,7 @@ use datafusion::execution::context::SessionContext;
use datafusion::execution::session_state::SessionContextProvider;
use datafusion::logical_expr::sqlparser::ast::Insert;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::CsvReadOptions;
use datafusion::sql::parser::{CreateExternalTable, DFParser, Statement as DFStatement};
use datafusion::sql::planner::IdentNormalizer;
use datafusion::sql::sqlparser::ast::{
@@ -28,6 +29,7 @@ use iceberg_rust::spec::identifier::Identifier;
use iceberg_rust::spec::namespace::Namespace;
use iceberg_rust::spec::schema::Schema;
use iceberg_rust::spec::types::StructType;
use object_store::aws::AmazonS3Builder;
use snafu::ResultExt;
use sqlparser::ast::helpers::attached_token::AttachedToken;
use sqlparser::ast::{
@@ -38,6 +40,7 @@ use sqlparser::tokenizer::Span;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;

#[derive(Debug)]
pub struct TablePath {
@@ -87,12 +90,7 @@ impl SqlExecutor {
if let DFStatement::Statement(s) = statement {
match *s {
Statement::CreateTable { .. } => {
return Box::pin(self.create_table_query(
*s,
warehouse_name,
warehouse_location,
))
.await;
return Box::pin(self.create_table_query(*s, warehouse_name)).await;
}
Statement::CreateDatabase {
db_name,
@@ -111,6 +109,13 @@
.create_schema(warehouse_name, schema_name, if_not_exists)
.await;
}
Statement::CreateStage { .. } => {
// We support only CSV uploads for now
return Box::pin(self.create_stage_query(*s, warehouse_name)).await;
}
Statement::CopyIntoSnowflake { .. } => {
return Box::pin(self.copy_into_snowflake_query(*s, warehouse_name)).await;
}
Statement::AlterTable { .. }
| Statement::StartTransaction { .. }
| Statement::Commit { .. }
@@ -166,10 +171,13 @@
.to_string();
let query = date_add.replace_all(&query, "$1$2('$3',").to_string();
// TODO implement alter session logic
query.replace(
"alter session set query_tag = 'snowplow_dbt'",
"SHOW session",
)
query
.replace(
"alter session set query_tag = 'snowplow_dbt'",
"SHOW session",
)
.replace("skip_header=1", "skip_header=TRUE")
.replace("FROM @~/", "FROM ")
}

#[allow(clippy::redundant_else, clippy::too_many_lines)]
@@ -178,7 +186,6 @@
&self,
statement: Statement,
warehouse_name: &str,
warehouse_location: &str,
) -> IcehutSQLResult<Vec<RecordBatch>> {
if let Statement::CreateTable(create_table_statement) = statement {
let mut new_table_full_name = create_table_statement.name.to_string();
@@ -313,6 +320,149 @@
}
}

/// Experimental CREATE STAGE support.
/// Current limitations (TODO):
/// - Prepare the object store according to the URL. Currently only public S3 buckets
///   in the default eu-central-1 region are supported
/// - Parse credentials from the specified config
/// - Creating a table is unnecessary once a common shared session context is available:
///   the CSV is registered as a table that can be referenced from SQL statements
///   executed against that context
pub async fn create_stage_query(
&self,
statement: Statement,
warehouse_name: &str,
) -> IcehutSQLResult<Vec<RecordBatch>> {
if let Statement::CreateStage {
name,
stage_params,
file_format,
..
} = statement
{
let table_name = name
.0
.last()
.ok_or_else(|| IcehutSQLError::InvalidIdentifier {
ident: name.to_string(),
})?
.clone();

let skip_header = file_format.options.iter().any(|option| {
option.option_name.eq_ignore_ascii_case("skip_header")
&& option.value.eq_ignore_ascii_case("true")
});

let field_optionally_enclosed_by = file_format
.options
.iter()
.find_map(|option| {
if option
.option_name
.eq_ignore_ascii_case("field_optionally_enclosed_by")
{
// Avoid panicking when the option value is empty
option.value.as_bytes().first().copied()
} else {
None
}
})
.unwrap_or(b'"');

let file_path = stage_params.url.unwrap_or_default();
let stage_table_name = format!("stage_{table_name}");
let url =
Url::parse(file_path.as_str()).map_err(|_| IcehutSQLError::InvalidIdentifier {
ident: file_path.clone(),
})?;
let bucket = url.host_str().unwrap_or_default();
// TODO Prepare object storage depending on the URL
let s3 = AmazonS3Builder::from_env()
// TODO Get region automatically
.with_region("eu-central-1")
.with_bucket_name(bucket)
.build()
.map_err(|_| IcehutSQLError::InvalidIdentifier {
ident: bucket.to_string(),
})?;

self.ctx.register_object_store(&url, Arc::new(s3));

// Read the CSV file once to infer a default schema. skip_header=TRUE means the
// file's first row is a header, so it maps onto CsvReadOptions::has_header.
let csv_data = self
.ctx
.read_csv(
file_path.clone(),
CsvReadOptions::new()
.has_header(skip_header)
.quote(field_optionally_enclosed_by),
)
.await
.context(ih_error::DataFusionSnafu)?;

let fields = csv_data
.schema()
.iter()
.map(|(_, field)| {
let data_type = if matches!(field.data_type(), DataType::Null) {
DataType::Utf8
} else {
field.data_type().clone()
};
Field::new(field.name(), data_type, field.is_nullable())
})
.collect::<Vec<_>>();

// Register the CSV file, with missing data types defaulted to Utf8
self.ctx
.register_csv(
stage_table_name.clone(),
file_path,
CsvReadOptions::new()
.has_header(skip_header)
.quote(field_optionally_enclosed_by)
.schema(&ArrowSchema::new(fields)),
)
.await
.context(ih_error::DataFusionSnafu)?;

// Create stages database and create table with prepared schema
// TODO Don't create table in case we have common ctx
self.create_database(warehouse_name, ObjectName(vec![Ident::new("stages")]), true)
.await?;
let create_query = format!("CREATE TABLE {warehouse_name}.stages.{table_name} AS (SELECT * FROM {stage_table_name})");
self.query(&create_query, warehouse_name, "").await
} else {
Err(IcehutSQLError::DataFusion {
source: DataFusionError::NotImplemented(
"Only CREATE STAGE statements are supported".to_string(),
),
})
}
}

pub async fn copy_into_snowflake_query(
&self,
statement: Statement,
warehouse_name: &str,
) -> IcehutSQLResult<Vec<RecordBatch>> {
if let Statement::CopyIntoSnowflake {
into, from_stage, ..
} = statement
{
// Insert the staged data into the target table
let from_query = from_stage.to_string().replace('@', "");
let insert_query =
format!("INSERT INTO {into} SELECT * FROM {warehouse_name}.stages.{from_query}");
self.execute_with_custom_plan(&insert_query, warehouse_name)
.await
} else {
Err(IcehutSQLError::DataFusion {
source: DataFusionError::NotImplemented(
"Only COPY INTO statements are supported".to_string(),
),
})
}
}

#[tracing::instrument(level = "trace", skip(self), err, ret)]
pub async fn merge_query(
&self,
@@ -699,8 +849,8 @@ impl SqlExecutor {
"table_name as 'name'",
"case when table_type='BASE TABLE' then 'TABLE' else table_type end as 'kind'",
"null as 'comment'",
"case when table_type='BASE TABLE' then 'Y' else 'N' end as is_iceberg",
"'N' as 'is_dynamic'",
"case when table_type='BASE TABLE' then True else False end as is_iceberg",
].join(", ");
let information_schema_query =
format!("SELECT {columns} FROM information_schema.tables");
@@ -757,7 +907,7 @@ impl SqlExecutor {
#[allow(clippy::too_many_lines)]
pub fn get_table_path(&self, statement: &DFStatement) -> TablePath {
let table_path = |arr: &Vec<Ident>| -> TablePath {
let empty = || String::new();
let empty = String::new;
match arr.len() {
1 => TablePath {
db: empty(),
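Taken together, `create_stage_query` and `copy_into_snowflake_query` support a flow like the following hypothetical driver (warehouse, bucket, and table names are invented; per the TODOs above, only public S3 buckets in the default eu-central-1 region and CSV files are handled):

    // CREATE STAGE infers the CSV schema (Null columns default to Utf8)
    // and materializes it as <warehouse>.stages.<stage_name>.
    let create_stage = "CREATE STAGE my_stage \
        URL='s3://example-public-bucket/events.csv' \
        FILE_FORMAT = (TYPE = CSV SKIP_HEADER = TRUE)";
    executor.query(create_stage, "my_warehouse", "").await?;

    // COPY INTO is rewritten into:
    //   INSERT INTO my_db.public.events SELECT * FROM my_warehouse.stages.my_stage
    let copy_into = "COPY INTO my_db.public.events FROM @my_stage";
    executor.query(copy_into, "my_warehouse", "").await?;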