diff --git a/Cargo.toml b/Cargo.toml index 08ac97417..dc812d595 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,4 @@ tempfile = { version = "3" } utoipa = { version = "5.0.0-beta.0", features = ["uuid", "chrono"] } utoipa-axum = { version = "0.1.0-beta.2" } utoipa-swagger-ui = { version = "7.1.1-beta.0", features = ["axum"] } -lazy_static = { version = "1.5" } \ No newline at end of file +lazy_static = { version = "1.5" } diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index 2224e86c6..5e4943e3a 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -12,7 +12,11 @@ thiserror = { workspace = true } utils = { path = "../utils" } futures = { workspace = true } serde = { workspace = true } +datafusion = { version = "41" } +iceberg-catalog-rest = { version = "0.3" } +iceberg-datafusion = { version = "0.3" } +arrow = { version = "52" } [dev-dependencies] slatedb = {workspace = true } -object_store = { workspace = true } \ No newline at end of file +object_store = { workspace = true } diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs index 04a006036..f7b1e52f5 100644 --- a/crates/control_plane/src/service.rs +++ b/crates/control_plane/src/service.rs @@ -5,6 +5,10 @@ use crate::repository::{StorageProfileRepository, WarehouseRepository}; use async_trait::async_trait; use std::sync::Arc; use uuid::Uuid; +use datafusion::prelude::*; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; +use iceberg_datafusion::IcebergCatalogProvider; +use std::collections::HashMap; #[async_trait] pub trait ControlService: Send + Sync { @@ -27,6 +31,8 @@ pub trait ControlService: Send + Sync { // async fn get_table(&self, id: Uuid) -> Result; // async fn delete_table(&self, id: Uuid) -> Result<()>; // async fn list_tables(&self) -> Result>; + + async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)>; } pub struct ControlServiceImpl { @@ -94,6 +100,23 @@ impl ControlService for ControlServiceImpl { async fn list_warehouses(&self) -> Result> { self.warehouse_repo.list().await } + + async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)> { + let config = RestCatalogConfig::builder() + .uri("http://0.0.0.0:3000/catalog".to_string()) + .warehouse(warehouse_id.to_string()) + .props(HashMap::default()) + .build(); + + let catalog = RestCatalog::new(config); + + // TODO need manifest file written before the code below works + // let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog)) + // .await + // .unwrap(); + + Ok(("OK")) + } } #[cfg(test)] diff --git a/crates/nexus/Cargo.toml b/crates/nexus/Cargo.toml index 437dbd01f..4d69a325d 100644 --- a/crates/nexus/Cargo.toml +++ b/crates/nexus/Cargo.toml @@ -27,6 +27,10 @@ utoipa-swagger-ui = { workspace = true } swagger = { version = "6.1", features = ["serdejson", "server", "client", "tls", "tcp"] } validator = { version = "0.18.1", features = ["derive"] } thiserror = { version = "1.0.63" } +tracing = { version = "0.1" } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +hyper = "1.5.0" +http-body-util = "0.1.0" [dev-dependencies] tower = { workspace = true } diff --git a/crates/nexus/src/http/ui/handlers/tables.rs b/crates/nexus/src/http/ui/handlers/tables.rs index a4e32b2c7..5f17262e8 100644 --- a/crates/nexus/src/http/ui/handlers/tables.rs +++ b/crates/nexus/src/http/ui/handlers/tables.rs @@ -2,8 +2,11 @@ use crate::error::AppError; use crate::http::ui::models::{aws, database, storage_profile, table, warehouse}; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; +use axum_macros::debug_handler; use utoipa::OpenApi; use uuid::Uuid; +use control_plane::models::{CloudProvider, StorageProfileCreateRequest, Credentials, AwsAccessKeyCredential, WarehouseCreateRequest}; +use crate::http::ui::models::table::TableQueryRequest; #[derive(OpenApi)] #[openapi( @@ -12,10 +15,12 @@ use uuid::Uuid; delete_table, get_table, list_tables, + query_table, ), components( schemas( table::TableExtended, + table::TableQueryResponse, database::Database, ) ), @@ -190,3 +195,33 @@ pub async fn delete_table( ) -> Result<(), AppError> { Ok(()) } + + +#[utoipa::path( + post, + path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}/query", + request_body = table::TableQueryRequest, + operation_id = "webTableQuery", + params( + ("warehouseId" = Uuid, Path, description = "Warehouse ID"), + ("databaseName" = Uuid, Path, description = "Database Name"), + ("tableName" = Uuid, Path, description = "Table name") + ), + responses( + (status = 200, description = "Returns result of the query", body = Vec), + (status = 500, description = "Internal server error") + ) +)] +pub async fn query_table( + State(state): State, + Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>, + Json(payload): Json, +) -> Result, AppError> { + let request: TableQueryRequest = payload.into(); + let result = state.control_svc.query_table(&warehouse_id, &request.query).await?; + Ok(Json(table::TableQueryResponse { + id: Default::default(), + query: request.query.clone(), + result: result.to_string(), + })) +} diff --git a/crates/nexus/src/http/ui/models/table.rs b/crates/nexus/src/http/ui/models/table.rs index 538097624..ce64c2c2a 100644 --- a/crates/nexus/src/http/ui/models/table.rs +++ b/crates/nexus/src/http/ui/models/table.rs @@ -576,3 +576,42 @@ impl Statistics { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct WriteDefault(swagger::AnyOf6); + + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +pub struct TableQueryRequest { + pub query: String, +} + +impl TableQueryRequest { + #[allow(clippy::new_without_default)] + pub fn new( + query: String, + ) -> TableQueryRequest { + TableQueryRequest { + query, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] +pub struct TableQueryResponse { + pub id: uuid::Uuid, + pub query: String, + pub result: String, +} + +impl crate::http::ui::models::table::TableQueryResponse { + #[allow(clippy::new_without_default)] + pub fn new( + id: uuid::Uuid, + query: String, + result: String, + ) -> crate::http::ui::models::table::TableQueryResponse { + crate::http::ui::models::table::TableQueryResponse { + id, + query, + result, + } + } +} diff --git a/crates/nexus/src/http/ui/router.rs b/crates/nexus/src/http/ui/router.rs index a9bf0008d..ea6e282ff 100644 --- a/crates/nexus/src/http/ui/router.rs +++ b/crates/nexus/src/http/ui/router.rs @@ -2,7 +2,7 @@ use crate::http::ui::handlers::databases::{delete_database, get_database}; use crate::http::ui::handlers::profiles::{ create_storage_profile, delete_storage_profile, get_storage_profile, list_storage_profiles, }; -use crate::http::ui::handlers::tables::get_table; +use crate::http::ui::handlers::tables::{get_table, query_table}; use crate::http::ui::handlers::warehouses::{ create_warehouse, delete_warehouse, get_warehouse, list_warehouses, }; @@ -25,6 +25,10 @@ pub fn create_router() -> Router { "/warehouses/:warehouseId/databases/:databaseName/tables/:tableName", get(get_table), ) + .route( + "/warehouses/:warehouseId/databases/:databaseName/tables/:tableName/query", + post(query_table), + ) .route( "/storage-profiles", post(create_storage_profile).get(list_storage_profiles), diff --git a/crates/nexus/src/main.rs b/crates/nexus/src/main.rs index c73deb1cf..0706d6a92 100644 --- a/crates/nexus/src/main.rs +++ b/crates/nexus/src/main.rs @@ -6,6 +6,18 @@ use object_store::{memory::InMemory, path::Path, ObjectStore}; use slatedb::config::DbOptions; use slatedb::db::Db as SlateDb; use std::sync::Arc; +use axum::{ + body::{Body, Bytes}, + extract::Request, + http::StatusCode, + middleware::{self, Next}, + response::{IntoResponse, Response}, + routing::post, + Router, +}; +use axum::http::Method; +use http_body_util::BodyExt; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use utils::Db; @@ -61,9 +73,64 @@ async fn main() { // Create the application state let app_state = state::AppState::new(Arc::new(control_svc), Arc::new(catalog_svc)); - // Create the application router and pass the state - let app = http::router::create_app(app_state); + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { + format!("{}=debug,tower_http=debug", env!("CARGO_CRATE_NAME")).into() + }), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); - let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); + let app = http::router::create_app(app_state) + .layer(middleware::from_fn(print_request_response)); + + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000") + .await + .unwrap(); + tracing::debug!("listening on {}", listener.local_addr().unwrap()); axum::serve(listener, app).await.unwrap(); } + + +async fn print_request_response( + req: Request, + next: Next, +) -> Result { + let (req_parts, req_body) = req.into_parts(); + let method = req_parts.method.to_string(); + let uri = req_parts.uri.to_string(); + let bytes = buffer_and_print("request", &method, &uri, req_body).await?; + let req = Request::from_parts(req_parts, Body::from(bytes)); + + let res = next.run(req).await; + + let (resp_parts, resp_body) = res.into_parts(); + let bytes = buffer_and_print("response", &method, &uri, resp_body).await?; + let res = Response::from_parts(resp_parts, Body::from(bytes)); + + Ok(res) +} + +async fn buffer_and_print(direction: &str, method:&String, uri:&String, body: B) -> Result +where + B: axum::body::HttpBody, + B::Error: std::fmt::Display, +{ + let bytes = match body.collect().await { + Ok(collected) => collected.to_bytes(), + Err(err) => { + return Err(( + StatusCode::BAD_REQUEST, + format!("failed to read {direction} body: {err}"), + )); + } + }; + + if let Ok(body) = std::str::from_utf8(&bytes) { + tracing::debug!("{direction} {method} {uri} body = {body:?}"); + } + + Ok(bytes) +}