Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/catalog/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ impl TableRequirementExt {
}
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Table {
pub ident: TableIdent,
pub metadata: TableMetadata,
pub metadata_location: String,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct TableIdent {
pub database: DatabaseIdent,
pub table: String,
Expand Down
3 changes: 2 additions & 1 deletion crates/nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ tokio = { workspace = true }
control_plane = { path = "../control_plane" }
catalog = { path = "../catalog" }
chrono = { workspace = true }
uuid = { workspace = true }
uuid = { workspace = true, features = ["v4", "v5"] }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
serde = { workspace = true }
Expand All @@ -31,6 +31,7 @@ tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
hyper = "1.5.0"
http-body-util = "0.1.0"
anyhow = "1.0.89"

[dev-dependencies]
tower = { workspace = true }
Expand Down
192 changes: 129 additions & 63 deletions crates/nexus/src/http/ui/handlers/databases.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::error::AppError;
use crate::http::ui::models::aws;
use crate::http::ui::models::database;
use crate::http::ui::models::storage_profile;
use crate::http::ui::models::table::Statistics;
use crate::http::ui::models::warehouse;
use crate::http::ui::models::database::{get_database_id, DatabaseDashboard};
use crate::http::ui::models::errors::AppError;
use crate::http::ui::models::table::{get_table_id, TableEntity};
use crate::http::ui::models::warehouse::WarehouseEntity;
use crate::state::AppState;
use axum::{extract::Path, extract::State, Json};
use catalog::models::{DatabaseIdent, WarehouseIdent};
use iceberg::NamespaceIdent;
use swagger;
use utoipa::OpenApi;
use uuid::Uuid;
Expand All @@ -20,7 +21,12 @@ use uuid::Uuid;
components(
schemas(
database::CreateDatabasePayload,
database::Database
database::Database,
database::DatabaseDashboard,
database::DatabaseEntity,
database::DatabaseExtended,
database::DatabaseShort,
AppError,
)
),
tags(
Expand All @@ -33,96 +39,156 @@ pub struct ApiDoc;
post,
path = "/ui/warehouses/{warehouseId}/databases",
operation_id = "webCreateDatabase",
params(
("warehouseId" = Uuid, description = "Warehouse ID"),
),
request_body = database::CreateDatabasePayload,
responses(
(status = 200, description = "Successful Response", body = database::Database),
(status = 400, description = "Bad request"),
(status = 500, description = "Internal server error")
(status = 400, description = "Bad request", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
(status = 500, description = "Internal server error", body = AppError)
)
)]
pub async fn create_database(
State(state): State<AppState>,
Path(payload): Path<database::CreateDatabasePayload>,
Path(warehouse_id): Path<Uuid>,
Json(payload): Json<database::CreateDatabasePayload>,
) -> Result<Json<database::Database>, AppError> {
Ok(Json(database::Database {
name: "".to_string(),
properties: None,
id: Default::default(),
warehouse_id: Default::default(),
}))
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
namespace: NamespaceIdent::new(payload.name),
};
let res = state
.catalog_svc
.create_namespace(&ident, payload.properties.unwrap_or_default())
.await
.map_err(|e| {
let fmt = format!("{}: failed to create database with ident {}", e, &ident);
AppError::new(e, fmt.as_str())
})?;
Ok(Json(res.into()))
}

#[utoipa::path(
delete,
path = "/ui/warehouses/{warehouseId}/databases/{databaseName}",
operation_id = "webDeleteDatabase",
params(
("warehouseId" = Uuid, description = "Warehouse ID"),
("databaseName" = String, description = "Database Name"),
),
responses(
(status = 204, description = "Successful Response"),
(status = 404, description = "Database not found")
(status = 404, description = "Database not found", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
)
)]
pub async fn delete_database(
State(state): State<AppState>,
Path((warehouse_id, database_name)): Path<(Uuid, String)>,
) -> Result<(), AppError> {
Ok(())
) -> Result<Json<()>, AppError> {
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse_id),
namespace: NamespaceIdent::new(database_name),
};

state
.catalog_svc
.drop_namespace(&ident)
.await
.map_err(|e| {
let fmt = format!("{}: failed to delete database with ident {}", e, &ident);
AppError::new(e, fmt.as_str())
})?;
Ok(Json(()))
}

