Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
Binary file added crates/.DS_Store
Binary file not shown.
Binary file added crates/nexus/.DS_Store
Binary file not shown.
Binary file added crates/nexus/src/.DS_Store
Binary file not shown.
1 change: 0 additions & 1 deletion crates/nexus/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ pub mod control {
pub mod ui {
pub mod handlers;
pub mod models;
pub mod models;
}
59 changes: 59 additions & 0 deletions crates/nexus/src/http/ui/handlers/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use crate::http::ui::models::database::Database;
use crate::http::ui::models::errors::AppError;
use crate::http::ui::models::storage_profile::StorageProfile;
use crate::http::ui::models::warehouse::Warehouse;
use crate::state::AppState;
use catalog::models::{DatabaseIdent, Table};
use control_plane::models::Warehouse as WarehouseModel;
use uuid::Uuid;

impl AppState {
pub async fn get_warehouse_model(
&self,
warehouse_id: Uuid,
) -> Result<WarehouseModel, AppError> {
self.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})
}
pub async fn get_warehouse_by_id(&self, warehouse_id: Uuid) -> Result<Warehouse, AppError> {
self.get_warehouse_model(warehouse_id).await.map(|warehouse| warehouse.into())
}

pub async fn get_profile_by_id(
&self,
storage_profile_id: Uuid,
) -> Result<StorageProfile, AppError> {
self.control_svc
.get_profile(storage_profile_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get profile by id {}", e, storage_profile_id);
AppError::new(e, fmt.as_str())
}).map(|profile| profile.into())
}

pub async fn get_database_by_ident(&self, ident: &DatabaseIdent) -> Result<Database, AppError> {
self.catalog_svc
.get_namespace(ident)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get database with db ident {}", e, &ident);
AppError::new(e, fmt.as_str())
}).map(|database| database.into())
}

