Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/catalog/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub struct DatabaseIdent {

impl Display for DatabaseIdent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.warehouse, self.namespace.to_url_string())
write!(f, "{}.{}", self.warehouse, self.namespace.join("."))
}
}

Expand Down
17 changes: 2 additions & 15 deletions crates/control_plane/src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use arrow::array::{RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field};
use chrono::{NaiveDateTime, Utc};
use dotenv::dotenv;
use iceberg_rust::object_store::ObjectStoreBuilder;
Expand Down Expand Up @@ -500,16 +500,3 @@ impl ColumnInfo {
column_info
}
}

pub fn created_entity_response() -> std::result::Result<Vec<RecordBatch>, arrow::error::ArrowError>
{
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"count",
DataType::UInt64,
false,
)]));
Ok(vec![RecordBatch::try_new(
schema,
vec![Arc::new(UInt64Array::from(vec![0]))],
)?])
}
5 changes: 5 additions & 0 deletions crates/runtime/src/datafusion/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ pub enum IcehutSQLError {
#[snafu(display("Iceberg error: {source}"))]
Iceberg { source: iceberg_rust::error::Error },

#[snafu(display("Iceberg spec error: {source}"))]
IcebergSpec {
source: iceberg_rust::spec::error::Error,
},

#[snafu(display("Invalid precision: {precision}"))]
InvalidPrecision { precision: String },

Expand Down
94 changes: 56 additions & 38 deletions crates/runtime/src/datafusion/execution.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::missing_panics_doc)]

use super::error::{self as ih_error, IcehutSQLResult};
use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult};
use crate::datafusion::context::CustomContextProvider;
use crate::datafusion::functions::register_udfs;
use crate::datafusion::planner::ExtendedSqlToRel;
Expand Down Expand Up @@ -76,8 +76,22 @@ impl SqlExecutor {
))
.await;
}
Statement::CreateSchema { schema_name, .. } => {
return self.create_schema(schema_name, warehouse_name).await;
Statement::CreateDatabase {
db_name,
if_not_exists,
..
} => {
return self
.create_database(warehouse_name, db_name, if_not_exists)
.await;
}
Statement::CreateSchema {
schema_name,
if_not_exists,
} => {
return self
.create_schema(warehouse_name, schema_name, if_not_exists)
.await;
}
Statement::AlterTable { .. }
| Statement::StartTransaction { .. }
Expand Down Expand Up @@ -395,42 +409,14 @@ impl SqlExecutor {
#[tracing::instrument(level = "trace", skip(self), err, ret)]
pub async fn create_schema(
&self,
name: SchemaName,
warehouse_name: &str,
name: SchemaName,
if_not_exists: bool,
) -> IcehutSQLResult<Vec<RecordBatch>> {
match name {
SchemaName::Simple(schema_name) => {
//println!("Creating simple schema: {:?}", schema_name);
// TODO: Abstract the Iceberg catalog
let catalog = self.ctx.catalog(warehouse_name).ok_or(
ih_error::IcehutSQLError::WarehouseNotFound {
name: warehouse_name.to_string(),
},
)?;
let iceberg_catalog = catalog.as_any().downcast_ref::<IcebergCatalog>().ok_or(
ih_error::IcehutSQLError::IcebergCatalogNotFound {
warehouse_name: warehouse_name.to_string(),
},
)?;
let rest_catalog = iceberg_catalog.catalog();
let namespace_vec: Vec<String> = schema_name
.0
.iter()
.map(|ident| ident.value.clone())
.collect();
let single_layer_namespace = vec![namespace_vec.join(".")];
#[allow(clippy::unwrap_used)]
let namespace = Namespace::try_new(&single_layer_namespace).unwrap();
// Why are we checking if namespace exists as a single layer and then creating it?
// There are multiple possible Error scenarios here that are not handled
if rest_catalog.load_namespace(&namespace).await.is_err() {
#[allow(clippy::unwrap_used)]
let namespace = Namespace::try_new(&namespace_vec).unwrap();
rest_catalog
.create_namespace(&namespace, None)
.await
.context(ih_error::IcebergSnafu)?;
}
self.create_database(warehouse_name, schema_name.clone(), if_not_exists)
.await?;
}
_ => {
return Err(super::error::IcehutSQLError::DataFusion {
Expand All @@ -443,6 +429,39 @@ impl SqlExecutor {
created_entity_response().context(super::error::ArrowSnafu)
}

pub async fn create_database(
&self,
warehouse_name: &str,
name: ObjectName,
_if_not_exists: bool,
) -> IcehutSQLResult<Vec<RecordBatch>> {
// TODO: Abstract the Iceberg catalog
let catalog = self.ctx.catalog(warehouse_name).ok_or(
ih_error::IcehutSQLError::WarehouseNotFound {
name: warehouse_name.to_string(),
},
)?;
let iceberg_catalog = catalog.as_any().downcast_ref::<IcebergCatalog>().ok_or(
ih_error::IcehutSQLError::IcebergCatalogNotFound {
warehouse_name: warehouse_name.to_string(),
},
)?;
let rest_catalog = iceberg_catalog.catalog();
let namespace_vec: Vec<String> = name.0.iter().map(|ident| ident.value.clone()).collect();
let single_layer_namespace = vec![namespace_vec.join(".")];

let namespace =
Namespace::try_new(&single_layer_namespace).context(ih_error::IcebergSpecSnafu)?;
let exists = rest_catalog.load_namespace(&namespace).await.is_ok();
if !exists {
rest_catalog
.create_namespace(&namespace, None)
.await
.context(ih_error::IcebergSnafu)?;
}
created_entity_response().context(super::error::ArrowSnafu)
}

#[tracing::instrument(level = "trace", skip(self), err, ret)]
pub async fn get_custom_logical_plan(
&self,
Expand Down Expand Up @@ -493,7 +512,7 @@ impl SqlExecutor {
.table(&table)
.await
.context(super::error::DataFusionSnafu)?
.ok_or(super::error::IcehutSQLError::TableProviderNotFound {
.ok_or(IcehutSQLError::TableProviderNotFound {
table_name: table.clone(),
})?;
ctx_provider.tables.insert(
Expand Down Expand Up @@ -888,8 +907,7 @@ impl SqlExecutor {
}
}

pub fn created_entity_response() -> std::result::Result<Vec<RecordBatch>, arrow::error::ArrowError>
{
pub fn created_entity_response() -> Result<Vec<RecordBatch>, arrow::error::ArrowError> {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"count",
DataType::UInt64,
Expand Down
Loading