diff --git a/crates/catalog/src/service.rs b/crates/catalog/src/service.rs index 3c6498564..e8651302e 100644 --- a/crates/catalog/src/service.rs +++ b/crates/catalog/src/service.rs @@ -1,12 +1,15 @@ +use object_store::path::Path; use async_trait::async_trait; -use object_store::CredentialProvider; +use object_store::{CredentialProvider, ObjectStore, PutPayload}; use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; - +use bytes::Bytes; use control_plane::models::Warehouse; use iceberg::{spec::TableMetadataBuilder, TableCreation}; - +use object_store::local::LocalFileSystem; +use tokio::fs; +use uuid::Uuid; use crate::error::{Error, Result}; // TODO: Replace this with this crate error and result use crate::models::{ Config, Database, DatabaseIdent, Table, TableCommit, TableIdent, TableRequirementExt, @@ -108,14 +111,17 @@ impl Catalog for CatalogImpl { .map(TableRequirementExt::new) .try_for_each(|req| req.assert(&table.metadata, true))?; + // TODO rewrite metadata file? need to research when metadata rewrite is needed + // Currently the metadata file is only written once - during table creation + let mut builder = - TableMetadataBuilder::new_from_metadata(table.metadata, Some(table.metadata_location)); + TableMetadataBuilder::new_from_metadata(table.metadata, Some(table.metadata_location.clone())); for update in commit.updates { builder = update.apply(builder)?; } let result = builder.build()?; - let metadata_location = result.metadata.location.clone(); + let metadata_location = table.metadata_location.clone(); let metadata = result.metadata; let table: Table = Table { @@ -234,9 +240,10 @@ impl Catalog for CatalogImpl { // Take into account namespace location property if present // Take into account provided location if present // If none, generate location based on warehouse location + let table_location = format!("{}/{}", warehouse.location, creation.name); let creation = { let mut creation = creation; - creation.location = Some(format!("{}/{}", warehouse.location, creation.name)); + creation.location = Some(table_location.clone()); creation }; // TODO: Add checks @@ -244,18 +251,29 @@ impl Catalog for CatalogImpl { let name = creation.name.to_string(); let result = TableMetadataBuilder::from_table_creation(creation)?.build()?; - let location = result.metadata.location.clone(); + let metadata = result.metadata.clone(); + let metadata_file_id = Uuid::new_v4().to_string(); + let metadata_relative_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json"); + // TODO un-hardcode "file://" and make it dynamic - filesystem or s3 (at least) + let metadata_full_location = format!("file://object_store/{metadata_relative_location}"); let table = Table { - metadata: result.metadata, - metadata_location: location, + metadata: metadata.clone(), + metadata_location: metadata_full_location, ident: TableIdent { database: namespace.clone(), table: name.clone(), }, }; self.table_repo.put(&table).await?; - // TODO: Write metadata contents to metadata_location + + let local_dir = "object_store"; + fs::create_dir_all(local_dir).await.unwrap(); + let store = LocalFileSystem::new_with_prefix(local_dir).expect("Failed to initialize filesystem object store"); + let path = Path::from(metadata_relative_location); + let json_data = serde_json::to_string(&table.metadata).unwrap(); + let content = Bytes::from(json_data); + store.put(&path, PutPayload::from(content)).await.expect("Failed to write file"); Ok(table) } diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs index f7b1e52f5..c8dc02cb2 100644 --- a/crates/control_plane/src/service.rs +++ b/crates/control_plane/src/service.rs @@ -8,6 +8,7 @@ use uuid::Uuid; use datafusion::prelude::*; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_datafusion::IcebergCatalogProvider; +use datafusion::catalog_common::CatalogProvider; use std::collections::HashMap; #[async_trait] @@ -110,10 +111,14 @@ impl ControlService for ControlServiceImpl { let catalog = RestCatalog::new(config); - // TODO need manifest file written before the code below works - // let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog)) - // .await - // .unwrap(); + let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog)) + .await + .unwrap(); + + // Test that catalog loaded successfully + println!("SCHEMAS: {:?}", catalog.schema_names()); + + // TODO rest of the query code Ok(("OK")) }