From e06b8a31a50d86b306ebee398feaa11b5647713f Mon Sep 17 00:00:00 2001 From: andheroe Date: Thu, 24 Oct 2024 19:22:48 +0300 Subject: [PATCH 1/3] Add query_table endpoint and logging middleware --- Cargo.toml | 2 +- crates/control_plane/Cargo.toml | 6 +- crates/control_plane/src/service.rs | 29 +++++++ crates/nexus/Cargo.toml | 4 + crates/nexus/src/http/ui/handlers/tables.rs | 52 ++++++++++++ crates/nexus/src/http/ui/models/table.rs | 23 ++++++ crates/nexus/src/http/ui/router.rs | 6 +- crates/nexus/src/main.rs | 87 ++++++++++++++++++++- tests/.env | 16 ++-- 9 files changed, 211 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 08ac97417..dc812d595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,4 @@ tempfile = { version = "3" } utoipa = { version = "5.0.0-beta.0", features = ["uuid", "chrono"] } utoipa-axum = { version = "0.1.0-beta.2" } utoipa-swagger-ui = { version = "7.1.1-beta.0", features = ["axum"] } -lazy_static = { version = "1.5" } \ No newline at end of file +lazy_static = { version = "1.5" } diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index 2224e86c6..5e4943e3a 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -12,7 +12,11 @@ thiserror = { workspace = true } utils = { path = "../utils" } futures = { workspace = true } serde = { workspace = true } +datafusion = { version = "41" } +iceberg-catalog-rest = { version = "0.3" } +iceberg-datafusion = { version = "0.3" } +arrow = { version = "52" } [dev-dependencies] slatedb = {workspace = true } -object_store = { workspace = true } \ No newline at end of file +object_store = { workspace = true } diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs index 04a006036..b6f9ac90c 100644 --- a/crates/control_plane/src/service.rs +++ b/crates/control_plane/src/service.rs @@ -5,6 +5,10 @@ use crate::repository::{StorageProfileRepository, WarehouseRepository}; use async_trait::async_trait; use std::sync::Arc; use uuid::Uuid; +use datafusion::prelude::*; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_datafusion::IcebergCatalogProvider; +use std::collections::HashMap; #[async_trait] pub trait ControlService: Send + Sync { @@ -27,6 +31,8 @@ pub trait ControlService: Send + Sync { // async fn get_table(&self, id: Uuid) -> Result; // async fn delete_table(&self, id: Uuid) -> Result<()>; // async fn list_tables(&self) -> Result>; + + async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<()>; } pub struct ControlServiceImpl { @@ -94,6 +100,29 @@ impl ControlService for ControlServiceImpl { async fn list_warehouses(&self) -> Result> { self.warehouse_repo.list().await } + + async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<()> { + let config = RestCatalogConfig::builder() + .uri("http://0.0.0.0:3000/catalog".to_string()) + .warehouse(warehouse_id.to_string()) + .props(HashMap::default()) + .build(); + + let catalog = RestCatalog::new(config); + let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog)) + .await + .unwrap(); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", Arc::new(catalog)); + + let provider = ctx.catalog("catalog").unwrap(); + let schemas = provider.schema_names(); + println!("{schemas:?}"); + assert_eq!(schemas.len(), 6); + + Ok(()) + } } #[cfg(test)] diff --git a/crates/nexus/Cargo.toml b/crates/nexus/Cargo.toml index 437dbd01f..4d69a325d 100644 --- a/crates/nexus/Cargo.toml +++ b/crates/nexus/Cargo.toml @@ -27,6 +27,10 @@ utoipa-swagger-ui = { workspace = true } swagger = { version = "6.1", features = ["serdejson", "server", "client", "tls", "tcp"] } validator = { version = "0.18.1", features = ["derive"] } thiserror = { version = "1.0.63" } +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +hyper = "1.5.0" +http-body-util = "0.1.0" [dev-dependencies] tower = { workspace = true } diff --git a/crates/nexus/src/http/ui/handlers/tables.rs b/crates/nexus/src/http/ui/handlers/tables.rs index a4e32b2c7..04e97719b 100644 --- a/crates/nexus/src/http/ui/handlers/tables.rs +++ b/crates/nexus/src/http/ui/handlers/tables.rs @@ -4,6 +4,7 @@ use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; use utoipa::OpenApi; use uuid::Uuid; +use control_plane::models::{CloudProvider, StorageProfileCreateRequest, Credentials, AwsAccessKeyCredential, WarehouseCreateRequest}; #[derive(OpenApi)] #[openapi( @@ -12,10 +13,12 @@ use uuid::Uuid; delete_table, get_table, list_tables, + query_table, ), components( schemas( table::TableExtended, + table::TableQueryResult, database::Database, ) ), @@ -190,3 +193,52 @@ pub async fn delete_table( ) -> Result<(), AppError> { Ok(()) } + + +#[utoipa::path( + post, + path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}/query", + operation_id = "webTableQuery", + params( + ("warehouseId" = Uuid, Path, description = "Warehouse ID"), + ("databaseName" = Uuid, Path, description = "Database Name"), + ("tableName" = Uuid, Path, description = "Table name") + ), + responses( + (status = 200, description = "Returns result of the query", body = Vec), + (status = 500, description = "Internal server error") + ) +)] +pub async fn query_table( + State(state): State, + Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>, +) -> Result, AppError> { + ///////// For testing only + let storage_profile_create = StorageProfileCreateRequest { + r#type: CloudProvider::AWS, + region: "us-east-2".to_string(), + bucket: "test_bucket".to_string(), + credentials: Credentials::AccessKey(AwsAccessKeyCredential { + aws_access_key_id: "test_access_key".to_string(), + aws_secret_access_key: "test_secret_access_key".to_string(), + }), + sts_role_arn: None, + endpoint: None, + }; + let storage_profile = state.control_svc.create_profile(&storage_profile_create).await?; + let warehouse_create = WarehouseCreateRequest { + prefix: "test_prefix".to_string(), + name: "test_name".to_string(), + storage_profile_id: storage_profile.id, + }; + let warehouse = state.control_svc.create_warehouse(&warehouse_create).await?; + // state.control_svc. + ///////// + let query = "dfssdf".to_string(); + let result = state.control_svc.query_table(&warehouse_id, &query).await?; + Ok(Json(table::TableQueryResult { + id: Default::default(), + query, + result: "result".to_string(), + })) +} diff --git a/crates/nexus/src/http/ui/models/table.rs b/crates/nexus/src/http/ui/models/table.rs index 538097624..d42725fd6 100644 --- a/crates/nexus/src/http/ui/models/table.rs +++ b/crates/nexus/src/http/ui/models/table.rs @@ -576,3 +576,26 @@ impl Statistics { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct WriteDefault(swagger::AnyOf6); + + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +pub struct TableQueryResult { + pub id: uuid::Uuid, + pub query: String, + pub result: String, +} + +impl crate::http::ui::models::table::TableQueryResult { + #[allow(clippy::new_without_default)] + pub fn new( + id: uuid::Uuid, + query: String, + result: String, + ) -> crate::http::ui::models::table::TableQueryResult { + crate::http::ui::models::table::TableQueryResult { + id, + query, + result, + } + } +} diff --git a/crates/nexus/src/http/ui/router.rs b/crates/nexus/src/http/ui/router.rs index a9bf0008d..ea6e282ff 100644 --- a/crates/nexus/src/http/ui/router.rs +++ b/crates/nexus/src/http/ui/router.rs @@ -2,7 +2,7 @@ use crate::http::ui::handlers::databases::{delete_database, get_database}; use crate::http::ui::handlers::profiles::{ create_storage_profile, delete_storage_profile, get_storage_profile, list_storage_profiles, }; -use crate::http::ui::handlers::tables::get_table; +use crate::http::ui::handlers::tables::{get_table, query_table}; use crate::http::ui::handlers::warehouses::{ create_warehouse, delete_warehouse, get_warehouse, list_warehouses, }; @@ -25,6 +25,10 @@ pub fn create_router() -> Router { "/warehouses/:warehouseId/databases/:databaseName/tables/:tableName", get(get_table), ) + .route( + "/warehouses/:warehouseId/databases/:databaseName/tables/:tableName/query", + post(query_table), + ) .route( "/storage-profiles", post(create_storage_profile).get(list_storage_profiles), diff --git a/crates/nexus/src/main.rs b/crates/nexus/src/main.rs index c73deb1cf..c30a880a4 100644 --- a/crates/nexus/src/main.rs +++ b/crates/nexus/src/main.rs @@ -6,6 +6,18 @@ use object_store::{memory::InMemory, path::Path, ObjectStore}; use slatedb::config::DbOptions; use slatedb::db::Db as SlateDb; use std::sync::Arc; +use axum::{ + body::{Body, Bytes}, + extract::Request, + http::StatusCode, + middleware::{self, Next}, + response::{IntoResponse, Response}, + routing::post, + Router, +}; +use axum::http::Method; +use http_body_util::BodyExt; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use utils::Db; @@ -61,9 +73,78 @@ async fn main() { // Create the application state let app_state = state::AppState::new(Arc::new(control_svc), Arc::new(catalog_svc)); - // Create the application router and pass the state - let app = http::router::create_app(app_state); + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + format!("{}=debug,tower_http=debug", env!("CARGO_CRATE_NAME")).into() + }), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); - let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + let app = http::router::create_app(app_state) + .layer(middleware::from_fn(print_request_response)); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000") + .await + .unwrap(); + tracing::debug!("listening on {}", listener.local_addr().unwrap()); axum::serve(listener, app).await.unwrap(); + + + + // // Set up tracing to log to stdout + // let subscriber = FmtSubscriber::builder() + // .with_max_level(Level::INFO) + // .finish(); + // tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + // + // // Create the application router and pass the state + // let app = http::router::create_app(app_state).layer(middleware::from_fn(log_request)); + // + // let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + // axum::serve(listener, app).await.unwrap(); +} + + +async fn print_request_response( + req: Request, + next: Next, +) -> Result { + let (req_parts, req_body) = req.into_parts(); + let method = req_parts.method.to_string(); + let uri = req_parts.uri.to_string(); + let bytes = buffer_and_print("request", &method, &uri, req_body).await?; + let req = Request::from_parts(req_parts, Body::from(bytes)); + + let res = next.run(req).await; + + let (resp_parts, resp_body) = res.into_parts(); + let bytes = buffer_and_print("response", &method, &uri, resp_body).await?; + let res = Response::from_parts(resp_parts, Body::from(bytes)); + + Ok(res) +} + +async fn buffer_and_print(direction: &str, method:&String, uri:&String, body: B) -> Result +where + B: axum::body::HttpBody, + B::Error: std::fmt::Display, +{ + let bytes = match body.collect().await { + Ok(collected) => collected.to_bytes(), + Err(err) => { + return Err(( + StatusCode::BAD_REQUEST, + format!("failed to read {direction} body: {err}"), + )); + } + }; + + if let Ok(body) = std::str::from_utf8(&bytes) { + tracing::debug!("{direction} {method} {uri} body = {body:?}"); + } + + Ok(bytes) } diff --git a/tests/.env b/tests/.env index 4b3318e71..bb502c804 100644 --- a/tests/.env +++ b/tests/.env @@ -1,8 +1,8 @@ -export MANAGEMENT_URL="http://localhost:3000" -export CATALOG_URL="http://localhost:3000/catalog" -export S3_ACCESS_KEY="access-key" -export S3_SECRET_KEY="secret-key" -export S3_BUCKET="mybucket" -export S3_ENDPOINT= -export S3_REGION="us-east-2" -export S3_PATH_STYLE_ACCESS= \ No newline at end of file +MANAGEMENT_URL="http://localhost:3000" +CATALOG_URL="http://localhost:3000/catalog" +S3_ACCESS_KEY="access-key" +S3_SECRET_KEY="secret-key" +S3_BUCKET="mybucket" +S3_ENDPOINT= +S3_REGION="us-east-2" +S3_PATH_STYLE_ACCESS= \ No newline at end of file From c6132c593469ad5b9bf390c1c6df857d28d4c8c5 Mon Sep 17 00:00:00 2001 From: andheroe Date: Fri, 25 Oct 2024 15:24:32 +0300 Subject: [PATCH 2/3] Refactor table query handling logic and update return types --- crates/control_plane/src/service.rs | 20 ++++------ crates/nexus/src/http/ui/handlers/tables.rs | 41 ++++++--------------- crates/nexus/src/http/ui/models/table.rs | 24 ++++++++++-- 3 files changed, 39 insertions(+), 46 deletions(-) diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs index b6f9ac90c..f7b1e52f5 100644 --- a/crates/control_plane/src/service.rs +++ b/crates/control_plane/src/service.rs @@ -32,7 +32,7 @@ pub trait ControlService: Send + Sync { // async fn delete_table(&self, id: Uuid) -> Result<()>; // async fn list_tables(&self) -> Result>; - async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<()>; + async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)>; } pub struct ControlServiceImpl { @@ -101,7 +101,7 @@ impl ControlService for ControlServiceImpl { self.warehouse_repo.list().await } - async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<()> { + async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)> { let config = RestCatalogConfig::builder() .uri("http://0.0.0.0:3000/catalog".to_string()) .warehouse(warehouse_id.to_string()) @@ -109,19 +109,13 @@ impl ControlService for ControlServiceImpl { .build(); let catalog = RestCatalog::new(config); - let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog)) - .await - .unwrap(); - - let ctx = SessionContext::new(); - ctx.register_catalog("catalog", Arc::new(catalog)); - let provider = ctx.catalog("catalog").unwrap(); - let schemas = provider.schema_names(); - println!("{schemas:?}"); - assert_eq!(schemas.len(), 6); + // TODO need manifest file written before the code below works + // let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog)) + // .await + // .unwrap(); - Ok(()) + Ok(("OK")) } } diff --git a/crates/nexus/src/http/ui/handlers/tables.rs b/crates/nexus/src/http/ui/handlers/tables.rs index 04e97719b..5f17262e8 100644 --- a/crates/nexus/src/http/ui/handlers/tables.rs +++ b/crates/nexus/src/http/ui/handlers/tables.rs @@ -2,9 +2,11 @@ use crate::error::AppError; use crate::http::ui::models::{aws, database, storage_profile, table, warehouse}; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; +use axum_macros::debug_handler; use utoipa::OpenApi; use uuid::Uuid; use control_plane::models::{CloudProvider, StorageProfileCreateRequest, Credentials, AwsAccessKeyCredential, WarehouseCreateRequest}; +use crate::http::ui::models::table::TableQueryRequest; #[derive(OpenApi)] #[openapi( @@ -18,7 +20,7 @@ use control_plane::models::{CloudProvider, StorageProfileCreateRequest, Credenti components( schemas( table::TableExtended, - table::TableQueryResult, + table::TableQueryResponse, database::Database, ) ), @@ -198,6 +200,7 @@ pub async fn delete_table( #[utoipa::path( post, path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}/query", + request_body = table::TableQueryRequest, operation_id = "webTableQuery", params( ("warehouseId" = Uuid, Path, description = "Warehouse ID"), @@ -205,40 +208,20 @@ pub async fn delete_table( ("tableName" = Uuid, Path, description = "Table name") ), responses( - (status = 200, description = "Returns result of the query", body = Vec), + (status = 200, description = "Returns result of the query", body = Vec), (status = 500, description = "Internal server error") ) )] pub async fn query_table( State(state): State, Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>, -) -> Result, AppError> { - ///////// For testing only - let storage_profile_create = StorageProfileCreateRequest { - r#type: CloudProvider::AWS, - region: "us-east-2".to_string(), - bucket: "test_bucket".to_string(), - credentials: Credentials::AccessKey(AwsAccessKeyCredential { - aws_access_key_id: "test_access_key".to_string(), - aws_secret_access_key: "test_secret_access_key".to_string(), - }), - sts_role_arn: None, - endpoint: None, - }; - let storage_profile = state.control_svc.create_profile(&storage_profile_create).await?; - let warehouse_create = WarehouseCreateRequest { - prefix: "test_prefix".to_string(), - name: "test_name".to_string(), - storage_profile_id: storage_profile.id, - }; - let warehouse = state.control_svc.create_warehouse(&warehouse_create).await?; - // state.control_svc. - ///////// - let query = "dfssdf".to_string(); - let result = state.control_svc.query_table(&warehouse_id, &query).await?; - Ok(Json(table::TableQueryResult { + Json(payload): Json, +) -> Result, AppError> { + let request: TableQueryRequest = payload.into(); + let result = state.control_svc.query_table(&warehouse_id, &request.query).await?; + Ok(Json(table::TableQueryResponse { id: Default::default(), - query, - result: "result".to_string(), + query: request.query.clone(), + result: result.to_string(), })) } diff --git a/crates/nexus/src/http/ui/models/table.rs b/crates/nexus/src/http/ui/models/table.rs index d42725fd6..ce64c2c2a 100644 --- a/crates/nexus/src/http/ui/models/table.rs +++ b/crates/nexus/src/http/ui/models/table.rs @@ -579,20 +579,36 @@ pub struct WriteDefault(swagger::AnyOf6 TableQueryRequest { + TableQueryRequest { + query, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +pub struct TableQueryResponse { pub id: uuid::Uuid, pub query: String, pub result: String, } -impl crate::http::ui::models::table::TableQueryResult { +impl crate::http::ui::models::table::TableQueryResponse { #[allow(clippy::new_without_default)] pub fn new( id: uuid::Uuid, query: String, result: String, - ) -> crate::http::ui::models::table::TableQueryResult { - crate::http::ui::models::table::TableQueryResult { + ) -> crate::http::ui::models::table::TableQueryResponse { + crate::http::ui::models::table::TableQueryResponse { id, query, result, From 1bd341c7b2c93b8c98919d7440c79ba5152d4415 Mon Sep 17 00:00:00 2001 From: andheroe Date: Fri, 25 Oct 2024 15:27:46 +0300 Subject: [PATCH 3/3] Add 'export' back to environment variables and remove redundant code --- crates/nexus/src/main.rs | 14 -------------- tests/.env | 16 ++++++++-------- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/crates/nexus/src/main.rs b/crates/nexus/src/main.rs index c30a880a4..0706d6a92 100644 --- a/crates/nexus/src/main.rs +++ b/crates/nexus/src/main.rs @@ -90,20 +90,6 @@ async fn main() { .unwrap(); tracing::debug!("listening on {}", listener.local_addr().unwrap()); axum::serve(listener, app).await.unwrap(); - - - - // // Set up tracing to log to stdout - // let subscriber = FmtSubscriber::builder() - // .with_max_level(Level::INFO) - // .finish(); - // tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); - // - // // Create the application router and pass the state - // let app = http::router::create_app(app_state).layer(middleware::from_fn(log_request)); - // - // let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); - // axum::serve(listener, app).await.unwrap(); } diff --git a/tests/.env b/tests/.env index bb502c804..4b3318e71 100644 --- a/tests/.env +++ b/tests/.env @@ -1,8 +1,8 @@ -MANAGEMENT_URL="http://localhost:3000" -CATALOG_URL="http://localhost:3000/catalog" -S3_ACCESS_KEY="access-key" -S3_SECRET_KEY="secret-key" -S3_BUCKET="mybucket" -S3_ENDPOINT= -S3_REGION="us-east-2" -S3_PATH_STYLE_ACCESS= \ No newline at end of file +export MANAGEMENT_URL="http://localhost:3000" +export CATALOG_URL="http://localhost:3000/catalog" +export S3_ACCESS_KEY="access-key" +export S3_SECRET_KEY="secret-key" +export S3_BUCKET="mybucket" +export S3_ENDPOINT= +export S3_REGION="us-east-2" +export S3_PATH_STYLE_ACCESS= \ No newline at end of file