From 61a4691bd0521686b0f60dbc0933e764a8892d48 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 25 Oct 2024 17:05:59 +0300 Subject: [PATCH 1/3] Add databases and tables --- crates/catalog/src/models.rs | 4 +- crates/nexus/Cargo.toml | 2 +- .../nexus/src/http/ui/handlers/databases.rs | 192 ++++--- crates/nexus/src/http/ui/handlers/profiles.rs | 51 +- crates/nexus/src/http/ui/handlers/tables.rs | 252 +++++---- .../nexus/src/http/ui/handlers/warehouses.rs | 181 +++++- crates/nexus/src/http/ui/models/aws.rs | 1 + crates/nexus/src/http/ui/models/database.rs | 97 ++-- crates/nexus/src/http/ui/models/errors.rs | 93 +++- crates/nexus/src/http/ui/models/table.rs | 513 +++--------------- crates/nexus/src/http/ui/models/warehouse.rs | 63 +-- crates/nexus/src/http/ui/router.rs | 12 +- 12 files changed, 676 insertions(+), 785 deletions(-) diff --git a/crates/catalog/src/models.rs b/crates/catalog/src/models.rs index 5b668f315..cc5ace624 100644 --- a/crates/catalog/src/models.rs +++ b/crates/catalog/src/models.rs @@ -128,14 +128,14 @@ impl TableRequirementExt { } } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct Table { pub ident: TableIdent, pub metadata: TableMetadata, pub metadata_location: String, } -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] pub struct TableIdent { pub database: DatabaseIdent, pub table: String, diff --git a/crates/nexus/Cargo.toml b/crates/nexus/Cargo.toml index 4d69a325d..707a3763f 100644 --- a/crates/nexus/Cargo.toml +++ b/crates/nexus/Cargo.toml @@ -13,7 +13,7 @@ tokio = { workspace = true } control_plane = { path = "../control_plane" } catalog = { path = "../catalog" } chrono = { workspace = true } -uuid = { workspace = true } +uuid = { workspace = true, features = ["v4", "v5"] } serde_json = { workspace = true } serde_yaml = { workspace = true } serde = { workspace = true } diff --git a/crates/nexus/src/http/ui/handlers/databases.rs b/crates/nexus/src/http/ui/handlers/databases.rs index 5ed5c7042..11bf287f9 100644 --- a/crates/nexus/src/http/ui/handlers/databases.rs +++ b/crates/nexus/src/http/ui/handlers/databases.rs @@ -1,11 +1,12 @@ -use crate::error::AppError; -use crate::http::ui::models::aws; use crate::http::ui::models::database; -use crate::http::ui::models::storage_profile; -use crate::http::ui::models::table::Statistics; -use crate::http::ui::models::warehouse; +use crate::http::ui::models::database::{get_database_id, DatabaseDashboard}; +use crate::http::ui::models::errors::AppError; +use crate::http::ui::models::table::{get_table_id, TableEntity}; +use crate::http::ui::models::warehouse::WarehouseEntity; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; +use catalog::models::{DatabaseIdent, WarehouseIdent}; +use iceberg::NamespaceIdent; use swagger; use utoipa::OpenApi; use uuid::Uuid; @@ -20,7 +21,12 @@ use uuid::Uuid; components( schemas( database::CreateDatabasePayload, - database::Database + database::Database, + database::DatabaseDashboard, + database::DatabaseEntity, + database::DatabaseExtended, + database::DatabaseShort, + AppError, ) ), tags( @@ -33,96 +39,156 @@ pub struct ApiDoc; post, path = "/ui/warehouses/{warehouseId}/databases", operation_id = "webCreateDatabase", + params( + ("warehouseId" = Uuid, description = "Warehouse ID"), + ), + request_body = database::CreateDatabasePayload, responses( (status = 200, description = "Successful Response", body = database::Database), - (status = 400, description = "Bad request"), - (status = 500, description = "Internal server error") + (status = 400, description = "Bad request", body = AppError), + (status = 422, description = "Unprocessable entity", body = AppError), + (status = 500, description = "Internal server error", body = AppError) ) )] pub async fn create_database( State(state): State, - Path(payload): Path, + Path(warehouse_id): Path, + Json(payload): Json, ) -> Result, AppError> { - Ok(Json(database::Database { - name: "".to_string(), - properties: None, - id: Default::default(), - warehouse_id: Default::default(), - })) + let warehouse = state + .control_svc + .get_warehouse(warehouse_id) + .await + .map_err(|e| { + let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id); + AppError::new(e, fmt.as_str()) + })?; + let ident = DatabaseIdent { + warehouse: WarehouseIdent::new(warehouse.id), + namespace: NamespaceIdent::new(payload.name), + }; + let res = state + .catalog_svc + .create_namespace(&ident, payload.properties.unwrap_or_default()) + .await + .map_err(|e| { + let fmt = format!("{}: failed to create database with ident {}", e, &ident); + AppError::new(e, fmt.as_str()) + })?; + Ok(Json(res.into())) } #[utoipa::path( delete, path = "/ui/warehouses/{warehouseId}/databases/{databaseName}", operation_id = "webDeleteDatabase", + params( + ("warehouseId" = Uuid, description = "Warehouse ID"), + ("databaseName" = String, description = "Database Name"), + ), responses( (status = 204, description = "Successful Response"), - (status = 404, description = "Database not found") + (status = 404, description = "Database not found", body = AppError), + (status = 422, description = "Unprocessable entity", body = AppError), ) )] pub async fn delete_database( State(state): State, Path((warehouse_id, database_name)): Path<(Uuid, String)>, -) -> Result<(), AppError> { - Ok(()) +) -> Result, AppError> { + let ident = DatabaseIdent { + warehouse: WarehouseIdent::new(warehouse_id), + namespace: NamespaceIdent::new(database_name), + }; + + state + .catalog_svc + .drop_namespace(&ident) + .await + .map_err(|e| { + let fmt = format!("{}: failed to delete database with ident {}", e, &ident); + AppError::new(e, fmt.as_str()) + })?; + Ok(Json(())) } #[utoipa::path( get, path = "/ui/warehouses/{warehouseId}/databases/{databaseName}", + params( + ("warehouseId" = Uuid, description = "Warehouse ID"), + ("databaseName" = String, description = "Database Name"), + ), operation_id = "webDatabaseDashboard", responses( - (status = 200, description = "Successful Response", body = database::DatabaseDashboard), - (status = 204, description = "Successful Response"), - (status = 404, description = "Database not found") + (status = 200, description = "Successful Response", body = DatabaseDashboard), + (status = 404, description = "Database not found", body = AppError), + (status = 422, description = "Unprocessable entity", body = AppError), ) )] pub async fn get_database( State(state): State, Path((warehouse_id, database_name)): Path<(Uuid, String)>, ) -> Result, AppError> { - Ok(Json(database::DatabaseDashboard { - name: "".to_string(), - properties: None, - id: Default::default(), - warehouse_id: Default::default(), - warehouse: warehouse::WarehouseEntity { - name: "".to_string(), - storage_profile_id: Default::default(), - key_prefix: "".to_string(), - id: Default::default(), - external_id: Default::default(), - location: "".to_string(), - created_at: Default::default(), - updated_at: Default::default(), - storage_profile: storage_profile::StorageProfile { - r#type: aws::CloudProvider::AWS, - region: "".to_string(), - bucket: "".to_string(), - credentials: Default::default(), - sts_role_arn: None, - endpoint: None, - id: Default::default(), - created_at: Default::default(), - updated_at: Default::default(), - }, - }, - tables: vec![], - statistics: Statistics { - commit_count: 0, - op_append_count: 0, - op_overwrite_count: 0, - op_delete_count: 0, - op_replace_count: 0, - total_bytes: 0, - bytes_added: 0, - bytes_removed: 0, - total_rows: 0, - rows_added: 0, - rows_deleted: 0, - table_count: None, - database_count: None, - }, + let warehouse = state + .control_svc + .get_warehouse(warehouse_id) + .await + .map_err(|e| { + let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id); + AppError::new(e, fmt.as_str()) + })?; + let profile = state + .control_svc + .get_profile(warehouse.storage_profile_id) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get profile by id {}", + e, warehouse.storage_profile_id + ); + AppError::new(e, fmt.as_str()) + })?; + let ident = DatabaseIdent { + warehouse: WarehouseIdent::new(warehouse.id), + namespace: NamespaceIdent::new(database_name), + }; + let database = state.catalog_svc.get_namespace(&ident).await.map_err(|e| { + let fmt = format!( + "{}: failed to get database with db ident {}", + e, &ident + ); + AppError::new(e, fmt.as_str()) + })?; + let tables = state.catalog_svc.list_tables(&ident).await + .map_err(|e| { + let fmt = format!( + "{}: failed to get database tables with db ident {}", + e, &ident + ); + AppError::new(e, fmt.as_str()) + })?; + Ok(Json(DatabaseDashboard { + name: database.ident.to_string(), + properties: Option::from(database.properties), + id: get_database_id(database.ident), + warehouse_id, + warehouse: WarehouseEntity::new(warehouse.into(), profile.into()), + tables: tables + .into_iter() + .map(|t| { + let ident = t.ident.clone(); + TableEntity { + id: get_table_id(t.ident), + name: ident.table, + created_at: Default::default(), + updated_at: Default::default(), + statistics: Default::default(), + compaction_summary: None, + } + }) + .collect(), + statistics: Default::default(), compaction_summary: None, })) } diff --git a/crates/nexus/src/http/ui/handlers/profiles.rs b/crates/nexus/src/http/ui/handlers/profiles.rs index fa9325249..f7e1a4819 100644 --- a/crates/nexus/src/http/ui/handlers/profiles.rs +++ b/crates/nexus/src/http/ui/handlers/profiles.rs @@ -1,4 +1,4 @@ -use crate::error::AppError; +use crate::http::ui::models::errors::AppError; use crate::http::ui::models::{aws, storage_profile}; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; @@ -22,6 +22,7 @@ use uuid::Uuid; aws::AwsAccessKeyCredential, aws::AwsRoleCredential, aws::CloudProvider, + AppError, ) ), tags( @@ -37,8 +38,9 @@ pub struct ApiDoc; request_body = storage_profile::CreateStorageProfilePayload, responses( (status = 200, description = "Successful Response", body = storage_profile::StorageProfile), - (status = 400, description = "Bad request"), - (status = 500, description = "Internal server error") + (status = 400, description = "Bad request", body = AppError), + (status = 422, description = "Unprocessable entity", body = AppError), + (status = 500, description = "Internal server error", body = AppError) ) )] pub async fn create_storage_profile( @@ -46,11 +48,15 @@ pub async fn create_storage_profile( Json(payload): Json, ) -> Result, AppError> { let request: StorageProfileCreateRequest = payload.into(); - let profile: StorageProfile = state - .control_svc - .create_profile(&request) - .await - .map_err(|e| AppError::from(e))?; + let profile: StorageProfile = + state + .control_svc + .create_profile(&request) + .await + .map_err(|e| { + let fmt = format!("{}: failed to create storage profile", e); + AppError::new(e, fmt.as_str()) + })?; Ok(Json(profile.into())) } @@ -63,7 +69,8 @@ pub async fn create_storage_profile( ), responses( (status = 200, description = "Successful Response", body = storage_profile::StorageProfile), - (status = 404, description = "Not found"), + (status = 404, description = "Not found", body = AppError), + (status = 422, description = "Unprocessable entity", body = AppError), ) )] pub async fn get_storage_profile( @@ -74,7 +81,13 @@ pub async fn get_storage_profile( .control_svc .get_profile(storage_profile_id) .await - .map_err(|e| AppError::from(e))?; + .map_err(|e| { + let fmt = format!( + "{}: failed to get storage profile with id {}", + e, storage_profile_id + ); + AppError::new(e, fmt.as_str()) + })?; Ok(Json(profile.into())) } @@ -87,7 +100,8 @@ pub async fn get_storage_profile( ), responses( (status = 200, description = "Successful Response", body = storage_profile::StorageProfile), - (status = 404, description = "Not found"), + (status = 404, description = "Not found", body = AppError), + (status = 422, description = "Unprocessable entity", body = AppError), ) )] pub async fn delete_storage_profile( @@ -98,7 +112,13 @@ pub async fn delete_storage_profile( .control_svc .delete_profile(storage_profile_id) .await - .map_err(|e| AppError::from(e))?; + .map_err(|e| { + let fmt = format!( + "{}: failed to delete storage profile with id {}", + e, storage_profile_id + ); + AppError::new(e, fmt.as_str()) + })?; Ok(Json(())) } @@ -108,12 +128,15 @@ pub async fn delete_storage_profile( path = "/ui/storage-profiles/", responses( (status = 200, body = Vec), - (status = 500, description = "Internal server error") + (status = 500, description = "Internal server error", body = AppError) ) )] pub async fn list_storage_profiles( State(state): State, ) -> Result>, AppError> { - let profiles = state.control_svc.list_profiles().await?; + let profiles = state.control_svc.list_profiles().await.map_err(|e| { + let fmt = format!("{}: failed to list storage profile", e); + AppError::new(e, fmt.as_str()) + })?; Ok(Json(profiles.into_iter().map(|p| p.into()).collect())) } diff --git a/crates/nexus/src/http/ui/handlers/tables.rs b/crates/nexus/src/http/ui/handlers/tables.rs index 5f17262e8..67eddfd82 100644 --- a/crates/nexus/src/http/ui/handlers/tables.rs +++ b/crates/nexus/src/http/ui/handlers/tables.rs @@ -1,12 +1,13 @@ -use crate::error::AppError; -use crate::http::ui::models::{aws, database, storage_profile, table, warehouse}; +use crate::http::ui::models::errors::AppError; +use crate::http::ui::models::table; +use crate::http::ui::models::table::TableQueryRequest; +use crate::http::ui::models::table::{Table, TableCreateRequest, TableExtended}; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; -use axum_macros::debug_handler; +use catalog::models::{DatabaseIdent, TableIdent, WarehouseIdent}; +use iceberg::NamespaceIdent; use utoipa::OpenApi; use uuid::Uuid; -use control_plane::models::{CloudProvider, StorageProfileCreateRequest, Credentials, AwsAccessKeyCredential, WarehouseCreateRequest}; -use crate::http::ui::models::table::TableQueryRequest; #[derive(OpenApi)] #[openapi( @@ -14,14 +15,12 @@ use crate::http::ui::models::table::TableQueryRequest; create_table, delete_table, get_table, - list_tables, query_table, ), components( schemas( - table::TableExtended, table::TableQueryResponse, - database::Database, + AppError, ) ), tags( @@ -30,153 +29,119 @@ use crate::http::ui::models::table::TableQueryRequest; )] pub struct ApiDoc; -#[utoipa::path( - get, - path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables", - operation_id = "webTablesDashboard", - responses( - (status = 200, description = "List all warehouses", body = Vec), - (status = 500, description = "Internal server error") - ) -)] -pub async fn list_tables( - State(state): State, -) -> Result>, AppError> { - Ok(Json(vec![])) -} - #[utoipa::path( get, path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}", operation_id = "webTableDashboard", params( - ("warehouseId" = Uuid, Path, description = "Warehouse ID"), - ("databaseName" = Uuid, Path, description = "Database Name"), - ("tableName" = Uuid, Path, description = "Table name") + ("warehouseId" = Uuid, description = "Warehouse ID"), + ("databaseName" = String, description = "Database Name"), + ("tableName" = String, description = "Table name") ), responses( - (status = 200, description = "List all warehouses", body = Vec), - (status = 500, description = "Internal server error") + (status = 200, description = "List all warehouses"), + (status = 404, description = "Not found", body = AppError), + (status = 422, description = "Unprocessable entity", body = AppError), + (status = 500, description = "Internal server error", body = AppError) ) )] pub async fn get_table( State(state): State, Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>, -) -> Result, AppError> { - Ok(Json(table::TableExtended { - id: Default::default(), - name: table_name, - database_name: Default::default(), - warehouse_id: Default::default(), - properties: None, - metadata: Default::default(), - statistics: None, - compaction_summary: None, - created_at: Default::default(), - updated_at: Default::default(), - database: database::DatabaseExtended { - name: "1".to_string(), - properties: None, - id: Default::default(), - warehouse_id, - statistics: None, - compaction_summary: None, - created_at: Default::default(), - updated_at: Default::default(), - warehouse: warehouse::WarehouseExtended { - name: "11".to_string(), - storage_profile_id: Default::default(), - key_prefix: "".to_string(), - id: Default::default(), - external_id: Default::default(), - location: "".to_string(), - created_at: Default::default(), - updated_at: Default::default(), - statistics: None, - compaction_summary: None, - storage_profile: storage_profile::StorageProfile { - r#type: aws::CloudProvider::AWS, - region: "22".to_string(), - bucket: "2".to_string(), - credentials: Default::default(), - sts_role_arn: None, - endpoint: None, - id: Default::default(), - created_at: Default::default(), - updated_at: Default::default(), - }, - }, - }, - })) +) -> Result, AppError> { + let warehouse = state + .control_svc + .get_warehouse(warehouse_id) + .await + .map_err(|e| { + let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id); + AppError::new(e, fmt.as_str()) + })?; + let profile = state + .control_svc + .get_profile(warehouse.storage_profile_id) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get profile by id {}", + e, warehouse.storage_profile_id + ); + AppError::new(e, fmt.as_str()) + })?; + let ident = DatabaseIdent { + warehouse: WarehouseIdent::new(warehouse.id), + namespace: NamespaceIdent::new(database_name), + }; + let database = state.catalog_svc.get_namespace(&ident).await.map_err(|e| { + let fmt = format!("{}: failed to get database with db ident {}", e, &ident); + AppError::new(e, fmt.as_str()) + })?; + + let table_ident = TableIdent { + database: ident, + table: table_name, + }; + let table = state + .catalog_svc + .load_table(&table_ident) + .await + .map_err(|e| { + let fmt = format!("{}: failed to get table with ident {}", e, &table_ident); + AppError::new(e, fmt.as_str()) + })?; + + Ok(Json(TableExtended::new( + profile.into(), + warehouse.into(), + database.into(), + table, + ))) } #[utoipa::path( get, operation_id = "webCreateTable", - path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}", + path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables", params( - ("warehouseId" = Uuid, Path, description = "Warehouse ID"), - ("databaseName" = Uuid, Path, description = "Database Name"), - ("tableName" = Uuid, Path, description = "Table name") + ("warehouseId" = Uuid, description = "Warehouse ID"), + ("databaseName" = String, description = "Database Name"), ), responses( - (status = 200, description = "Successful Response", body = table::TableExtended), + (status = 200, description = "Successful Response"), (status = 404, description = "Not found"), ) )] pub async fn create_table( State(state): State, - Json(payload): Json, -) -> Result, AppError> { - Ok(Json(table::TableExtended { - id: Default::default(), - name: "3".to_string(), - database_name: "3".to_string(), - warehouse_id: Default::default(), - properties: None, - metadata: Default::default(), - statistics: None, - compaction_summary: None, - created_at: Default::default(), - updated_at: Default::default(), - database: database::DatabaseExtended { - name: "4".to_string(), - properties: None, - id: Default::default(), - warehouse_id: Default::default(), - statistics: None, - compaction_summary: None, - created_at: Default::default(), - updated_at: Default::default(), - warehouse: warehouse::WarehouseExtended { - name: "1".to_string(), - storage_profile_id: Default::default(), - key_prefix: "".to_string(), - id: Default::default(), - external_id: Default::default(), - location: "".to_string(), - created_at: Default::default(), - updated_at: Default::default(), - statistics: None, - compaction_summary: None, - storage_profile: storage_profile::StorageProfile { - r#type: aws::CloudProvider::AWS, - region: "2".to_string(), - bucket: "2".to_string(), - credentials: Default::default(), - sts_role_arn: None, - endpoint: None, - id: Default::default(), - created_at: Default::default(), - updated_at: Default::default(), - }, - }, - }, - })) + Path((warehouse_id, database_name)): Path<(Uuid, String)>, + Json(payload): Json, +) -> Result, AppError> { + let warehouse = state + .control_svc + .get_warehouse(warehouse_id) + .await + .map_err(|e| { + let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id); + AppError::new(e, fmt.as_str()) + })?; + let db_ident = DatabaseIdent { + warehouse: WarehouseIdent::new(warehouse.id), + namespace: NamespaceIdent::new(database_name), + }; + + let table = state + .catalog_svc + .create_table(&db_ident, &warehouse, payload.into()) + .await + .map_err(|e| { + let fmt = format!("{}: failed to create table", e); + AppError::new(e, fmt.as_str()) + })?; + Ok(Json(table.into())) } #[utoipa::path( - get, + delete, operation_id = "webDeleteTable", path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}", params( @@ -185,18 +150,40 @@ pub async fn create_table( ("tableName" = Uuid, Path, description = "Table name") ), responses( - (status = 200, description = "Successful Response", body = table::TableExtended), - (status = 404, description = "Not found"), + (status = 200, description = "Successful Response"), + (status = 404, description = "Not found", body=AppError), ) )] pub async fn delete_table( State(state): State, Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>, ) -> Result<(), AppError> { + let warehouse = state + .control_svc + .get_warehouse(warehouse_id) + .await + .map_err(|e| { + let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id); + AppError::new(e, fmt.as_str()) + })?; + let table_ident = TableIdent { + database: DatabaseIdent { + warehouse: WarehouseIdent::new(warehouse.id), + namespace: NamespaceIdent::new(database_name), + }, + table: table_name, + }; + state + .catalog_svc + .drop_table(&table_ident) + .await + .map_err(|e| { + let fmt = format!("{}: failed to delete table with ident {}", e, &table_ident); + AppError::new(e, fmt.as_str()) + })?; Ok(()) } - #[utoipa::path( post, path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}/query", @@ -218,7 +205,10 @@ pub async fn query_table( Json(payload): Json, ) -> Result, AppError> { let request: TableQueryRequest = payload.into(); - let result = state.control_svc.query_table(&warehouse_id, &request.query).await?; + let result = state + .control_svc + .query_table(&warehouse_id, &request.query) + .await?; Ok(Json(table::TableQueryResponse { id: Default::default(), query: request.query.clone(), diff --git a/crates/nexus/src/http/ui/handlers/warehouses.rs b/crates/nexus/src/http/ui/handlers/warehouses.rs index 733423fa6..0279c45db 100644 --- a/crates/nexus/src/http/ui/handlers/warehouses.rs +++ b/crates/nexus/src/http/ui/handlers/warehouses.rs @@ -1,7 +1,10 @@ -use crate::error::AppError; +use crate::http::ui::models::database::{get_database_id, DatabaseShort}; +use crate::http::ui::models::errors::AppError; +use crate::http::ui::models::table::{get_table_id, TableShort}; use crate::http::ui::models::warehouse; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; +use catalog::models::WarehouseIdent; use control_plane::models::{Warehouse as WarehouseModel, WarehouseCreateRequest}; use utoipa::OpenApi; use uuid::Uuid; @@ -9,6 +12,7 @@ use uuid::Uuid; #[derive(OpenApi)] #[openapi( paths( + navigation, get_warehouse, list_warehouses, create_warehouse, @@ -16,9 +20,13 @@ use uuid::Uuid; ), components( schemas( + warehouse::CreateWarehousePayload, warehouse::Warehouse, + warehouse::WarehouseExtended, warehouse::WarehousesDashboard, - warehouse::CreateWarehousePayload, + warehouse::WarehouseEntity, + warehouse::WarehouseShort, + AppError, ) ), tags( @@ -27,20 +35,96 @@ use uuid::Uuid; )] pub struct ApiDoc; +#[utoipa::path( + get, + path = "/ui/navigation", + operation_id = "webWarehousesNavigation", + responses( + (status = 200, description = "List all warehouses fot navigation", body = warehouse::Navigation), + ) +)] +pub async fn navigation( + State(state): State, +) -> Result, AppError> { + let warehouses = state.control_svc.list_warehouses().await.map_err(|e| { + let fmt = format!("{}: failed to get warehouses", e); + AppError::new(e, fmt.as_str()) + })?; + let mut warehouses_short = Vec::new(); + + for warehouse in warehouses { + let databases = state + .catalog_svc + .list_namespaces(&WarehouseIdent::new(warehouse.id), None) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get warehouse databases with wh id {}", + e, warehouse.id + ); + AppError::new(e, fmt.as_str()) + })?; + let mut databases_short = Vec::new(); + + for database in databases { + let tables = state + .catalog_svc + .list_tables(&database.ident) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get database tables with db ident {}", + e, &database.ident + ); + AppError::new(e, fmt.as_str()) + })?; + let ident = database.ident.clone(); + databases_short.push(DatabaseShort { + id: get_database_id(database.ident), + name: ident.to_string(), + tables: tables + .into_iter() + .map(|t| { + let ident = t.ident.clone(); + TableShort { + id: get_table_id(t.ident), + name: ident.table, + } + }) + .collect(), + }); + } + warehouses_short.push(warehouse::WarehouseShort { + id: warehouse.id, + name: warehouse.name, + databases: databases_short, + }); + } + Ok(Json(warehouse::Navigation { + warehouses: warehouses_short, + })) +} #[utoipa::path( get, path = "/ui/warehouses", operation_id = "webWarehousesDashboard", responses( (status = 200, description = "List all warehouses", body = warehouse::WarehousesDashboard), - (status = 500, description = "Internal server error") + (status = 500, description = "List all warehouses", body = AppError), + ) )] pub async fn list_warehouses( State(state): State, ) -> Result, AppError> { - let warehouses = state.control_svc.list_warehouses().await?; - let storage_profiles = state.control_svc.list_profiles().await?; + let warehouses = state.control_svc.list_warehouses().await.map_err(|e| { + let fmt = format!("{}: failed to get warehouses", e); + AppError::new(e, fmt.as_str()) + })?; + let storage_profiles = state.control_svc.list_profiles().await.map_err(|e| { + let fmt = format!("{}: failed to get profiles", e); + AppError::new(e, fmt.as_str()) + })?; let mut extended_warehouses = Vec::new(); for warehouse in warehouses { @@ -69,28 +153,59 @@ pub async fn list_warehouses( ("warehouseId" = Uuid, Path, description = "Warehouse ID") ), responses( - (status = 200, description = "Warehouse found", body = warehouse::WarehouseExtended), - (status = 404, description = "Warehouse not found") + (status = 200, description = "Warehouse found", body = warehouse::WarehouseDashboard), + (status = 404, description = "Warehouse not found", body = AppError) ) )] pub async fn get_warehouse( State(state): State, Path(warehouse_id): Path, -) -> Result, AppError> { - let warehouse = state.control_svc.get_warehouse(warehouse_id).await?; - - let mut extended_warehouse = - warehouse::WarehouseExtended::new(warehouse.into(), Default::default()); +) -> Result, AppError> { + let warehouse = state + .control_svc + .get_warehouse(warehouse_id) + .await + .map_err(|e| { + let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id); + AppError::new(e, fmt.as_str()) + })?; + let profile = state + .control_svc + .get_profile(warehouse.storage_profile_id) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get profile by id {}", + e, warehouse.storage_profile_id + ); + AppError::new(e, fmt.as_str()) + })?; + let databases = state + .catalog_svc + .list_namespaces(&WarehouseIdent::new(warehouse.id), None) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get warehouse databases with wh id {}", + e, warehouse.id + ); + AppError::new(e, fmt.as_str()) + })?; + let mut dashboard = warehouse::WarehouseDashboard::new( + warehouse.into(), + profile.into(), + databases.into_iter().map(|d| d.into()).collect(), + ); if let Ok(profile) = state .control_svc - .get_profile(extended_warehouse.storage_profile_id) + .get_profile(dashboard.storage_profile_id) .await { - extended_warehouse.storage_profile = profile.into(); + dashboard.storage_profile = profile.into(); } - Ok(Json(extended_warehouse)) + Ok(Json(dashboard)) } #[utoipa::path( @@ -100,7 +215,7 @@ pub async fn get_warehouse( operation_id = "webCreateWarehouse", responses( (status = 201, description = "Warehouse created", body = warehouse::Warehouse), - (status = 400, description = "Bad request"), + (status = 422, description = "Unprocessable Entity"), (status = 500, description = "Internal server error") ) )] @@ -109,17 +224,29 @@ pub async fn create_warehouse( Json(payload): Json, ) -> Result, AppError> { let request: WarehouseCreateRequest = payload.into(); - state .control_svc .get_profile(request.storage_profile_id) .await - .map_err(|e| AppError::from(e))?; - let warehouse: WarehouseModel = state - .control_svc - .create_warehouse(&request) - .await - .map_err(|e| AppError::from(e))?; + .map_err(|e| { + let fmt = format!( + "{}: failed to get profile with id {}", + e, request.storage_profile_id + ); + AppError::new(e, fmt.as_str()) + })?; + let warehouse: WarehouseModel = + state + .control_svc + .create_warehouse(&request) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get create warehouse with name {}", + e, request.name + ); + AppError::new(e, fmt.as_str()) + })?; Ok(Json(warehouse.into())) } @@ -143,6 +270,12 @@ pub async fn delete_warehouse( .control_svc .delete_warehouse(warehouse_id) .await - .map_err(|e| AppError::from(e))?; + .map_err(|e| { + let fmt = format!( + "{}: failed to get delete warehouse with id {}", + e, warehouse_id + ); + AppError::new(e, fmt.as_str()) + })?; Ok(Json(())) } diff --git a/crates/nexus/src/http/ui/models/aws.rs b/crates/nexus/src/http/ui/models/aws.rs index 40993b1e5..2d3c22c77 100644 --- a/crates/nexus/src/http/ui/models/aws.rs +++ b/crates/nexus/src/http/ui/models/aws.rs @@ -118,6 +118,7 @@ impl From for models::CloudProvider { } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] +#[serde(untagged)] pub enum Credentials { #[serde(rename = "access_key")] AccessKey(AwsAccessKeyCredential), diff --git a/crates/nexus/src/http/ui/models/database.rs b/crates/nexus/src/http/ui/models/database.rs index 2b45024b6..1e2130a64 100644 --- a/crates/nexus/src/http/ui/models/database.rs +++ b/crates/nexus/src/http/ui/models/database.rs @@ -1,14 +1,20 @@ +use crate::http::ui::models::storage_profile::StorageProfile; use crate::http::ui::models::table::{Statistics, TableEntity, TableShort}; -use crate::http::ui::models::warehouse::{WarehouseEntity, WarehouseExtended}; +use crate::http::ui::models::warehouse::{Warehouse, WarehouseEntity, WarehouseExtended}; +use catalog::models; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use utoipa::ToSchema; +use uuid::Uuid; use validator::Validate; +pub const DATABASE_NAME: &str = "database_name"; + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct CreateDatabasePayload { pub name: String, #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option, + pub properties: Option>, } impl CreateDatabasePayload { @@ -25,14 +31,14 @@ impl CreateDatabasePayload { pub struct Database { pub name: String, #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option, - pub id: uuid::Uuid, - pub warehouse_id: uuid::Uuid, + pub properties: Option>, + pub id: Uuid, + pub warehouse_id: Uuid, } impl Database { #[allow(clippy::new_without_default)] - pub fn new(name: String, id: uuid::Uuid, warehouse_id: uuid::Uuid) -> Database { + pub fn new(name: String, id: Uuid, warehouse_id: Uuid) -> Database { Database { name, properties: None, @@ -42,13 +48,24 @@ impl Database { } } +impl From for Database { + fn from(db: models::Database) -> Self { + Database { + id: get_database_id(db.ident.clone()), + name: db.ident.namespace.first().unwrap().to_string(), + properties: db.properties.into(), + warehouse_id: *db.ident.warehouse.id(), + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct DatabaseDashboard { pub name: String, #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option, - pub id: uuid::Uuid, - pub warehouse_id: uuid::Uuid, + pub properties: Option>, + pub id: Uuid, + pub warehouse_id: Uuid, pub warehouse: WarehouseEntity, pub tables: Vec, pub statistics: Statistics, @@ -60,8 +77,8 @@ impl DatabaseDashboard { #[allow(clippy::new_without_default)] pub fn new( name: String, - id: uuid::Uuid, - warehouse_id: uuid::Uuid, + id: Uuid, + warehouse_id: Uuid, warehouse: WarehouseEntity, tables: Vec, statistics: Statistics, @@ -81,7 +98,7 @@ impl DatabaseDashboard { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct DatabaseEntity { - pub id: uuid::Uuid, + pub id: Uuid, pub name: String, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, @@ -93,7 +110,7 @@ pub struct DatabaseEntity { impl DatabaseEntity { #[allow(clippy::new_without_default)] pub fn new( - id: uuid::Uuid, + id: Uuid, name: String, created_at: chrono::DateTime, updated_at: chrono::DateTime, @@ -110,13 +127,26 @@ impl DatabaseEntity { } } +impl From for DatabaseEntity { + fn from(db: models::Database) -> Self { + DatabaseEntity { + id: get_database_id(db.ident.clone()), + name: db.ident.namespace.first().unwrap().to_string(), + created_at: Default::default(), + updated_at: Default::default(), + statistics: Default::default(), + compaction_summary: None, + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct DatabaseExtended { pub name: String, #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option, - pub id: uuid::Uuid, - pub warehouse_id: uuid::Uuid, + pub properties: Option>, + pub id: Uuid, + pub warehouse_id: Uuid, #[serde(skip_serializing_if = "Option::is_none")] pub statistics: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -127,39 +157,42 @@ pub struct DatabaseExtended { } impl DatabaseExtended { - #[allow(clippy::new_without_default)] pub fn new( - name: String, - id: uuid::Uuid, - warehouse_id: uuid::Uuid, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, - warehouse: WarehouseExtended, + profile: StorageProfile, + warehouse: Warehouse, + database: Database, ) -> DatabaseExtended { DatabaseExtended { - name, - properties: None, - id, - warehouse_id, + id: database.id, + name: database.name, + warehouse_id: database.warehouse_id, + properties: database.properties, statistics: None, compaction_summary: None, - created_at, - updated_at, - warehouse, + created_at: Default::default(), + updated_at: Default::default(), + warehouse: WarehouseExtended::new(warehouse, profile), } } } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct DatabaseShort { - pub id: uuid::Uuid, + pub id: Uuid, pub name: String, pub tables: Vec, } +pub fn get_database_id(ident: models::DatabaseIdent) -> Uuid { + Uuid::new_v5( + &Uuid::NAMESPACE_DNS, + ident.namespace.first().unwrap().to_string().as_bytes(), + ) +} + impl DatabaseShort { #[allow(clippy::new_without_default)] - pub fn new(id: uuid::Uuid, name: String, tables: Vec) -> DatabaseShort { + pub fn new(id: Uuid, name: String, tables: Vec) -> DatabaseShort { DatabaseShort { id, name, tables } } } diff --git a/crates/nexus/src/http/ui/models/errors.rs b/crates/nexus/src/http/ui/models/errors.rs index ad76cddc2..571ee6c6f 100644 --- a/crates/nexus/src/http/ui/models/errors.rs +++ b/crates/nexus/src/http/ui/models/errors.rs @@ -1,48 +1,89 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +use catalog::error::Error as CatalogError; +use control_plane::error::Error as ControlError; +use utoipa::ToSchema; -#[derive(thiserror::Error, Debug)] -pub enum Error { +#[derive(thiserror::Error, Debug, ToSchema)] +pub enum AppError { #[error("bad request: {0}")] BadRequest(String), - #[error("not found")] - NotFound, - #[error("internal server error")] - InternalServerError, - #[error("already exists")] - AlreadyExists, - #[error("not empty")] - NotEmpty, - #[error("unprocessable entity")] - UnprocessableEntity, - #[error("not implemented")] - Implemented, + #[error("not found: {0}")] + NotFound(String), + #[error("internal server error: {0}")] + InternalServerError(String), + #[error("already exists: {0}")] + AlreadyExists(String), + #[error("not empty: {0}")] + NotEmpty(String), + #[error("unprocessable entity: {0}")] + UnprocessableEntity(String), + #[error("not implemented: {0}")] + NotImplemented(String), #[error("DB error: {0}")] DbError(String), #[error("Iceberg error: {0}")] - IcebergError(#[from] iceberg::Error), + IcebergError(String), } -impl IntoResponse for Error { +impl AppError { + pub fn new>(err: T, message: &str) -> Self { + let mut error = err.into(); + error.with(message); + error + } + + pub fn with(&mut self, new_message: &str) { + match self { + AppError::BadRequest(ref mut msg) => *msg = new_message.to_string(), + AppError::NotFound(ref mut msg) => *msg = new_message.to_string(), + AppError::InternalServerError(ref mut msg) => *msg = new_message.to_string(), + AppError::AlreadyExists(ref mut msg) => *msg = new_message.to_string(), + AppError::NotEmpty(ref mut msg) => *msg = new_message.to_string(), + AppError::UnprocessableEntity(ref mut msg) => *msg = new_message.to_string(), + AppError::NotImplemented(ref mut msg) => *msg = new_message.to_string(), + AppError::DbError(ref mut msg) => *msg = new_message.to_string(), + AppError::IcebergError(ref mut msg) => *msg = new_message.to_string(), + } + } +} +impl IntoResponse for AppError { fn into_response(self) -> Response { let (status, error_message) = match self { - Error::NotFound => (StatusCode::NOT_FOUND, self.to_string()), - Error::DbError(ref _e) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), - Error::InternalServerError => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), - Error::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg), + AppError::NotFound(ref _e) => (StatusCode::NOT_FOUND, self.to_string()), + AppError::DbError(ref _e) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), + AppError::InternalServerError(ref _e) => { + (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()) + } + AppError::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg), _ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), }; (status, error_message).into_response() } } -impl From for Error { - fn from(e: utils::Error) -> Self { +impl From for AppError { + fn from(e: ControlError) -> Self { match e { - utils::Error::DbError(e) => Error::DbError(e.to_string()), - utils::Error::SerializeError(e) => Error::DbError(e.to_string()), - utils::Error::DeserializeError(e) => Error::DbError(e.to_string()), - utils::Error::ErrNotFound => Error::NotFound, + ControlError::NotEmpty(e) => AppError::AlreadyExists(e.to_string()), + ControlError::InvalidInput(e) => AppError::BadRequest(e.to_string()), + ControlError::ErrNotFound => AppError::NotFound(e.to_string()), } } } +impl From for AppError { + fn from(e: CatalogError) -> Self { + match e { + CatalogError::ErrNotFound => AppError::NotFound(e.to_string()), + CatalogError::ErrAlreadyExists => AppError::AlreadyExists(e.to_string()), + CatalogError::NotEmpty(e) => AppError::NotEmpty(e.to_string()), + CatalogError::NotImplemented => AppError::NotImplemented(e.to_string()), + CatalogError::InvalidInput(e) => AppError::BadRequest(e.to_string()), + CatalogError::FailedRequirement(e) => AppError::UnprocessableEntity(e.to_string()), + CatalogError::DbError(e) => AppError::DbError(e.to_string()), + CatalogError::IcebergError(e) => AppError::IcebergError(e.to_string()), + _ => AppError::InternalServerError(e.to_string()), + } + } +} + diff --git a/crates/nexus/src/http/ui/models/table.rs b/crates/nexus/src/http/ui/models/table.rs index ce64c2c2a..96107816e 100644 --- a/crates/nexus/src/http/ui/models/table.rs +++ b/crates/nexus/src/http/ui/models/table.rs @@ -1,379 +1,64 @@ -use crate::http::ui::models::database::{CompactionSummary, DatabaseExtended}; +use crate::http::ui::models::database::{CompactionSummary, Database, DatabaseExtended}; +use crate::http::ui::models::storage_profile::StorageProfile; +use crate::http::ui::models::warehouse::Warehouse; +use catalog::models as CatalogModels; +use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; use serde::{Deserialize, Serialize}; -use utoipa::ToSchema; +use std::collections::HashMap; +use utoipa::{PartialSchema, ToSchema}; +use uuid::Uuid; use validator::Validate; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct TableShort { - pub id: uuid::Uuid, +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct TableCreateRequest { pub name: String, + pub location: Option, + pub schema: Schema, + pub partition_spec: Option, + pub write_order: Option, + pub stage_create: Option, + pub properties: Option>, } -impl TableShort { - #[allow(clippy::new_without_default)] - pub fn new(id: uuid::Uuid, name: String) -> TableShort { - TableShort { id, name } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct TableMetadataV2 { - pub location: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub table_uuid: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub last_updated_ms: Option, - pub last_column_id: i32, - #[serde(skip_serializing_if = "Option::is_none")] - pub schemas: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub current_schema_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub partition_specs: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub default_spec_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub last_partition_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub current_snapshot_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub snapshots: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub snapshot_log: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub metadata_log: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub sort_orders: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub default_sort_order_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub refs: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub format_version: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub last_sequence_number: Option, -} - -impl TableMetadataV2 { - #[allow(clippy::new_without_default)] - pub fn new(location: String, last_column_id: i32) -> TableMetadataV2 { - TableMetadataV2 { - location, - table_uuid: None, - last_updated_ms: None, - last_column_id, - schemas: None, - current_schema_id: Some(0), - partition_specs: None, - default_spec_id: Some(0), - last_partition_id: None, - properties: None, - current_snapshot_id: None, - snapshots: None, - snapshot_log: None, - metadata_log: None, - sort_orders: None, - default_sort_order_id: Some(0), - refs: None, - format_version: None, - last_sequence_number: Some(0), - } - } -} - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, ToSchema, Hash, -)] -pub enum FormatVersion1 { - #[serde(rename = "2")] - Variant2, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct MetadataLogEntry { - pub metadata_file: String, - pub timestamp_ms: i32, -} - -impl MetadataLogEntry { - #[allow(clippy::new_without_default)] - pub fn new(metadata_file: String, timestamp_ms: i32) -> MetadataLogEntry { - MetadataLogEntry { - metadata_file, - timestamp_ms, - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct PartitionSpec { - #[serde(skip_serializing_if = "Option::is_none")] - pub spec_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub fields: Option>, -} - -impl PartitionSpec { - #[allow(clippy::new_without_default)] - pub fn new() -> PartitionSpec { - PartitionSpec { - spec_id: Some(0), - fields: None, - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct Schema { - #[serde(skip_serializing_if = "Option::is_none")] - pub r#type: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub fields: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub schema_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub identifier_field_ids: Option>, -} - -impl Schema { - #[allow(clippy::new_without_default)] - pub fn new() -> Schema { - Schema { - r#type: None, - fields: None, - schema_id: Some(0), - identifier_field_ids: None, - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct Snapshot { - pub snapshot_id: i32, - #[serde(skip_serializing_if = "Option::is_none")] - pub parent_snapshot_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub sequence_number: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub timestamp_ms: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub manifest_list: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub summary: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub schema_id: Option, -} - -impl Snapshot { - #[allow(clippy::new_without_default)] - pub fn new(snapshot_id: i32) -> Snapshot { - Snapshot { - snapshot_id, - parent_snapshot_id: None, - sequence_number: None, - timestamp_ms: None, - manifest_list: None, - summary: None, - schema_id: None, - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct SnapshotLogEntry { - pub snapshot_id: i32, - pub timestamp_ms: i32, -} - -impl SnapshotLogEntry { - #[allow(clippy::new_without_default)] - pub fn new(snapshot_id: i32, timestamp_ms: i32) -> SnapshotLogEntry { - SnapshotLogEntry { - snapshot_id, - timestamp_ms, - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct SnapshotRef { - pub snapshot_id: i32, - pub r#type: SnapshotRefType, - #[serde(skip_serializing_if = "Option::is_none")] - pub min_snapshots_to_keep: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub max_snapshot_age_ms: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub max_ref_age_ms: Option, -} - -impl SnapshotRef { - #[allow(clippy::new_without_default)] - pub fn new(snapshot_id: i32, r#type: SnapshotRefType) -> SnapshotRef { - SnapshotRef { - snapshot_id, - r#type, - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, +impl From for catalog::models::TableCreation { + fn from(schema: TableCreateRequest) -> Self { + catalog::models::TableCreation { + name: schema.name, + location: schema.location, + schema: schema.schema, + partition_spec: schema.partition_spec.map(std::convert::Into::into), + sort_order: schema.write_order, + properties: schema.properties.unwrap_or_default(), } } } -#[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema, -)] -pub enum SnapshotRefType { - #[serde(rename = "branch")] - Branch, - #[serde(rename = "tag")] - Tag, -} - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema, -)] -pub enum FormatVersion { - #[serde(rename = "1")] - Variant1, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct NestedField { - pub id: i32, +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct Table { + pub id: Uuid, pub name: String, - pub r#type: serde_json::Value, - #[serde(skip_serializing_if = "Option::is_none")] - pub required: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub doc: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub initial_default: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub write_default: Option, + pub metadata: TableMetadataWrapper, + pub metadata_location: String, } -impl NestedField { - #[allow(clippy::new_without_default)] - pub fn new(id: i32, name: String, r#type: serde_json::Value) -> NestedField { - NestedField { - id, - name, - r#type, - required: Some(false), - doc: None, - initial_default: None, - write_default: None, +impl From for Table { + fn from(table: catalog::models::Table) -> Self { + Self { + id: get_table_id(table.clone().ident), + name: table.ident.table, + metadata_location: table.metadata_location, + metadata: TableMetadataWrapper(table.metadata), } } } -#[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema, -)] -pub enum NullOrder { - #[serde(rename = "nulls-first")] - First, - #[serde(rename = "nulls-last")] - Last, -} - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema, -)] -pub enum Operation { - #[serde(rename = "append")] - Append, - #[serde(rename = "replace")] - Replace, - #[serde(rename = "overwrite")] - Overwrite, - #[serde(rename = "delete")] - Delete, -} - #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct PartitionField { - pub source_id: i32, - pub field_id: i32, - pub transform: String, +pub struct TableShort { + pub id: Uuid, pub name: String, } - -impl PartitionField { - #[allow(clippy::new_without_default)] - pub fn new(source_id: i32, field_id: i32, transform: String, name: String) -> PartitionField { - PartitionField { - source_id, - field_id, - transform, - name, - } - } -} - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema, -)] -pub enum SortDirection { - #[serde(rename = "asc")] - Asc, - #[serde(rename = "desc")] - Desc, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct SortField { - pub source_id: i32, - pub transform: String, - pub direction: SortDirection, - pub null_order: NullOrder, -} - -impl SortField { - #[allow(clippy::new_without_default)] - pub fn new( - source_id: i32, - transform: String, - direction: SortDirection, - null_order: NullOrder, - ) -> SortField { - SortField { - source_id, - transform, - direction, - null_order, - } - } -} - -/// Describes how the data is sorted within the table. Users can sort their data within partitions by columns to gain performance. The order of the sort fields within the list defines the order in which the sort is applied to the data. Args: fields (List[SortField]): The fields how the table is sorted. Keyword Args: order_id (int): An unique id of the sort-order of a table. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct SortOrder { - #[serde(skip_serializing_if = "Option::is_none")] - pub order_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub fields: Option>, -} - -impl SortOrder { - #[allow(clippy::new_without_default)] - pub fn new() -> SortOrder { - SortOrder { - order_id: Some(1), - fields: None, - } - } -} - -#[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema, -)] -pub enum Type { - #[serde(rename = "struct")] - Struct, +pub fn get_table_id(ident: CatalogModels::TableIdent) -> Uuid { + Uuid::new_v5(&Uuid::NAMESPACE_DNS, ident.table.to_string().as_bytes()) } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] @@ -407,18 +92,20 @@ impl TableEntity { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct TableMetadataWrapper(pub TableMetadata); + + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct TableExtended { - pub id: uuid::Uuid, + pub id: Uuid, pub name: String, pub database_name: String, - pub warehouse_id: uuid::Uuid, - #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option, - pub metadata: serde_json::Value, - #[serde(skip_serializing_if = "Option::is_none")] + pub warehouse_id: Uuid, + pub properties: Option>, + pub metadata: TableMetadataWrapper, + pub metadata_location: String, pub statistics: Option, - #[serde(skip_serializing_if = "Option::is_none")] pub compaction_summary: Option, pub created_at: chrono::DateTime, pub updated_at: chrono::DateTime, @@ -428,96 +115,24 @@ pub struct TableExtended { impl TableExtended { #[allow(clippy::new_without_default)] pub fn new( - id: uuid::Uuid, - name: String, - database_name: String, - warehouse_id: uuid::Uuid, - metadata: serde_json::Value, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, - database: DatabaseExtended, + profile: StorageProfile, + warehouse: Warehouse, + database: Database, + table: CatalogModels::Table, ) -> TableExtended { TableExtended { - id, - name, - database_name, - warehouse_id, + id: get_table_id(table.clone().ident), + name: table.ident.table, + database_name: table.ident.database.to_string(), + warehouse_id: warehouse.id, properties: None, - metadata, + metadata: TableMetadataWrapper(table.metadata), + metadata_location: table.metadata_location, statistics: None, compaction_summary: None, - created_at, - updated_at, - database, - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct TableMetadataV1 { - pub location: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub table_uuid: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub last_updated_ms: Option, - pub last_column_id: i32, - #[serde(skip_serializing_if = "Option::is_none")] - pub schemas: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub current_schema_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub partition_specs: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub default_spec_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub last_partition_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub current_snapshot_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub snapshots: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub snapshot_log: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub metadata_log: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub sort_orders: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub default_sort_order_id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub refs: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - pub format_version: Option, - pub schema: Schema, - #[serde(skip_serializing_if = "Option::is_none")] - pub partition_spec: Option>, -} - -impl TableMetadataV1 { - #[allow(clippy::new_without_default)] - pub fn new(location: String, last_column_id: i32, schema: Schema) -> TableMetadataV1 { - TableMetadataV1 { - location, - table_uuid: None, - last_updated_ms: None, - last_column_id, - schemas: None, - current_schema_id: Some(0), - partition_specs: None, - default_spec_id: Some(0), - last_partition_id: None, - properties: None, - current_snapshot_id: None, - snapshots: None, - snapshot_log: None, - metadata_log: None, - sort_orders: None, - default_sort_order_id: Some(0), - refs: None, - format_version: None, - schema, - partition_spec: None, + created_at: Default::default(), + updated_at: Default::default(), + database: DatabaseExtended::new(profile, warehouse, database), } } } diff --git a/crates/nexus/src/http/ui/models/warehouse.rs b/crates/nexus/src/http/ui/models/warehouse.rs index bb5c21a7f..c152e6f21 100644 --- a/crates/nexus/src/http/ui/models/warehouse.rs +++ b/crates/nexus/src/http/ui/models/warehouse.rs @@ -124,7 +124,7 @@ pub struct WarehouseDashboard { pub updated_at: chrono::DateTime, pub storage_profile: StorageProfile, pub databases: Vec, - pub statistics: Statistics, + pub statistics: Option, #[serde(skip_serializing_if = "Option::is_none")] pub compaction_summary: Option, } @@ -132,31 +132,23 @@ pub struct WarehouseDashboard { impl WarehouseDashboard { #[allow(clippy::new_without_default)] pub fn new( - name: String, - storage_profile_id: uuid::Uuid, - key_prefix: String, - id: uuid::Uuid, - external_id: uuid::Uuid, - location: String, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, + warehouse: Warehouse, storage_profile: StorageProfile, databases: Vec, - statistics: Statistics, ) -> WarehouseDashboard { WarehouseDashboard { - name, - storage_profile_id, - key_prefix, - id, - external_id, - location, - created_at, - updated_at, - storage_profile, - databases, - statistics, + id: warehouse.id, + key_prefix: warehouse.key_prefix, + name: warehouse.name, + location: warehouse.location, + storage_profile_id: warehouse.storage_profile_id, + created_at: warehouse.created_at, + updated_at: warehouse.updated_at, + external_id: warehouse.external_id, + statistics: None, compaction_summary: None, + storage_profile, + databases: databases, } } } @@ -178,31 +170,22 @@ pub struct WarehouseEntity { impl WarehouseEntity { #[allow(clippy::new_without_default)] - pub fn new( - name: String, - storage_profile_id: uuid::Uuid, - key_prefix: String, - id: uuid::Uuid, - external_id: uuid::Uuid, - location: String, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, - storage_profile: StorageProfile, - ) -> WarehouseEntity { + pub fn new(warehouse: Warehouse, storage_profile: StorageProfile) -> WarehouseEntity { WarehouseEntity { - name, - storage_profile_id, - key_prefix, - id, - external_id, - location, - created_at, - updated_at, + id: warehouse.id, + key_prefix: warehouse.key_prefix, + name: warehouse.name, + location: warehouse.location, + storage_profile_id: warehouse.storage_profile_id, + created_at: warehouse.created_at, + updated_at: warehouse.updated_at, + external_id: warehouse.external_id, storage_profile, } } } + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct WarehouseExtended { #[validate(length(min = 1))] diff --git a/crates/nexus/src/http/ui/router.rs b/crates/nexus/src/http/ui/router.rs index ea6e282ff..b8e88c6ac 100644 --- a/crates/nexus/src/http/ui/router.rs +++ b/crates/nexus/src/http/ui/router.rs @@ -1,10 +1,10 @@ -use crate::http::ui::handlers::databases::{delete_database, get_database}; +use crate::http::ui::handlers::databases::{create_database, delete_database, get_database}; use crate::http::ui::handlers::profiles::{ create_storage_profile, delete_storage_profile, get_storage_profile, list_storage_profiles, }; -use crate::http::ui::handlers::tables::{get_table, query_table}; +use crate::http::ui::handlers::tables::{create_table, delete_table, get_table, query_table}; use crate::http::ui::handlers::warehouses::{ - create_warehouse, delete_warehouse, get_warehouse, list_warehouses, + create_warehouse, delete_warehouse, get_warehouse, list_warehouses, navigation, }; use crate::state::AppState; use axum::routing::{delete, get, post}; @@ -12,6 +12,7 @@ use axum::Router; pub fn create_router() -> Router { Router::new() + .route("/navigation", get(navigation)) .route("/warehouses", post(create_warehouse).get(list_warehouses)) .route( "/warehouses/:warehouseId", @@ -21,6 +22,11 @@ pub fn create_router() -> Router { "/warehouses/:warehouseId/databases/:databaseName", get(get_database).delete(delete_database), ) + .route("/warehouses/:warehouseId/databases", post(create_database)) + .route( + "/warehouses/:warehouseId/databases/:databaseName/tables", + post(create_table).delete(delete_table), + ) .route( "/warehouses/:warehouseId/databases/:databaseName/tables/:tableName", get(get_table), From 950caba35d05b9aaf5ef03fdbf2dad46552b5368 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 25 Oct 2024 17:22:53 +0300 Subject: [PATCH 2/3] Add databases and tables --- crates/nexus/src/http/ui/models/table.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/nexus/src/http/ui/models/table.rs b/crates/nexus/src/http/ui/models/table.rs index 96107816e..def21b080 100644 --- a/crates/nexus/src/http/ui/models/table.rs +++ b/crates/nexus/src/http/ui/models/table.rs @@ -93,8 +93,7 @@ impl TableEntity { } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub struct TableMetadataWrapper(pub TableMetadata); - +pub struct TableMetadataWrapper(TableMetadata); #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct TableExtended { From c485ae75060d7ced867e52c43296a158c219e977 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 25 Oct 2024 17:35:27 +0300 Subject: [PATCH 3/3] Add databases and tables --- crates/catalog/src/models.rs | 4 ++-- crates/nexus/Cargo.toml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/catalog/src/models.rs b/crates/catalog/src/models.rs index cc5ace624..321b91692 100644 --- a/crates/catalog/src/models.rs +++ b/crates/catalog/src/models.rs @@ -128,14 +128,14 @@ impl TableRequirementExt { } } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct Table { pub ident: TableIdent, pub metadata: TableMetadata, pub metadata_location: String, } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] pub struct TableIdent { pub database: DatabaseIdent, pub table: String, diff --git a/crates/nexus/Cargo.toml b/crates/nexus/Cargo.toml index 707a3763f..11d00f74f 100644 --- a/crates/nexus/Cargo.toml +++ b/crates/nexus/Cargo.toml @@ -31,6 +31,7 @@ tracing = { version = "0.1" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } hyper = "1.5.0" http-body-util = "0.1.0" +anyhow = "1.0.89" [dev-dependencies] tower = { workspace = true }