diff --git a/crates/catalog/src/models.rs b/crates/catalog/src/models.rs
index bc1e3c478..b5422c13d 100644
--- a/crates/catalog/src/models.rs
+++ b/crates/catalog/src/models.rs
@@ -193,7 +193,7 @@ pub struct DatabaseIdent {
 
 impl Display for DatabaseIdent {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}.{}", self.warehouse, self.namespace.to_url_string())
+        write!(f, "{}.{}", self.warehouse, self.namespace.join("."))
     }
 }
 
diff --git a/crates/control_plane/src/models/mod.rs b/crates/control_plane/src/models/mod.rs
index ca6fb95c4..991b15aa7 100644
--- a/crates/control_plane/src/models/mod.rs
+++ b/crates/control_plane/src/models/mod.rs
@@ -1,5 +1,5 @@
-use arrow::array::{RecordBatch, UInt64Array};
-use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Field};
 use chrono::{NaiveDateTime, Utc};
 use dotenv::dotenv;
 use iceberg_rust::object_store::ObjectStoreBuilder;
@@ -500,16 +500,3 @@ impl ColumnInfo {
         column_info
     }
 }
-
-pub fn created_entity_response() -> std::result::Result<Vec<RecordBatch>, arrow::error::ArrowError>
-{
-    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
-        "count",
-        DataType::UInt64,
-        false,
-    )]));
-    Ok(vec![RecordBatch::try_new(
-        schema,
-        vec![Arc::new(UInt64Array::from(vec![0]))],
-    )?])
-}
diff --git a/crates/runtime/src/datafusion/error.rs b/crates/runtime/src/datafusion/error.rs
index c79609b11..49f857ed5 100644
--- a/crates/runtime/src/datafusion/error.rs
+++ b/crates/runtime/src/datafusion/error.rs
@@ -29,6 +29,11 @@ pub enum IcehutSQLError {
     #[snafu(display("Iceberg error: {source}"))]
     Iceberg { source: iceberg_rust::error::Error },
 
+    #[snafu(display("Iceberg spec error: {source}"))]
+    IcebergSpec {
+        source: iceberg_rust::spec::error::Error,
+    },
+
     #[snafu(display("Invalid precision: {precision}"))]
     InvalidPrecision { precision: String },
 
diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs
index 77ffe9a3d..14fb850e8 100644
--- a/crates/runtime/src/datafusion/execution.rs
+++ b/crates/runtime/src/datafusion/execution.rs
@@ -1,7 +1,7 @@
 #![allow(clippy::missing_errors_doc)]
 #![allow(clippy::missing_panics_doc)]
 
-use super::error::{self as ih_error, IcehutSQLResult};
+use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult};
 use crate::datafusion::context::CustomContextProvider;
 use crate::datafusion::functions::register_udfs;
 use crate::datafusion::planner::ExtendedSqlToRel;
@@ -76,8 +76,22 @@ impl SqlExecutor {
                 ))
                 .await;
             }
-            Statement::CreateSchema { schema_name, .. } => {
-                return self.create_schema(schema_name, warehouse_name).await;
-            }
+            Statement::CreateDatabase {
+                db_name,
+                if_not_exists,
+                ..
+            } => {
+                return self
+                    .create_database(warehouse_name, db_name, if_not_exists)
+                    .await;
+            }
+            Statement::CreateSchema {
+                schema_name,
+                if_not_exists,
+            } => {
+                return self
+                    .create_schema(warehouse_name, schema_name, if_not_exists)
+                    .await;
+            }
             Statement::AlterTable { .. }
             | Statement::StartTransaction { .. }
