diff --git a/Cargo.toml b/Cargo.toml
index dc812d595..5316a558c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,7 +18,7 @@ async-trait = { version = "0.1" }
 serde = { version = "1.0", features = ["derive"] }
 slatedb = { version = "*" }
 bytes = { version = "1" }
-object_store = { version = "0.11" }
+object_store = { version = "0.10.1" }
 serde_json = "1.0"
 serde_yaml = "0.8"
 tower = { version = "0.4", features = ["util"] }
diff --git a/crates/catalog/Cargo.toml b/crates/catalog/Cargo.toml
index 8c88dc498..ae639c58a 100644
--- a/crates/catalog/Cargo.toml
+++ b/crates/catalog/Cargo.toml
@@ -14,7 +14,6 @@ futures = { workspace = true }
 slatedb = { workspace = true }
 bytes = { workspace = true }
 tokio = { workspace = true }
-object_store = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 uuid = { workspace = true }
diff --git a/crates/catalog/src/service.rs b/crates/catalog/src/service.rs
index e8651302e..3057dbfe1 100644
--- a/crates/catalog/src/service.rs
+++ b/crates/catalog/src/service.rs
@@ -1,14 +1,9 @@
-use object_store::path::Path;
 use async_trait::async_trait;
-use object_store::{CredentialProvider, ObjectStore, PutPayload};
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
-use bytes::Bytes;
 use control_plane::models::Warehouse;
 use iceberg::{spec::TableMetadataBuilder, TableCreation};
-use object_store::local::LocalFileSystem;
-use tokio::fs;
 use uuid::Uuid;
 use crate::error::{Error, Result}; // TODO: Replace this with this crate error and result
 use crate::models::{
@@ -222,14 +217,14 @@
         &self,
         namespace: &DatabaseIdent,
         warehouse: &Warehouse,
-        creation: TableCreation,
+        table_creation: TableCreation,
     ) -> Result<Table> {
         // Check if namespace exists
         _ = self.get_namespace(namespace).await?;
         // Check if table exists
         let ident = TableIdent {
             database: namespace.clone(),
-            table: creation.name.clone(),
+            table: table_creation.name.clone(),
         };
         let res = self.load_table(&ident).await;
         if res.is_ok() {
@@ -240,26 +235,26 @@
         // Take into account namespace location property if present
         // Take into account provided location if present
         // If none, generate location based on warehouse location
-        let table_location = format!("{}/{}", warehouse.location, creation.name);
+        // TODO: un-hardcode "file://" and make it dynamic - filesystem or s3 (at least)
+        let working_dir_abs_path = std::env::current_dir().unwrap().to_str().unwrap().to_string();
+        let table_location = format!("file://{}/{}/{}", working_dir_abs_path, warehouse.location, table_creation.name);
         let creation = {
-            let mut creation = creation;
+            let mut creation = table_creation;
             creation.location = Some(table_location.clone());
             creation
         };

         // TODO: Add checks
-        //  - Check if storage profile is valid (writtable)
+        //  - Check if storage profile is valid (writable)

         let name = creation.name.to_string();
         let result = TableMetadataBuilder::from_table_creation(creation)?.build()?;
         let metadata = result.metadata.clone();
         let metadata_file_id = Uuid::new_v4().to_string();
-        let metadata_relative_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json");
-        // TODO un-hardcode "file://" and make it dynamic - filesystem or s3 (at least)
-        let metadata_full_location = format!("file://object_store/{metadata_relative_location}");
+        let metadata_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json");
         let table = Table {
             metadata: metadata.clone(),
-            metadata_location: metadata_full_location,
+            metadata_location: metadata_location.clone(),
             ident: TableIdent {
                 database: namespace.clone(),
                 table: name.clone(),
@@ -267,13 +262,17 @@
         };
         self.table_repo.put(&table).await?;

-        let local_dir = "object_store";
-        fs::create_dir_all(local_dir).await.unwrap();
-        let store = LocalFileSystem::new_with_prefix(local_dir).expect("Failed to initialize filesystem object store");
-        let path = Path::from(metadata_relative_location);
-        let json_data = serde_json::to_string(&table.metadata).unwrap();
-        let content = Bytes::from(json_data);
-        store.put(&path, PutPayload::from(content)).await.expect("Failed to write file");
+        let file_io = iceberg::io::FileIOBuilder::new("file").build()?;
+        let metadata_file = file_io.new_output(metadata_location)?;
+        let mut writer = metadata_file.writer().await?;
+        let buf = serde_json::to_vec(&table.metadata).unwrap();
+        writer.write(buf.into()).await?;
+        writer.close().await?;

         Ok(table)
     }
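The create_table hunk above replaces the hand-rolled LocalFileSystem/object_store write with iceberg's FileIO abstraction, which is the path toward un-hardcoding "file://" (an S3-backed FileIO only differs in its scheme). A minimal standalone sketch of that write path, assuming the iceberg crate's FileIOBuilder API as used in the hunk; `write_metadata` is an illustrative name, not a function in this codebase:

```rust
use iceberg::io::FileIOBuilder;

// Illustrative helper: write table-metadata JSON to any location the FileIO
// backend understands, e.g. "file:///tmp/wh/table/metadata/x.metadata.json".
async fn write_metadata(metadata_location: &str, json: Vec<u8>) -> iceberg::Result<()> {
    // "file" selects the local-filesystem backend; a scheme like "s3"
    // would select an object-store backend instead.
    let file_io = FileIOBuilder::new("file").build()?;
    let output = file_io.new_output(metadata_location)?;
    let mut writer = output.writer().await?;
    writer.write(json.into()).await?;
    writer.close().await?;
    Ok(())
}
```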
diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml
index 5e4943e3a..d2d49a365 100644
--- a/crates/control_plane/Cargo.toml
+++ b/crates/control_plane/Cargo.toml
@@ -12,11 +12,13 @@ thiserror = { workspace = true }
 utils = { path = "../utils" }
 futures = { workspace = true }
 serde = { workspace = true }
-datafusion = { version = "41" }
-iceberg-catalog-rest = { version = "0.3" }
-iceberg-datafusion = { version = "0.3" }
+datafusion = { version = "40" }
+iceberg-rust = { version = "0.5.8" }
+iceberg-rest-catalog = { version = "0.5.8" }
+datafusion_iceberg = { version = "0.5.8" }
 arrow = { version = "52" }
+arrow-json = { version = "52" }
+object_store = { workspace = true }

 [dev-dependencies]
 slatedb = {workspace = true }
-object_store = { workspace = true }
diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs
index 6412503b4..e4798a4c1 100644
--- a/crates/control_plane/src/service.rs
+++ b/crates/control_plane/src/service.rs
@@ -3,13 +3,15 @@ use crate::models::{StorageProfile, StorageProfileCreateRequest};
 use crate::models::{Warehouse, WarehouseCreateRequest};
 use crate::repository::{StorageProfileRepository, WarehouseRepository};
 use async_trait::async_trait;
-use datafusion::catalog_common::CatalogProvider;
-use datafusion::prelude::*;
-use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
-use iceberg_datafusion::IcebergCatalogProvider;
-use std::collections::HashMap;
 use std::sync::Arc;
 use uuid::Uuid;
+use datafusion::prelude::*;
+use iceberg_rest_catalog::apis::configuration::Configuration;
+use iceberg_rust::catalog::bucket::ObjectStoreBuilder;
+use datafusion_iceberg::catalog::catalog::IcebergCatalog;
+use iceberg_rest_catalog::catalog::RestCatalog;
+use arrow::record_batch::RecordBatch;
+use object_store::local::LocalFileSystem;

 #[async_trait]
 pub trait ControlService: Send + Sync {
@@ -33,7 +35,8 @@
     // async fn delete_table(&self, id: Uuid) -> Result<()>;
     // async fn list_tables(&self) -> Result<Vec<Table>>;

-    async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)>;
+    async fn query_table(&self, warehouse_id: &Uuid, database_name: &String, table_name: &String, query: &String) -> Result<(String)>;
 }

 pub struct ControlServiceImpl {
@@ -102,25 +105,58 @@ impl ControlService for ControlServiceImpl {
         self.warehouse_repo.list().await
     }

-    async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)> {
-        let config = RestCatalogConfig::builder()
-            .uri("http://0.0.0.0:3000/catalog".to_string())
-            .warehouse(warehouse_id.to_string())
-            .props(HashMap::default())
-            .build();
+    async fn query_table(&self, warehouse_id: &Uuid, database_name: &String, table_name: &String, query: &String) -> Result<(String)> {
+        let config = {
+            let mut config = Configuration::new();
+            config.base_path = "http://0.0.0.0:3000/catalog".to_string();
+            config
+        };
+        let builder = Arc::new(LocalFileSystem::new());
+        let rest_client = RestCatalog::new(
+            Some(warehouse_id.to_string().as_str()),
+            config,
+            ObjectStoreBuilder::Filesystem(builder),
+        );
+        let catalog = IcebergCatalog::new(Arc::new(rest_client), None)
+            .await
+            .unwrap();
+
+        let ctx = SessionContext::new();
+        ctx.register_catalog("catalog", Arc::new(catalog));
+
+        let provider = ctx.catalog("catalog").unwrap();
+        let schemas = provider.schema_names();
+        println!("{schemas:?}");

-        let catalog = RestCatalog::new(config);
+        let tables = provider.schema(database_name).unwrap().table_names();
+        println!("{tables:?}");

-        let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog))
+        println!("{}", query);
+        let records = ctx
+            .sql(query)
+            .await
+            .unwrap()
+            .collect()
             .await
             .unwrap();
+        println!("{records:?}");
+
+        let df = ctx.sql(query).await.unwrap();
+        df.show().await.unwrap();

-        // Test that catalog loaded successfully
-        println!("SCHEMAS: {:?}", catalog.schema_names());
+        let buf = Vec::new();
+        let mut writer = arrow_json::ArrayWriter::new(buf);
+        let record_refs: Vec<&RecordBatch> = records.iter().collect();
+        writer.write_batches(&record_refs).unwrap();
+        writer.finish().unwrap();

-        // TODO rest of the query code
+        // Get the underlying buffer back.
+        let buf = writer.into_inner();

-        Ok(("OK"))
+        Ok((String::from_utf8(buf).unwrap()))
     }
 }
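The new query_table runs the SQL through DataFusion (once via collect(), once more via show() for debugging) and then serializes the collected RecordBatches into a JSON string with arrow-json's ArrayWriter. A self-contained sketch of just that batches-to-JSON step, assuming arrow/arrow-json 52 as pinned above; `batches_to_json` and the one-column schema are illustrative, not part of the codebase:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

// Illustrative helper: serialize collected batches into one JSON array string.
fn batches_to_json(batches: &[RecordBatch]) -> Result<String, Box<dyn std::error::Error>> {
    let mut writer = arrow_json::ArrayWriter::new(Vec::new());
    let refs: Vec<&RecordBatch> = batches.iter().collect();
    writer.write_batches(&refs)?;
    writer.finish()?;
    // into_inner() hands back the underlying Vec<u8> buffer.
    Ok(String::from_utf8(writer.into_inner())?)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    // Prints: [{"id":1},{"id":2},{"id":3}]
    println!("{}", batches_to_json(&[batch])?);
    Ok(())
}
```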
diff --git a/crates/nexus/Cargo.toml b/crates/nexus/Cargo.toml
index 11d00f74f..3c208751d 100644
--- a/crates/nexus/Cargo.toml
+++ b/crates/nexus/Cargo.toml
@@ -20,6 +20,7 @@ serde = { workspace = true }
 iceberg = { workspace = true }
 slatedb = { workspace = true }
 object_store = { workspace = true }
+object_store_for_slatedb = { package = "object_store", version = "0.11.1" }
 utils = { path = "../utils" }
 utoipa = { workspace = true }
 utoipa-axum = { workspace = true }
diff --git a/crates/nexus/src/http/catalog/handlers.rs b/crates/nexus/src/http/catalog/handlers.rs
index 978bbb664..112d4a142 100644
--- a/crates/nexus/src/http/catalog/handlers.rs
+++ b/crates/nexus/src/http/catalog/handlers.rs
@@ -182,3 +182,15 @@ pub async fn get_config(

     Ok(Json(config.into()))
 }
+
+// only one endpoint is defined for the catalog implementation to work
+// we don't actually have functionality for views yet
+pub async fn list_views(
+    State(state): State<AppState>,
+    Path((id, namespace_id)): Path<(Uuid, String)>,
+) -> Result<Json<schemas::TableListResponse>, AppError> {
+    Ok(Json(schemas::TableListResponse {
+        identifiers: vec![],
+    }))
+}
diff --git a/crates/nexus/src/http/catalog/router.rs b/crates/nexus/src/http/catalog/router.rs
index deb5b575b..17448668c 100644
--- a/crates/nexus/src/http/catalog/router.rs
+++ b/crates/nexus/src/http/catalog/router.rs
@@ -4,7 +4,7 @@
 use axum::Router;

 use crate::http::catalog::handlers::{
     commit_table, create_namespace, create_table, delete_namespace, delete_table, get_config,
-    get_namespace, get_table, list_namespaces, list_tables,
+    get_namespace, get_table, list_namespaces, list_tables, list_views,
 };

@@ -15,12 +15,18 @@ pub fn create_router() -> Router {
         .route("/:table", delete(delete_table))
         .route("/:table", post(commit_table));

+    // only one endpoint is defined for the catalog implementation to work
+    // we don't actually have functionality for views yet
+    let view_router: Router = Router::new()
+        .route("/", get(list_views));
+
     let ns_router = Router::new()
         .route("/", get(list_namespaces))
         .route("/", post(create_namespace))
         .route("/:namespace", get(get_namespace))
         .route("/:namespace", delete(delete_namespace))
-        .nest("/:namespace/tables", table_router);
+        .nest("/:namespace/tables", table_router)
+        .nest("/:namespace/views", view_router);

     // Iceberg clients do not prefix config fetch RPC call
     // and do prefix (with whatever prefix returned by config fetch) all other RPC calls
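The list_views handler and the nested /:namespace/views route are stubs: Iceberg REST clients enumerate views during catalog discovery, and a well-formed empty response keeps them working even though views are not implemented. A simplified, self-contained sketch of the same pattern in axum, with stand-ins for the real AppState and schemas types:

```rust
use axum::{routing::get, Json, Router};
use serde::Serialize;

// Stand-in for the real schemas::TableListResponse.
#[derive(Serialize)]
struct TableListResponse {
    identifiers: Vec<String>,
}

// Stub handler: always reports zero views so client-side discovery succeeds.
async fn list_views() -> Json<TableListResponse> {
    Json(TableListResponse { identifiers: vec![] })
}

// Mirrors the router change: a dedicated views sub-router nested next to tables.
fn create_router() -> Router {
    let view_router = Router::new().route("/", get(list_views));
    Router::new().nest("/:namespace/views", view_router)
}
```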
diff --git a/crates/nexus/src/http/ui/handlers/tables.rs b/crates/nexus/src/http/ui/handlers/tables.rs
index ce37a5c50..fa9aac6ce 100644
--- a/crates/nexus/src/http/ui/handlers/tables.rs
+++ b/crates/nexus/src/http/ui/handlers/tables.rs
@@ -165,7 +165,7 @@ pub async fn query_table(
     let request: TableQueryRequest = payload.into();
     let result = state
         .control_svc
-        .query_table(&warehouse_id, &request.query)
+        .query_table(&warehouse_id, &database_name, &table_name, &request.query)
         .await?;
     Ok(Json(TableQueryResponse {
         id: Default::default(),
diff --git a/crates/nexus/src/main.rs b/crates/nexus/src/main.rs
index 0706d6a92..e26c41490 100644
--- a/crates/nexus/src/main.rs
+++ b/crates/nexus/src/main.rs
@@ -2,7 +2,7 @@ use catalog::repository::{DatabaseRepositoryDb, TableRepositoryDb};
 use catalog::service::CatalogImpl;
 use control_plane::repository::{StorageProfileRepositoryDb, WarehouseRepositoryDb};
 use control_plane::service::ControlServiceImpl;
-use object_store::{memory::InMemory, path::Path, ObjectStore};
+use object_store_for_slatedb::{memory::InMemory, path::Path, ObjectStore};
 use slatedb::config::DbOptions;
 use slatedb::db::Db as SlateDb;
 use std::sync::Arc;
diff --git a/tests/conftest.py b/tests/conftest.py
index c304d2597..ba1a3fc1d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -118,10 +118,10 @@ def storage_profile(server: Server) -> dict:
 @pytest.fixture(scope="session")
 def warehouse(server: Server, storage_profile) -> dict:
     warehouse_name = "test-warehouse"
-    preix = "prefix"
+    prefix = "prefix"
     wh = server.create_warehouse(
         name=warehouse_name,
-        prefix=preix,
+        prefix=prefix,
         storage_profile_id=storage_profile.get("id", None),
     )
     yield wh
diff --git a/tests/test_catalog.py b/tests/test_catalog.py
index 793f62915..973d97b27 100644
--- a/tests/test_catalog.py
+++ b/tests/test_catalog.py
@@ -1,4 +1,4 @@
-import conftest
+import tests.conftest
 import pandas as pd
 import pyarrow as pa
 import pytest
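A note on the object_store split threaded through these changes: slatedb wants object_store 0.11 while the iceberg/datafusion stack pins 0.10, so crates/nexus/Cargo.toml compiles a second copy under a new name via Cargo's package-rename key, and main.rs imports slatedb's copy under that name. A sketch of how the two versions coexist from the Rust side, assuming that Cargo.toml setup; the aliases are illustrative:

```rust
// Two independent copies of object_store, distinguished by crate name.
// `object_store` resolves to the workspace 0.10.x used by the iceberg stack;
// `object_store_for_slatedb` is 0.11.x, renamed via `package = "object_store"`.
use object_store::memory::InMemory as InMemoryV010;
use object_store_for_slatedb::memory::InMemory as InMemoryV011;

fn main() {
    // The two InMemory types come from different compiled crates, so a 0.10
    // store cannot be passed where slatedb expects a 0.11 ObjectStore.
    let _iceberg_side = InMemoryV010::new();
    let _slatedb_side = InMemoryV011::new();
}
```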