Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions crates/catalog/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use object_store::path::Path;
use async_trait::async_trait;
use object_store::CredentialProvider;
use object_store::{CredentialProvider, ObjectStore, PutPayload};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use bytes::Bytes;
use control_plane::models::Warehouse;
use iceberg::{spec::TableMetadataBuilder, TableCreation};

use object_store::local::LocalFileSystem;
use tokio::fs;
use uuid::Uuid;
use crate::error::{Error, Result}; // TODO: Replace this with this crate error and result
use crate::models::{
Config, Database, DatabaseIdent, Table, TableCommit, TableIdent, TableRequirementExt,
Expand Down Expand Up @@ -108,14 +111,17 @@ impl Catalog for CatalogImpl {
.map(TableRequirementExt::new)
.try_for_each(|req| req.assert(&table.metadata, true))?;

// TODO rewrite metadata file? need to research when metadata rewrite is needed
// Currently the metadata file is only written once - during table creation

let mut builder =
TableMetadataBuilder::new_from_metadata(table.metadata, Some(table.metadata_location));
TableMetadataBuilder::new_from_metadata(table.metadata, Some(table.metadata_location.clone()));

for update in commit.updates {
builder = update.apply(builder)?;
}
let result = builder.build()?;
let metadata_location = result.metadata.location.clone();
let metadata_location = table.metadata_location.clone();
let metadata = result.metadata;

let table: Table = Table {
Expand Down Expand Up @@ -234,28 +240,40 @@ impl Catalog for CatalogImpl {
// Take into account namespace location property if present
// Take into account provided location if present
// If none, generate location based on warehouse location
let table_location = format!("{}/{}", warehouse.location, creation.name);
let creation = {
let mut creation = creation;
creation.location = Some(format!("{}/{}", warehouse.location, creation.name));
creation.location = Some(table_location.clone());
creation
};
// TODO: Add checks
// - Check if storage profile is valid (writtable)

let name = creation.name.to_string();
let result = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let location = result.metadata.location.clone();
let metadata = result.metadata.clone();
let metadata_file_id = Uuid::new_v4().to_string();
let metadata_relative_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json");
// TODO un-hardcode "file://" and make it dynamic - filesystem or s3 (at least)
let metadata_full_location = format!("file://object_store/{metadata_relative_location}");

let table = Table {
metadata: result.metadata,
metadata_location: location,
metadata: metadata.clone(),
metadata_location: metadata_full_location,
ident: TableIdent {
database: namespace.clone(),
table: name.clone(),
},
};
self.table_repo.put(&table).await?;
// TODO: Write metadata contents to metadata_location

let local_dir = "object_store";
fs::create_dir_all(local_dir).await.unwrap();
let store = LocalFileSystem::new_with_prefix(local_dir).expect("Failed to initialize filesystem object store");
let path = Path::from(metadata_relative_location);
let json_data = serde_json::to_string(&table.metadata).unwrap();
let content = Bytes::from(json_data);
store.put(&path, PutPayload::from(content)).await.expect("Failed to write file");

Ok(table)
}
Expand Down
13 changes: 9 additions & 4 deletions crates/control_plane/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use uuid::Uuid;
use datafusion::prelude::*;
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_datafusion::IcebergCatalogProvider;
use datafusion::catalog_common::CatalogProvider;
use std::collections::HashMap;

#[async_trait]
Expand Down Expand Up @@ -110,10 +111,14 @@ impl ControlService for ControlServiceImpl {

let catalog = RestCatalog::new(config);

// TODO need manifest file written before the code below works
// let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog))
// .await
// .unwrap();
let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog))
.await
.unwrap();

// Test that catalog loaded successfully
println!("SCHEMAS: {:?}", catalog.schema_names());

// TODO rest of the query code

Ok(("OK"))
}
Expand Down