Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async-trait = { version = "0.1" }
serde = { version = "1.0", features = ["derive"] }
slatedb = { version = "*" }
bytes = { version = "1" }
object_store = { version = "0.11" }
object_store = { version = "0.10.1" }
serde_json = "1.0"
serde_yaml = "0.8"
tower = { version = "0.4", features = ["util"] }
Expand Down
1 change: 0 additions & 1 deletion crates/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ futures = { workspace = true }
slatedb = { workspace = true }
bytes = { workspace = true }
tokio = { workspace = true }
object_store = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
uuid = { workspace = true }
Expand Down
41 changes: 20 additions & 21 deletions crates/catalog/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use object_store::path::Path;
use async_trait::async_trait;
use object_store::{CredentialProvider, ObjectStore, PutPayload};
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use bytes::Bytes;
use control_plane::models::Warehouse;
use iceberg::{spec::TableMetadataBuilder, TableCreation};
use object_store::local::LocalFileSystem;
use tokio::fs;
use uuid::Uuid;
use crate::error::{Error, Result}; // TODO: Replace this with this crate error and result
use crate::models::{
Expand Down Expand Up @@ -222,14 +217,14 @@ impl Catalog for CatalogImpl {
&self,
namespace: &DatabaseIdent,
warehouse: &Warehouse,
creation: TableCreation,
table_creation: TableCreation,
) -> Result<Table> {
// Check if namespace exists
_ = self.get_namespace(namespace).await?;
// Check if table exists
let ident = TableIdent {
database: namespace.clone(),
table: creation.name.clone(),
table: table_creation.name.clone(),
};
let res = self.load_table(&ident).await;
if res.is_ok() {
Expand All @@ -240,40 +235,44 @@ impl Catalog for CatalogImpl {
// Take into account namespace location property if present
// Take into account provided location if present
// If none, generate location based on warehouse location
let table_location = format!("{}/{}", warehouse.location, creation.name);
// un-hardcode "file://" and make it dynamic - filesystem or s3 (at least)
let working_dir_abs_path = std::env::current_dir().unwrap().to_str().unwrap().to_string();
let table_location = format!("file://{}/{}/{}", working_dir_abs_path, warehouse.location, table_creation.name);
let creation = {
let mut creation = creation;
let mut creation = table_creation;
creation.location = Some(table_location.clone());
creation
};
// TODO: Add checks
// - Check if storage profile is valid (writtable)
// - Check if storage profile is valid (writable)

let name = creation.name.to_string();
let result = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = result.metadata.clone();
let metadata_file_id = Uuid::new_v4().to_string();
let metadata_relative_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json");
// TODO un-hardcode "file://" and make it dynamic - filesystem or s3 (at least)
let metadata_full_location = format!("file://object_store/{metadata_relative_location}");
let metadata_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json");

let table = Table {
metadata: metadata.clone(),
metadata_location: metadata_full_location,
metadata_location: metadata_location.clone(),
ident: TableIdent {
database: namespace.clone(),
table: name.clone(),
},
};
self.table_repo.put(&table).await?;

let local_dir = "object_store";
fs::create_dir_all(local_dir).await.unwrap();
let store = LocalFileSystem::new_with_prefix(local_dir).expect("Failed to initialize filesystem object store");
let path = Path::from(metadata_relative_location);
let json_data = serde_json::to_string(&table.metadata).unwrap();
let content = Bytes::from(json_data);
store.put(&path, PutPayload::from(content)).await.expect("Failed to write file");
let file_io = iceberg::io::FileIOBuilder::new("file").build()?;
let metadata_file = file_io
.new_output(metadata_location)?;
let mut writer = metadata_file
.writer()
.await?;
let buf = serde_json::to_vec(&table.metadata).unwrap();
writer
.write(buf.into())
.await?;
writer.close().await?;

Ok(table)
}
Expand Down
10 changes: 6 additions & 4 deletions crates/control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ thiserror = { workspace = true }
utils = { path = "../utils" }
futures = { workspace = true }
serde = { workspace = true }
datafusion = { version = "41" }
iceberg-catalog-rest = { version = "0.3" }
iceberg-datafusion = { version = "0.3" }
datafusion = { version = "40" }
iceberg-rust = { version = "0.5.8" }
iceberg-rest-catalog = { version = "0.5.8" }
datafusion_iceberg = { version = "0.5.8" }
arrow = { version = "52" }
arrow-json = { version = "52" }
object_store = { workspace = true }

[dev-dependencies]
slatedb = {workspace = true }
object_store = { workspace = true }
72 changes: 54 additions & 18 deletions crates/control_plane/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use crate::models::{StorageProfile, StorageProfileCreateRequest};
use crate::models::{Warehouse, WarehouseCreateRequest};
use crate::repository::{StorageProfileRepository, WarehouseRepository};
use async_trait::async_trait;
use datafusion::catalog_common::CatalogProvider;
use datafusion::prelude::*;
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_datafusion::IcebergCatalogProvider;
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
use datafusion::prelude::*;
use iceberg_rest_catalog::apis::configuration::Configuration;
use iceberg_rust::catalog::bucket::ObjectStoreBuilder;
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
use iceberg_rest_catalog::catalog::RestCatalog;
use arrow::record_batch::RecordBatch;
use object_store::local::LocalFileSystem;

#[async_trait]
pub trait ControlService: Send + Sync {
Expand All @@ -33,7 +35,8 @@ pub trait ControlService: Send + Sync {
// async fn delete_table(&self, id: Uuid) -> Result<()>;
// async fn list_tables(&self) -> Result<Vec<Table>>;

async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)>;
async fn query_table(&self, warehouse_id:&Uuid, database_name:&String, table_name:&String, query:&String) -> Result<
(String)>;
}

pub struct ControlServiceImpl {
Expand Down Expand Up @@ -102,25 +105,58 @@ impl ControlService for ControlServiceImpl {
self.warehouse_repo.list().await
}

async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)> {
let config = RestCatalogConfig::builder()
.uri("http://0.0.0.0:3000/catalog".to_string())
.warehouse(warehouse_id.to_string())
.props(HashMap::default())
.build();
async fn query_table(&self, warehouse_id:&Uuid, database_name:&String, table_name:&String, query:&String) -> Result<
(String)> {
let config = {
let mut config = Configuration::new();
config.base_path = "http://0.0.0.0:3000/catalog".to_string();
config
};
let builder = {
Arc::new(LocalFileSystem::new())
};
let rest_client = RestCatalog::new(
Some(warehouse_id.to_string().as_str()),
config,
ObjectStoreBuilder::Filesystem(builder),
);
let catalog = IcebergCatalog::new(Arc::new(rest_client), None)
.await
.unwrap();

let ctx = SessionContext::new();
ctx.register_catalog("catalog", Arc::new(catalog));

let provider = ctx.catalog("catalog").unwrap();
let schemas = provider.schema_names();
println!("{schemas:?}");

let catalog = RestCatalog::new(config);
let tables = provider.schema(database_name).unwrap().table_names();
println!("{tables:?}");

let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog))
println!("{}", query);
let records = ctx
.sql(query)
.await
.unwrap()
.collect()
.await
.unwrap();
println!("{records:?}");

let df = ctx.sql(query,).await.unwrap();
df.show().await.unwrap();

// Test that catalog loaded successfully
println!("SCHEMAS: {:?}", catalog.schema_names());
let buf = Vec::new();
let mut writer = arrow_json::ArrayWriter::new(buf);
let record_refs: Vec<&RecordBatch> = records.iter().collect();
writer.write_batches(&record_refs).unwrap();
writer.finish().unwrap();

// TODO rest of the query code
// Get the underlying buffer back,
let buf = writer.into_inner();

Ok(("OK"))
Ok((String::from_utf8(buf).unwrap()))
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde = { workspace = true }
iceberg = { workspace = true }
slatedb = { workspace = true }
object_store = { workspace = true }
object_store_for_slatedb = { package = "object_store", version = "0.11.1" }
utils = { path = "../utils" }
utoipa = { workspace = true }
utoipa-axum = { workspace = true }
Expand Down
12 changes: 12 additions & 0 deletions crates/nexus/src/http/catalog/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,15 @@ pub async fn get_config(

Ok(Json(config.into()))
}

/// List the views in a namespace — `GET /:warehouse/namespaces/:namespace/views`.
///
/// Only this single views endpoint is implemented so that Iceberg REST
/// catalog clients which probe for views keep working; actual view
/// functionality does not exist yet, so the response is always an empty
/// identifier list.
///
/// Parameters are required by the axum route signature but are not yet
/// consulted; they are underscore-prefixed to avoid unused-variable
/// warnings until view support is implemented.
pub async fn list_views(
    State(_state): State<AppState>,
    Path((_id, _namespace_id)): Path<(Uuid, String)>,
) -> Result<Json<schemas::TableListResponse>, AppError> {
    // No view storage exists yet; report "no views" rather than 404 so
    // clients treat the namespace as valid but empty of views.
    Ok(Json(schemas::TableListResponse {
        identifiers: vec![],
    }))
}
10 changes: 8 additions & 2 deletions crates/nexus/src/http/catalog/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::Router;

use crate::http::catalog::handlers::{
commit_table, create_namespace, create_table, delete_namespace, delete_table, get_config,
get_namespace, get_table, list_namespaces, list_tables,
get_namespace, get_table, list_namespaces, list_tables, list_views
};

pub fn create_router() -> Router<AppState> {
Expand All @@ -15,12 +15,18 @@ pub fn create_router() -> Router<AppState> {
.route("/:table", delete(delete_table))
.route("/:table", post(commit_table));

    // Only the list endpoint is defined so the Iceberg REST catalog
    // implementation works; view functionality does not exist yet.
let view_router: Router<AppState> = Router::new()
.route("/", get(list_views));

let ns_router = Router::new()
.route("/", get(list_namespaces))
.route("/", post(create_namespace))
.route("/:namespace", get(get_namespace))
.route("/:namespace", delete(delete_namespace))
.nest("/:namespace/tables", table_router);
.nest("/:namespace/tables", table_router)
.nest("/:namespace/views", view_router);

// Iceberg clients do not prefix config fetch RPC call
// and do prefix (with whatever prefix returned by config fetch) all other RPC calls
Expand Down
2 changes: 1 addition & 1 deletion crates/nexus/src/http/ui/handlers/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub async fn query_table(
let request: TableQueryRequest = payload.into();
let result = state
.control_svc
.query_table(&warehouse_id, &request.query)
.query_table(&warehouse_id, &database_name, &table_name, &request.query)
.await?;
Ok(Json(TableQueryResponse {
id: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion crates/nexus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use catalog::repository::{DatabaseRepositoryDb, TableRepositoryDb};
use catalog::service::CatalogImpl;
use control_plane::repository::{StorageProfileRepositoryDb, WarehouseRepositoryDb};
use control_plane::service::ControlServiceImpl;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use object_store_for_slatedb::{memory::InMemory, path::Path, ObjectStore};
use slatedb::config::DbOptions;
use slatedb::db::Db as SlateDb;
use std::sync::Arc;
Expand Down
4 changes: 2 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ def storage_profile(server: Server) -> dict:
@pytest.fixture(scope="session")
def warehouse(server: Server, storage_profile) -> dict:
warehouse_name = "test-warehouse"
preix = "prefix"
prefix = "prefix"
wh = server.create_warehouse(
name=warehouse_name,
prefix=preix,
prefix=prefix,
storage_profile_id=storage_profile.get("id", None),
)
yield wh
Expand Down
2 changes: 1 addition & 1 deletion tests/test_catalog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import conftest
import tests.conftest
import pandas as pd
import pyarrow as pa
import pytest
Expand Down