pub async fn list_tables(&self, ident: &DatabaseIdent) -> Result<Vec<Table>, AppError> {
self.catalog_svc.list_tables(ident).await.map_err(|e| {
let fmt = format!(
"{}: failed to get database tables with db ident {}",
e, &ident
);
AppError::new(e, fmt.as_str())
})
}
}
51 changes: 7 additions & 44 deletions crates/nexus/src/http/ui/handlers/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,7 @@ pub async fn create_database(
Path(warehouse_id): Path<Uuid>,
Json(payload): Json<database::CreateDatabasePayload>,
) -> Result<Json<database::Database>, AppError> {
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
namespace: NamespaceIdent::new(payload.name),
Expand Down Expand Up @@ -130,48 +123,18 @@ pub async fn get_database(
State(state): State<AppState>,
Path((warehouse_id, database_name)): Path<(Uuid, String)>,
) -> Result<Json<database::DatabaseDashboard>, AppError> {
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let profile = state
.control_svc
.get_profile(warehouse.storage_profile_id)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get profile by id {}",
e, warehouse.storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
let warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let profile = state.get_profile_by_id(warehouse.storage_profile_id).await?;
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
namespace: NamespaceIdent::new(database_name),
};
let database = state.catalog_svc.get_namespace(&ident).await.map_err(|e| {
let fmt = format!(
"{}: failed to get database with db ident {}",
e, &ident
);
AppError::new(e, fmt.as_str())
})?;
let tables = state.catalog_svc.list_tables(&ident).await
.map_err(|e| {
let fmt = format!(
"{}: failed to get database tables with db ident {}",
e, &ident
);
AppError::new(e, fmt.as_str())
})?;
let database = state.get_database_by_ident(&ident).await?;
let tables = state.catalog_svc.list_tables(&ident).await?;
Ok(Json(DatabaseDashboard {
name: database.ident.to_string(),
name: ident.namespace.first().unwrap().to_string(),
properties: Option::from(database.properties),
id: get_database_id(database.ident),
id: get_database_id(ident),
warehouse_id,
warehouse: WarehouseEntity::new(warehouse.into(), profile.into()),
tables: tables
Expand Down
2 changes: 2 additions & 0 deletions crates/nexus/src/http/ui/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ pub mod databases;
pub mod profiles;
pub mod tables;
pub mod warehouses;

pub mod common;
12 changes: 1 addition & 11 deletions crates/nexus/src/http/ui/handlers/profiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,7 @@ pub async fn get_storage_profile(
State(state): State<AppState>,
Path(storage_profile_id): Path<Uuid>,
) -> Result<Json<storage_profile::StorageProfile>, AppError> {
let profile: StorageProfile = state
.control_svc
.get_profile(storage_profile_id)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get storage profile with id {}",
e, storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
let profile = state.get_profile_by_id(storage_profile_id).await?;
Ok(Json(profile.into()))
}

Expand Down
59 changes: 16 additions & 43 deletions crates/nexus/src/http/ui/handlers/tables.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::http::ui::models::errors::AppError;
use crate::http::ui::models::table;
use crate::http::ui::models::table::TableQueryRequest;
use crate::http::ui::models::table::{Table, TableCreateRequest, TableExtended};
use crate::http::ui::models::table::{
Table, TableCreateRequest, TableExtended, TableQueryResponse,
};
use crate::state::AppState;
use axum::{extract::Path, extract::State, Json};
use catalog::models::{DatabaseIdent, TableIdent, WarehouseIdent};
Expand All @@ -19,7 +21,11 @@ use uuid::Uuid;
),
components(
schemas(
table::TableQueryResponse,
TableQueryResponse,
TableQueryRequest,
TableExtended,
TableCreateRequest,
Table,
AppError,
)
),
Expand Down Expand Up @@ -49,34 +55,15 @@ pub async fn get_table(
State(state): State<AppState>,
Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>,
) -> Result<Json<TableExtended>, AppError> {
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let profile = state
.control_svc
.get_profile(warehouse.storage_profile_id)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get profile by id {}",
e, warehouse.storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
.get_profile_by_id(warehouse.storage_profile_id)
.await?;
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
namespace: NamespaceIdent::new(database_name),
};
let database = state.catalog_svc.get_namespace(&ident).await.map_err(|e| {
let fmt = format!("{}: failed to get database with db ident {}", e, &ident);
AppError::new(e, fmt.as_str())
})?;

let database = state.get_database_by_ident(&ident).await?;
let table_ident = TableIdent {
database: ident,
table: table_name,
Expand Down Expand Up @@ -116,14 +103,7 @@ pub async fn create_table(
Path((warehouse_id, database_name)): Path<(Uuid, String)>,
Json(payload): Json<TableCreateRequest>,
) -> Result<Json<Table>, AppError> {
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let warehouse = state.get_warehouse_model(warehouse_id).await?;
let db_ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
namespace: NamespaceIdent::new(database_name),
Expand Down Expand Up @@ -158,14 +138,7 @@ pub async fn delete_table(
State(state): State<AppState>,
Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>,
) -> Result<(), AppError> {
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let table_ident = TableIdent {
database: DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
Expand All @@ -187,15 +160,15 @@ pub async fn delete_table(
#[utoipa::path(
post,
path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}/query",
request_body = table::TableQueryRequest,
request_body = TableQueryRequest,
operation_id = "webTableQuery",
params(
("warehouseId" = Uuid, Path, description = "Warehouse ID"),
("databaseName" = Uuid, Path, description = "Database Name"),
("tableName" = Uuid, Path, description = "Table name")
),
responses(
(status = 200, description = "Returns result of the query", body = Vec<table::TableQueryResponse>),
(status = 200, description = "Returns result of the query", body = TableQueryResponse),
(status = 500, description = "Internal server error")
)
)]
Expand Down
55 changes: 10 additions & 45 deletions crates/nexus/src/http/ui/handlers/warehouses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,7 @@ pub async fn navigation(
let mut databases_short = Vec::new();

for database in databases {
let tables = state
.catalog_svc
.list_tables(&database.ident)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get database tables with db ident {}",
e, &database.ident
);
AppError::new(e, fmt.as_str())
})?;
let tables = state.catalog_svc.list_tables(&database.ident).await?;
let ident = database.ident.clone();
databases_short.push(DatabaseShort {
id: get_database_id(database.ident),
Expand Down Expand Up @@ -110,7 +100,7 @@ pub async fn navigation(
operation_id = "webWarehousesDashboard",
responses(
(status = 200, description = "List all warehouses", body = warehouse::WarehousesDashboard),
(status = 500, description = "List all warehouses", body = AppError),
(status = 500, description = "List all warehouses error", body = AppError),

)
)]
Expand Down Expand Up @@ -161,25 +151,10 @@ pub async fn get_warehouse(
State(state): State<AppState>,
Path(warehouse_id): Path<Uuid>,
) -> Result<Json<warehouse::WarehouseDashboard>, AppError> {
let warehouse = state
.control_svc
.get_warehouse(warehouse_id)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouse by id {}", e, warehouse_id);
AppError::new(e, fmt.as_str())
})?;
let warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let profile = state
.control_svc
.get_profile(warehouse.storage_profile_id)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get profile by id {}",
e, warehouse.storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
.get_profile_by_id(warehouse.storage_profile_id)
.await?;
let databases = state
.catalog_svc
.list_namespaces(&WarehouseIdent::new(warehouse.id), None)
Expand Down Expand Up @@ -215,26 +190,16 @@ pub async fn get_warehouse(
operation_id = "webCreateWarehouse",
responses(
(status = 201, description = "Warehouse created", body = warehouse::Warehouse),
(status = 422, description = "Unprocessable Entity"),
(status = 500, description = "Internal server error")
(status = 422, description = "Unprocessable Entity", body = AppError),
(status = 500, description = "Internal server error", body = AppError)
)
)]
pub async fn create_warehouse(
State(state): State<AppState>,
Json(payload): Json<warehouse::CreateWarehousePayload>,
) -> Result<Json<warehouse::Warehouse>, AppError> {
let request: WarehouseCreateRequest = payload.into();
state
.control_svc
.get_profile(request.storage_profile_id)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get profile with id {}",
e, request.storage_profile_id
);
AppError::new(e, fmt.as_str())
})?;
state.get_profile_by_id(request.storage_profile_id).await?;
let warehouse: WarehouseModel =
state
.control_svc
Expand All @@ -253,13 +218,13 @@ pub async fn create_warehouse(
#[utoipa::path(
delete,
path = "/ui/warehouses/{warehouseId}",
operation_id = "webCreteWarehouse",
operation_id = "webCreateWarehouse",
params(
("warehouseId" = Uuid, Path, description = "Warehouse ID")
),
responses(
(status = 204, description = "Warehouse deleted"),
(status = 404, description = "Warehouse not found")
(status = 404, description = "Warehouse not found", body = AppError)
)
)]
pub async fn delete_warehouse(
Expand Down
Loading