#[utoipa::path(
get,
path = "/ui/warehouses/{warehouseId}/databases/{databaseName}",
params(
("warehouseId" = Uuid, description = "Warehouse ID"),
("databaseName" = String, description = "Database Name"),
),
operation_id = "webDatabaseDashboard",
responses(
(status = 200, description = "Successful Response", body = database::DatabaseDashboard),
(status = 204, description = "Successful Response"),
(status = 404, description = "Database not found")
(status = 200, description = "Successful Response", body = DatabaseDashboard),
(status = 404, description = "Database not found", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
)
)]
pub async fn get_database(
State(state): State<AppState>,
Path((warehouse_id, database_name)): Path<(Uuid, String)>,
) -> Result<Json<database::DatabaseDashboard>, AppError> {
Ok(Json(database::DatabaseDashboard {
name: "".to_string(),
properties: None,
id: Default::default(),
warehouse_id: Default::default(),
warehouse: warehouse::WarehouseEntity {
name: "".to_string(),
storage_profile_id: Default::default(),
key_prefix: "".to_string(),
id: Default::default(),
external_id: Default::default(),
location: "".to_string(),
created_at: Default::default(),
updated_at: Default::default(),
storage_profile: storage_profile::StorageProfile {
r#type: aws::CloudProvider::AWS,
region: "".to_string(),
bucket: "".to_string(),
credentials: Default::default(),
sts_role_arn: None,
endpoint: None,
id: Default::default(),
created_at: Default::default(),
updated_at: Default::default(),
},
},
tables: vec![],
statistics: Statistics {
commit_count: 0,
op_append_count: 0,
op_overwrite_count: 0,
op_delete_count: 0,
op_replace_count: 0,
total_bytes: 0,
bytes_added: 0,
bytes_removed: 0,
total_rows: 0,
rows_added: 0,
rows_deleted: 0,
table_count: None,
database_count: None,
},
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let profile = state
.control_svc
.get_profile(warehouse.storage_profile_id)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get profile by id {}",
e, warehouse.storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
namespace: NamespaceIdent::new(database_name),
};
let database = state.catalog_svc.get_namespace(&ident).await.map_err(|e| {
let fmt = format!(
"{}: failed to get database with db ident {}",
e, &ident
);
AppError::new(e, fmt.as_str())
})?;
let tables = state.catalog_svc.list_tables(&ident).await
.map_err(|e| {
let fmt = format!(
"{}: failed to get database tables with db ident {}",
e, &ident
);
AppError::new(e, fmt.as_str())
})?;
Ok(Json(DatabaseDashboard {
name: database.ident.to_string(),
properties: Option::from(database.properties),
id: get_database_id(database.ident),
warehouse_id,
warehouse: WarehouseEntity::new(warehouse.into(), profile.into()),
tables: tables
.into_iter()
.map(|t| {
let ident = t.ident.clone();
TableEntity {
id: get_table_id(t.ident),
name: ident.table,
created_at: Default::default(),
updated_at: Default::default(),
statistics: Default::default(),
compaction_summary: None,
}
})
.collect(),
statistics: Default::default(),
compaction_summary: None,
}))
}
51 changes: 37 additions & 14 deletions crates/nexus/src/http/ui/handlers/profiles.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::AppError;
use crate::http::ui::models::errors::AppError;
use crate::http::ui::models::{aws, storage_profile};
use crate::state::AppState;
use axum::{extract::Path, extract::State, Json};
Expand All @@ -22,6 +22,7 @@ use uuid::Uuid;
aws::AwsAccessKeyCredential,
aws::AwsRoleCredential,
aws::CloudProvider,
AppError,
)
),
tags(
Expand All @@ -37,20 +38,25 @@ pub struct ApiDoc;
request_body = storage_profile::CreateStorageProfilePayload,
responses(
(status = 200, description = "Successful Response", body = storage_profile::StorageProfile),
(status = 400, description = "Bad request"),
(status = 500, description = "Internal server error")
(status = 400, description = "Bad request", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
(status = 500, description = "Internal server error", body = AppError)
)
)]
pub async fn create_storage_profile(
State(state): State<AppState>,
Json(payload): Json<storage_profile::CreateStorageProfilePayload>,
) -> Result<Json<storage_profile::StorageProfile>, AppError> {
let request: StorageProfileCreateRequest = payload.into();
let profile: StorageProfile = state
.control_svc
.create_profile(&request)
.await
.map_err(|e| AppError::from(e))?;
let profile: StorageProfile =
state
.control_svc
.create_profile(&request)
.await
.map_err(|e| {
let fmt = format!("{}: failed to create storage profile", e);
AppError::new(e, fmt.as_str())
})?;
Ok(Json(profile.into()))
}

Expand All @@ -63,7 +69,8 @@ pub async fn create_storage_profile(
),
responses(
(status = 200, description = "Successful Response", body = storage_profile::StorageProfile),
(status = 404, description = "Not found"),
(status = 404, description = "Not found", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
)
)]
pub async fn get_storage_profile(
Expand All @@ -74,7 +81,13 @@ pub async fn get_storage_profile(
.control_svc
.get_profile(storage_profile_id)
.await
.map_err(|e| AppError::from(e))?;
.map_err(|e| {
let fmt = format!(
"{}: failed to get storage profile with id {}",
e, storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
Ok(Json(profile.into()))
}

Expand All @@ -87,7 +100,8 @@ pub async fn get_storage_profile(
),
responses(
(status = 200, description = "Successful Response", body = storage_profile::StorageProfile),
(status = 404, description = "Not found"),
(status = 404, description = "Not found", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
)
)]
pub async fn delete_storage_profile(
Expand All @@ -98,7 +112,13 @@ pub async fn delete_storage_profile(
.control_svc
.delete_profile(storage_profile_id)
.await
.map_err(|e| AppError::from(e))?;
.map_err(|e| {
let fmt = format!(
"{}: failed to delete storage profile with id {}",
e, storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
Ok(Json(()))
}

Expand All @@ -108,12 +128,15 @@ pub async fn delete_storage_profile(
path = "/ui/storage-profiles/",
responses(
(status = 200, body = Vec<storage_profile::StorageProfile>),
(status = 500, description = "Internal server error")
(status = 500, description = "Internal server error", body = AppError)
)
)]
pub async fn list_storage_profiles(
State(state): State<AppState>,
) -> Result<Json<Vec<storage_profile::StorageProfile>>, AppError> {
let profiles = state.control_svc.list_profiles().await?;
let profiles = state.control_svc.list_profiles().await.map_err(|e| {
let fmt = format!("{}: failed to list storage profile", e);
AppError::new(e, fmt.as_str())
})?;
Ok(Json(profiles.into_iter().map(|p| p.into()).collect()))
}
Loading