From bfa8b5b1a410bc7ea60ea876a3fbe8cbb87322f9 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Wed, 23 Oct 2024 15:45:05 +0300 Subject: [PATCH] Implement warehouses and profiles --- .../nexus/src/http/ui/handlers/databases.rs | 2 +- crates/nexus/src/http/ui/handlers/profiles.rs | 48 ++++---- crates/nexus/src/http/ui/handlers/tables.rs | 4 +- .../nexus/src/http/ui/handlers/warehouses.rs | 89 +++++++++----- crates/nexus/src/http/ui/mod.rs | 4 +- crates/nexus/src/http/ui/models/aws.rs | 116 ++++++++++++++++-- crates/nexus/src/http/ui/models/errors.rs | 3 +- .../src/http/ui/models/storage_profile.rs | 41 ++++++- crates/nexus/src/http/ui/models/table.rs | 2 +- crates/nexus/src/http/ui/models/warehouse.rs | 58 ++++++--- 10 files changed, 272 insertions(+), 95 deletions(-) diff --git a/crates/nexus/src/http/ui/handlers/databases.rs b/crates/nexus/src/http/ui/handlers/databases.rs index 7cdf9975b..5ed5c7042 100644 --- a/crates/nexus/src/http/ui/handlers/databases.rs +++ b/crates/nexus/src/http/ui/handlers/databases.rs @@ -96,7 +96,7 @@ pub async fn get_database( created_at: Default::default(), updated_at: Default::default(), storage_profile: storage_profile::StorageProfile { - r#type: aws::CloudProvider::S3, + r#type: aws::CloudProvider::AWS, region: "".to_string(), bucket: "".to_string(), credentials: Default::default(), diff --git a/crates/nexus/src/http/ui/handlers/profiles.rs b/crates/nexus/src/http/ui/handlers/profiles.rs index 7ff37415d..fa9325249 100644 --- a/crates/nexus/src/http/ui/handlers/profiles.rs +++ b/crates/nexus/src/http/ui/handlers/profiles.rs @@ -2,6 +2,7 @@ use crate::error::AppError; use crate::http::ui::models::{aws, storage_profile}; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; +use control_plane::models::{StorageProfile, StorageProfileCreateRequest}; use utoipa::OpenApi; use uuid::Uuid; @@ -44,17 +45,13 @@ pub async fn create_storage_profile( State(state): State, Json(payload): Json, ) -> Result, AppError> { - Ok(Json(storage_profile::StorageProfile { - r#type: aws::CloudProvider::S3, - region: "2".to_string(), - bucket: "".to_string(), - credentials: Default::default(), - sts_role_arn: None, - endpoint: None, - id: Default::default(), - created_at: Default::default(), - updated_at: Default::default(), - })) + let request: StorageProfileCreateRequest = payload.into(); + let profile: StorageProfile = state + .control_svc + .create_profile(&request) + .await + .map_err(|e| AppError::from(e))?; + Ok(Json(profile.into())) } #[utoipa::path( @@ -71,19 +68,14 @@ pub async fn create_storage_profile( )] pub async fn get_storage_profile( State(state): State, - Path(id): Path, + Path(storage_profile_id): Path, ) -> Result, AppError> { - Ok(Json(storage_profile::StorageProfile { - r#type: aws::CloudProvider::S3, - region: "1".to_string(), - bucket: "".to_string(), - credentials: Default::default(), - sts_role_arn: None, - endpoint: None, - id: Default::default(), - created_at: Default::default(), - updated_at: Default::default(), - })) + let profile: StorageProfile = state + .control_svc + .get_profile(storage_profile_id) + .await + .map_err(|e| AppError::from(e))?; + Ok(Json(profile.into())) } #[utoipa::path( @@ -100,8 +92,13 @@ pub async fn get_storage_profile( )] pub async fn delete_storage_profile( State(state): State, - Path(id): Path, + Path(storage_profile_id): Path, ) -> Result, AppError> { + state + .control_svc + .delete_profile(storage_profile_id) + .await + .map_err(|e| AppError::from(e))?; Ok(Json(())) } @@ -117,5 +114,6 @@ pub async fn delete_storage_profile( pub async fn list_storage_profiles( State(state): State, ) -> Result>, AppError> { - Ok(Json(vec![])) + let profiles = state.control_svc.list_profiles().await?; + Ok(Json(profiles.into_iter().map(|p| p.into()).collect())) } diff --git a/crates/nexus/src/http/ui/handlers/tables.rs b/crates/nexus/src/http/ui/handlers/tables.rs index d42fde963..a4e32b2c7 100644 --- a/crates/nexus/src/http/ui/handlers/tables.rs +++ b/crates/nexus/src/http/ui/handlers/tables.rs @@ -90,7 +90,7 @@ pub async fn get_table( statistics: None, compaction_summary: None, storage_profile: storage_profile::StorageProfile { - r#type: aws::CloudProvider::S3, + r#type: aws::CloudProvider::AWS, region: "22".to_string(), bucket: "2".to_string(), credentials: Default::default(), @@ -155,7 +155,7 @@ pub async fn create_table( statistics: None, compaction_summary: None, storage_profile: storage_profile::StorageProfile { - r#type: aws::CloudProvider::S3, + r#type: aws::CloudProvider::AWS, region: "2".to_string(), bucket: "2".to_string(), credentials: Default::default(), diff --git a/crates/nexus/src/http/ui/handlers/warehouses.rs b/crates/nexus/src/http/ui/handlers/warehouses.rs index 7aa69072c..733423fa6 100644 --- a/crates/nexus/src/http/ui/handlers/warehouses.rs +++ b/crates/nexus/src/http/ui/handlers/warehouses.rs @@ -2,6 +2,7 @@ use crate::error::AppError; use crate::http::ui::models::warehouse; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; +use control_plane::models::{Warehouse as WarehouseModel, WarehouseCreateRequest}; use utoipa::OpenApi; use uuid::Uuid; @@ -31,14 +32,33 @@ pub struct ApiDoc; path = "/ui/warehouses", operation_id = "webWarehousesDashboard", responses( - (status = 200, description = "List all warehouses", body = Vec), + (status = 200, description = "List all warehouses", body = warehouse::WarehousesDashboard), (status = 500, description = "Internal server error") ) )] pub async fn list_warehouses( State(state): State, -) -> Result>, AppError> { - Ok(Json(vec![])) +) -> Result, AppError> { + let warehouses = state.control_svc.list_warehouses().await?; + let storage_profiles = state.control_svc.list_profiles().await?; + let mut extended_warehouses = Vec::new(); + + for warehouse in warehouses { + let mut extended_warehouse = + warehouse::WarehouseExtended::new(warehouse.clone().into(), Default::default()); + if let Some(profile) = storage_profiles + .iter() + .find(|p| p.id == extended_warehouse.storage_profile_id) + { + extended_warehouse.storage_profile = profile.clone().into(); + extended_warehouses.push(extended_warehouse) + } + } + Ok(Json(warehouse::WarehousesDashboard { + warehouses: extended_warehouses, + statistics: Default::default(), + compaction_summary: None, + })) } #[utoipa::path( @@ -49,25 +69,28 @@ pub async fn list_warehouses( ("warehouseId" = Uuid, Path, description = "Warehouse ID") ), responses( - (status = 200, description = "Warehouse found", body = warehouse::Warehouse), + (status = 200, description = "Warehouse found", body = warehouse::WarehouseExtended), (status = 404, description = "Warehouse not found") ) )] pub async fn get_warehouse( State(state): State, Path(warehouse_id): Path, -) -> Result, AppError> { - // let warehouse = state.warehouse_service.get_warehouse(warehouse_id).await?; - Ok(Json(warehouse::Warehouse { - name: "".to_string(), - storage_profile_id: Default::default(), - key_prefix: "key".to_string(), - id: Default::default(), - external_id: Default::default(), - location: "".to_string(), - created_at: Default::default(), - updated_at: Default::default(), - })) +) -> Result, AppError> { + let warehouse = state.control_svc.get_warehouse(warehouse_id).await?; + + let mut extended_warehouse = + warehouse::WarehouseExtended::new(warehouse.into(), Default::default()); + + if let Ok(profile) = state + .control_svc + .get_profile(extended_warehouse.storage_profile_id) + .await + { + extended_warehouse.storage_profile = profile.into(); + } + + Ok(Json(extended_warehouse)) } #[utoipa::path( @@ -85,22 +108,25 @@ pub async fn create_warehouse( State(state): State, Json(payload): Json, ) -> Result, AppError> { - Ok(Json(warehouse::Warehouse { - name: "".to_string(), - storage_profile_id: Default::default(), - key_prefix: "".to_string(), - id: Default::default(), - external_id: Default::default(), - location: "".to_string(), - created_at: Default::default(), - updated_at: Default::default(), - })) + let request: WarehouseCreateRequest = payload.into(); + + state + .control_svc + .get_profile(request.storage_profile_id) + .await + .map_err(|e| AppError::from(e))?; + let warehouse: WarehouseModel = state + .control_svc + .create_warehouse(&request) + .await + .map_err(|e| AppError::from(e))?; + Ok(Json(warehouse.into())) } #[utoipa::path( delete, path = "/ui/warehouses/{warehouseId}", - operation_id = "webCeleteWarehouse", + operation_id = "webCreteWarehouse", params( ("warehouseId" = Uuid, Path, description = "Warehouse ID") ), @@ -112,6 +138,11 @@ pub async fn create_warehouse( pub async fn delete_warehouse( State(state): State, Path(warehouse_id): Path, -) -> Result<(), AppError> { - Ok(()) +) -> Result, AppError> { + state + .control_svc + .delete_warehouse(warehouse_id) + .await + .map_err(|e| AppError::from(e))?; + Ok(Json(())) } diff --git a/crates/nexus/src/http/ui/mod.rs b/crates/nexus/src/http/ui/mod.rs index 0b03afec4..e260adaa0 100644 --- a/crates/nexus/src/http/ui/mod.rs +++ b/crates/nexus/src/http/ui/mod.rs @@ -1,3 +1,3 @@ -pub mod router; -pub mod models; pub mod handlers; +pub mod models; +pub mod router; diff --git a/crates/nexus/src/http/ui/models/aws.rs b/crates/nexus/src/http/ui/models/aws.rs index 13e424da2..40993b1e5 100644 --- a/crates/nexus/src/http/ui/models/aws.rs +++ b/crates/nexus/src/http/ui/models/aws.rs @@ -1,3 +1,4 @@ +use control_plane::models; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use validator::Validate; @@ -20,6 +21,23 @@ impl AwsAccessKeyCredential { } } +impl From for models::AwsAccessKeyCredential { + fn from(credential: AwsAccessKeyCredential) -> Self { + models::AwsAccessKeyCredential { + aws_access_key_id: credential.aws_access_key_id, + aws_secret_access_key: credential.aws_secret_access_key, + } + } +} +impl From for AwsAccessKeyCredential { + fn from(credential: models::AwsAccessKeyCredential) -> Self { + AwsAccessKeyCredential { + aws_access_key_id: credential.aws_access_key_id, + aws_secret_access_key: credential.aws_secret_access_key, + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct AwsRoleCredential { #[validate(length(min = 1))] @@ -38,26 +56,108 @@ impl AwsRoleCredential { } } +impl From for models::AwsRoleCredential { + fn from(credential: AwsRoleCredential) -> Self { + models::AwsRoleCredential { + role_arn: credential.role_arn, + external_id: credential.external_id, + } + } +} +impl From for AwsRoleCredential { + fn from(credential: models::AwsRoleCredential) -> Self { + AwsRoleCredential { + role_arn: credential.role_arn, + external_id: credential.external_id, + } + } +} + #[derive( - Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema, + Debug, + Clone, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + Serialize, + Deserialize, + Hash, + Default, + ToSchema, )] pub enum CloudProvider { - #[serde(rename = "s3")] - S3, + #[serde(rename = "aws")] + #[default] + AWS, #[serde(rename = "gcs")] - Gcs, + GCS, #[serde(rename = "azure")] - Azure, + AZURE, +} + +impl From for CloudProvider { + fn from(provider: models::CloudProvider) -> Self { + match provider { + models::CloudProvider::AWS => CloudProvider::AWS, + models::CloudProvider::GCS => CloudProvider::GCS, + models::CloudProvider::AZURE => CloudProvider::AZURE, + } + } +} + +impl From for models::CloudProvider { + fn from(provider: CloudProvider) -> Self { + match provider { + CloudProvider::AWS => models::CloudProvider::AWS, + CloudProvider::GCS => models::CloudProvider::GCS, + CloudProvider::AZURE => models::CloudProvider::AZURE, + } + } } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] pub enum Credentials { - AwsAccessKeyCredential(AwsAccessKeyCredential), - AwsRoleCredential(AwsRoleCredential), + #[serde(rename = "access_key")] + AccessKey(AwsAccessKeyCredential), + #[serde(rename = "role")] + Role(AwsRoleCredential), } impl Default for Credentials { fn default() -> Self { - Credentials::AwsAccessKeyCredential(AwsAccessKeyCredential::default()) + Credentials::AccessKey(AwsAccessKeyCredential::default()) + } +} +impl From for Credentials { + fn from(credentials: models::Credentials) -> Self { + match credentials { + models::Credentials::AccessKey(creds) => Credentials::AccessKey( + AwsAccessKeyCredential::new(creds.aws_access_key_id, creds.aws_secret_access_key), + ), + models::Credentials::Role(creds) => { + Credentials::Role(AwsRoleCredential::new(creds.role_arn, creds.external_id)) + } + } + } +} + +impl From for models::Credentials { + fn from(credentials: Credentials) -> Self { + match credentials { + Credentials::AccessKey(creds) => models::Credentials::AccessKey( + models::AwsAccessKeyCredential { + aws_access_key_id: creds.aws_access_key_id, + aws_secret_access_key: creds.aws_secret_access_key, + }, + ), + Credentials::Role(creds) => { + models::Credentials::Role(models::AwsRoleCredential { + role_arn: creds.role_arn, + external_id: creds.external_id, + }) + } + } } } diff --git a/crates/nexus/src/http/ui/models/errors.rs b/crates/nexus/src/http/ui/models/errors.rs index de5c934f1..ad76cddc2 100644 --- a/crates/nexus/src/http/ui/models/errors.rs +++ b/crates/nexus/src/http/ui/models/errors.rs @@ -27,7 +27,7 @@ impl IntoResponse for Error { fn into_response(self) -> Response { let (status, error_message) = match self { Error::NotFound => (StatusCode::NOT_FOUND, self.to_string()), - Error::DbError(ref e) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), + Error::DbError(ref _e) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), Error::InternalServerError => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), Error::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg), _ => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), @@ -36,7 +36,6 @@ impl IntoResponse for Error { (status, error_message).into_response() } } - impl From for Error { fn from(e: utils::Error) -> Self { match e { diff --git a/crates/nexus/src/http/ui/models/storage_profile.rs b/crates/nexus/src/http/ui/models/storage_profile.rs index d1a382170..abeb3ee2b 100644 --- a/crates/nexus/src/http/ui/models/storage_profile.rs +++ b/crates/nexus/src/http/ui/models/storage_profile.rs @@ -1,4 +1,6 @@ use crate::http::ui::models::aws::{CloudProvider, Credentials}; +use chrono::{DateTime, Utc}; +use control_plane::models; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use validator::Validate; @@ -37,7 +39,20 @@ impl CreateStorageProfilePayload { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +impl From for models::StorageProfileCreateRequest { + fn from(payload: CreateStorageProfilePayload) -> Self { + models::StorageProfileCreateRequest { + r#type: payload.r#type.into(), + region: payload.region, + bucket: payload.bucket, + credentials: payload.credentials.into(), + sts_role_arn: payload.sts_role_arn, + endpoint: payload.endpoint, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, Default, ToSchema)] pub struct StorageProfile { #[serde(rename = "type")] pub r#type: CloudProvider, @@ -51,8 +66,8 @@ pub struct StorageProfile { #[serde(skip_serializing_if = "Option::is_none")] pub endpoint: Option, pub id: uuid::Uuid, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, + pub created_at: DateTime, + pub updated_at: DateTime, } impl StorageProfile { @@ -63,8 +78,8 @@ impl StorageProfile { bucket: String, credentials: Credentials, id: uuid::Uuid, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, + created_at: DateTime, + updated_at: DateTime, ) -> StorageProfile { StorageProfile { r#type, @@ -79,3 +94,19 @@ impl StorageProfile { } } } + +impl From for StorageProfile { + fn from(profile: models::StorageProfile) -> Self { + StorageProfile { + r#type: profile.r#type.into(), + region: profile.region, + bucket: profile.bucket, + credentials: profile.credentials.into(), + sts_role_arn: profile.sts_role_arn, + endpoint: profile.endpoint, + id: profile.id, + created_at: DateTime::from_naive_utc_and_offset(profile.created_at, Utc), + updated_at: DateTime::from_naive_utc_and_offset(profile.updated_at, Utc), + } + } +} diff --git a/crates/nexus/src/http/ui/models/table.rs b/crates/nexus/src/http/ui/models/table.rs index 24281b5b7..538097624 100644 --- a/crates/nexus/src/http/ui/models/table.rs +++ b/crates/nexus/src/http/ui/models/table.rs @@ -522,7 +522,7 @@ impl TableMetadataV1 { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default, Validate, ToSchema)] pub struct Statistics { pub commit_count: i32, pub op_append_count: i32, diff --git a/crates/nexus/src/http/ui/models/warehouse.rs b/crates/nexus/src/http/ui/models/warehouse.rs index 7999fd161..bb5c21a7f 100644 --- a/crates/nexus/src/http/ui/models/warehouse.rs +++ b/crates/nexus/src/http/ui/models/warehouse.rs @@ -3,6 +3,8 @@ use crate::http::ui::models::database::{CompactionSummary, DatabaseEntity, DatabaseShort}; use crate::http::ui::models::storage_profile::StorageProfile; use crate::http::ui::models::table::Statistics; +use chrono::{DateTime, Utc}; +use control_plane::models; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use validator::Validate; @@ -43,12 +45,23 @@ impl CreateWarehousePayload { } } +impl From for models::WarehouseCreateRequest { + fn from(payload: CreateWarehousePayload) -> Self { + models::WarehouseCreateRequest { + prefix: payload.key_prefix, + name: payload.name, + storage_profile_id: payload.storage_profile_id, + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct Warehouse { #[validate(length(min = 1))] pub name: String, pub storage_profile_id: uuid::Uuid, #[validate(length(min = 1))] + #[serde(rename = "prefix")] pub key_prefix: String, pub id: uuid::Uuid, pub external_id: uuid::Uuid, @@ -82,6 +95,21 @@ impl Warehouse { } } +impl From for Warehouse { + fn from(warehouse: control_plane::models::Warehouse) -> Self { + Warehouse { + id: warehouse.id, + key_prefix: warehouse.prefix, + name: warehouse.name, + location: warehouse.location, + storage_profile_id: warehouse.storage_profile_id, + created_at: DateTime::from_naive_utc_and_offset(warehouse.created_at, Utc), + updated_at: DateTime::from_naive_utc_and_offset(warehouse.updated_at, Utc), + external_id: Default::default(), + } + } +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct WarehouseDashboard { #[validate(length(min = 1))] @@ -196,26 +224,16 @@ pub struct WarehouseExtended { impl WarehouseExtended { #[allow(clippy::new_without_default)] - pub fn new( - name: String, - storage_profile_id: uuid::Uuid, - key_prefix: String, - id: uuid::Uuid, - external_id: uuid::Uuid, - location: String, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, - storage_profile: StorageProfile, - ) -> WarehouseExtended { + pub fn new(warehouse: Warehouse, storage_profile: StorageProfile) -> WarehouseExtended { WarehouseExtended { - name, - storage_profile_id, - key_prefix, - id, - external_id, - location, - created_at, - updated_at, + id: warehouse.id, + key_prefix: warehouse.key_prefix, + name: warehouse.name, + location: warehouse.location, + storage_profile_id: warehouse.storage_profile_id, + created_at: warehouse.created_at, + updated_at: warehouse.updated_at, + external_id: warehouse.external_id, statistics: None, compaction_summary: None, storage_profile, @@ -241,7 +259,7 @@ impl WarehouseShort { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, Default, ToSchema)] pub struct WarehousesDashboard { pub warehouses: Vec, pub statistics: Statistics,