@@ -395,42 +409,14 @@ impl SqlExecutor {
     #[tracing::instrument(level = "trace", skip(self), err, ret)]
     pub async fn create_schema(
         &self,
-        name: SchemaName,
         warehouse_name: &str,
+        name: SchemaName,
+        if_not_exists: bool,
     ) -> IcehutSQLResult<Vec<RecordBatch>> {
         match name {
             SchemaName::Simple(schema_name) => {
-                //println!("Creating simple schema: {:?}", schema_name);
-                // TODO: Abstract the Iceberg catalog
-                let catalog = self.ctx.catalog(warehouse_name).ok_or(
-                    ih_error::IcehutSQLError::WarehouseNotFound {
-                        name: warehouse_name.to_string(),
-                    },
-                )?;
-                let iceberg_catalog = catalog.as_any().downcast_ref::<IcebergCatalog>().ok_or(
-                    ih_error::IcehutSQLError::IcebergCatalogNotFound {
-                        warehouse_name: warehouse_name.to_string(),
-                    },
-                )?;
-                let rest_catalog = iceberg_catalog.catalog();
-                let namespace_vec: Vec<String> = schema_name
-                    .0
-                    .iter()
-                    .map(|ident| ident.value.clone())
-                    .collect();
-                let single_layer_namespace = vec![namespace_vec.join(".")];
-                #[allow(clippy::unwrap_used)]
-                let namespace = Namespace::try_new(&single_layer_namespace).unwrap();
-                // Why are we checking if namespace exists as a single layer and then creating it?
-                // There are multiple possible Error scenarios here that are not handled
-                if rest_catalog.load_namespace(&namespace).await.is_err() {
-                    #[allow(clippy::unwrap_used)]
-                    let namespace = Namespace::try_new(&namespace_vec).unwrap();
-                    rest_catalog
-                        .create_namespace(&namespace, None)
-                        .await
-                        .context(ih_error::IcebergSnafu)?;
-                }
+                self.create_database(warehouse_name, schema_name.clone(), if_not_exists)
+                    .await?;
             }
             _ => {
                 return Err(super::error::IcehutSQLError::DataFusion {
@@ -443,6 +429,39 @@
         created_entity_response().context(super::error::ArrowSnafu)
     }
 
+    pub async fn create_database(
+        &self,
+        warehouse_name: &str,
+        name: ObjectName,
+        _if_not_exists: bool,
+    ) -> IcehutSQLResult<Vec<RecordBatch>> {
+        // TODO: Abstract the Iceberg catalog
+        let catalog = self.ctx.catalog(warehouse_name).ok_or(
+            ih_error::IcehutSQLError::WarehouseNotFound {
+                name: warehouse_name.to_string(),
+            },
+        )?;
+        let iceberg_catalog = catalog.as_any().downcast_ref::<IcebergCatalog>().ok_or(
+            ih_error::IcehutSQLError::IcebergCatalogNotFound {
+                warehouse_name: warehouse_name.to_string(),
+            },
+        )?;
+        let rest_catalog = iceberg_catalog.catalog();
+        let namespace_vec: Vec<String> = name.0.iter().map(|ident| ident.value.clone()).collect();
+        let single_layer_namespace = vec![namespace_vec.join(".")];
+
+        let namespace =
+            Namespace::try_new(&single_layer_namespace).context(ih_error::IcebergSpecSnafu)?;
+        let exists = rest_catalog.load_namespace(&namespace).await.is_ok();
+        if !exists {
+            rest_catalog
+                .create_namespace(&namespace, None)
+                .await
+                .context(ih_error::IcebergSnafu)?;
+        }
+        created_entity_response().context(super::error::ArrowSnafu)
+    }
+
     #[tracing::instrument(level = "trace", skip(self), err, ret)]
     pub async fn get_custom_logical_plan(
         &self,
@@ -493,7 +512,7 @@ impl SqlExecutor {
             .table(&table)
             .await
            .context(super::error::DataFusionSnafu)?
-            .ok_or(super::error::IcehutSQLError::TableProviderNotFound {
+            .ok_or(IcehutSQLError::TableProviderNotFound {
                table_name: table.clone(),
            })?;
        ctx_provider.tables.insert(
@@ -888,8 +907,7 @@ impl SqlExecutor {
     }
 }
 
-pub fn created_entity_response() -> std::result::Result<Vec<RecordBatch>, arrow::error::ArrowError>
-{
+pub fn created_entity_response() -> Result<Vec<RecordBatch>, arrow::error::ArrowError> {
     let schema = Arc::new(ArrowSchema::new(vec![Field::new(
         "count",
         DataType::UInt64,