Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ tempfile = { version = "3" }
utoipa = { version = "5.0.0-beta.0", features = ["uuid", "chrono"] }
utoipa-axum = { version = "0.1.0-beta.2" }
utoipa-swagger-ui = { version = "7.1.1-beta.0", features = ["axum"] }
lazy_static = { version = "1.5" }
lazy_static = { version = "1.5" }
6 changes: 5 additions & 1 deletion crates/control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ thiserror = { workspace = true }
utils = { path = "../utils" }
futures = { workspace = true }
serde = { workspace = true }
datafusion = { version = "41" }
iceberg-catalog-rest = { version = "0.3" }
iceberg-datafusion = { version = "0.3" }
arrow = { version = "52" }

[dev-dependencies]
slatedb = {workspace = true }
object_store = { workspace = true }
object_store = { workspace = true }
23 changes: 23 additions & 0 deletions crates/control_plane/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use crate::repository::{StorageProfileRepository, WarehouseRepository};
use async_trait::async_trait;
use std::sync::Arc;
use uuid::Uuid;
use datafusion::prelude::*;
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_datafusion::IcebergCatalogProvider;
use std::collections::HashMap;

#[async_trait]
pub trait ControlService: Send + Sync {
Expand All @@ -27,6 +31,8 @@ pub trait ControlService: Send + Sync {
// async fn get_table(&self, id: Uuid) -> Result<Table>;
// async fn delete_table(&self, id: Uuid) -> Result<()>;
// async fn list_tables(&self) -> Result<Vec<Table>>;

async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)>;
}

pub struct ControlServiceImpl {
Expand Down Expand Up @@ -94,6 +100,23 @@ impl ControlService for ControlServiceImpl {
async fn list_warehouses(&self) -> Result<Vec<Warehouse>> {
self.warehouse_repo.list().await
}

async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)> {
let config = RestCatalogConfig::builder()
.uri("http://0.0.0.0:3000/catalog".to_string())
.warehouse(warehouse_id.to_string())
.props(HashMap::default())
.build();

let catalog = RestCatalog::new(config);

// TODO need manifest file written before the code below works
// let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog))
// .await
// .unwrap();

Ok(("OK"))
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions crates/nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ utoipa-swagger-ui = { workspace = true }
swagger = { version = "6.1", features = ["serdejson", "server", "client", "tls", "tcp"] }
validator = { version = "0.18.1", features = ["derive"] }
thiserror = { version = "1.0.63" }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
hyper = "1.5.0"
http-body-util = "0.1.0"

[dev-dependencies]
tower = { workspace = true }
Expand Down
35 changes: 35 additions & 0 deletions crates/nexus/src/http/ui/handlers/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use crate::error::AppError;
use crate::http::ui::models::{aws, database, storage_profile, table, warehouse};
use crate::state::AppState;
use axum::{extract::Path, extract::State, Json};
use axum_macros::debug_handler;
use utoipa::OpenApi;
use uuid::Uuid;
use control_plane::models::{CloudProvider, StorageProfileCreateRequest, Credentials, AwsAccessKeyCredential, WarehouseCreateRequest};
use crate::http::ui::models::table::TableQueryRequest;

#[derive(OpenApi)]
#[openapi(
Expand All @@ -12,10 +15,12 @@ use uuid::Uuid;
delete_table,
get_table,
list_tables,
query_table,
),
components(
schemas(
table::TableExtended,
table::TableQueryResponse,
database::Database,
)
),
Expand Down Expand Up @@ -190,3 +195,33 @@ pub async fn delete_table(
) -> Result<(), AppError> {
Ok(())
}


#[utoipa::path(
post,
path = "/ui/warehouses/{warehouseId}/databases/{databaseName}/tables/{tableName}/query",
request_body = table::TableQueryRequest,
operation_id = "webTableQuery",
params(
("warehouseId" = Uuid, Path, description = "Warehouse ID"),
("databaseName" = Uuid, Path, description = "Database Name"),
("tableName" = Uuid, Path, description = "Table name")
),
responses(
(status = 200, description = "Returns result of the query", body = Vec<table::TableQueryResponse>),
(status = 500, description = "Internal server error")
)
)]
pub async fn query_table(
State(state): State<AppState>,
Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>,
Json(payload): Json<table::TableQueryRequest>,
) -> Result<Json<table::TableQueryResponse>, AppError> {
let request: TableQueryRequest = payload.into();
let result = state.control_svc.query_table(&warehouse_id, &request.query).await?;
Ok(Json(table::TableQueryResponse {
id: Default::default(),
query: request.query.clone(),
result: result.to_string(),
}))
}
39 changes: 39 additions & 0 deletions crates/nexus/src/http/ui/models/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,3 +576,42 @@ impl Statistics {

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WriteDefault(swagger::AnyOf6<String, bool, i32, f64, swagger::ByteArray, uuid::Uuid>);


#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)]
pub struct TableQueryRequest {
pub query: String,
}

impl TableQueryRequest {
#[allow(clippy::new_without_default)]
pub fn new(
query: String,
) -> TableQueryRequest {
TableQueryRequest {
query,
}
}
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)]
pub struct TableQueryResponse {
pub id: uuid::Uuid,
pub query: String,
pub result: String,
}

impl crate::http::ui::models::table::TableQueryResponse {
#[allow(clippy::new_without_default)]
pub fn new(
id: uuid::Uuid,
query: String,
result: String,
) -> crate::http::ui::models::table::TableQueryResponse {
crate::http::ui::models::table::TableQueryResponse {
id,
query,
result,
}
}
}
6 changes: 5 additions & 1 deletion crates/nexus/src/http/ui/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::http::ui::handlers::databases::{delete_database, get_database};
use crate::http::ui::handlers::profiles::{
create_storage_profile, delete_storage_profile, get_storage_profile, list_storage_profiles,
};
use crate::http::ui::handlers::tables::get_table;
use crate::http::ui::handlers::tables::{get_table, query_table};
use crate::http::ui::handlers::warehouses::{
create_warehouse, delete_warehouse, get_warehouse, list_warehouses,
};
Expand All @@ -25,6 +25,10 @@ pub fn create_router() -> Router<AppState> {
"/warehouses/:warehouseId/databases/:databaseName/tables/:tableName",
get(get_table),
)
.route(
"/warehouses/:warehouseId/databases/:databaseName/tables/:tableName/query",
post(query_table),
)
.route(
"/storage-profiles",
post(create_storage_profile).get(list_storage_profiles),
Expand Down
73 changes: 70 additions & 3 deletions crates/nexus/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@ use object_store::{memory::InMemory, path::Path, ObjectStore};
use slatedb::config::DbOptions;
use slatedb::db::Db as SlateDb;
use std::sync::Arc;
use axum::{
body::{Body, Bytes},
extract::Request,
http::StatusCode,
middleware::{self, Next},
response::{IntoResponse, Response},
routing::post,
Router,
};
use axum::http::Method;
use http_body_util::BodyExt;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

use utils::Db;

Expand Down Expand Up @@ -61,9 +73,64 @@ async fn main() {
// Create the application state
let app_state = state::AppState::new(Arc::new(control_svc), Arc::new(catalog_svc));

// Create the application router and pass the state
let app = http::router::create_app(app_state);
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
format!("{}=debug,tower_http=debug", env!("CARGO_CRATE_NAME")).into()
}),
)
.with(tracing_subscriber::fmt::layer())
.init();

let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
let app = http::router::create_app(app_state)
.layer(middleware::from_fn(print_request_response));

let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
.await
.unwrap();
tracing::debug!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app).await.unwrap();
}


async fn print_request_response(
req: Request,
next: Next,
) -> Result<impl IntoResponse, (StatusCode, String)> {
let (req_parts, req_body) = req.into_parts();
let method = req_parts.method.to_string();
let uri = req_parts.uri.to_string();
let bytes = buffer_and_print("request", &method, &uri, req_body).await?;
let req = Request::from_parts(req_parts, Body::from(bytes));

let res = next.run(req).await;

let (resp_parts, resp_body) = res.into_parts();
let bytes = buffer_and_print("response", &method, &uri, resp_body).await?;
let res = Response::from_parts(resp_parts, Body::from(bytes));

Ok(res)
}

async fn buffer_and_print<B>(direction: &str, method:&String, uri:&String, body: B) -> Result<Bytes, (StatusCode,
String)>
where
B: axum::body::HttpBody<Data = Bytes>,
B::Error: std::fmt::Display,
{
let bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(err) => {
return Err((
StatusCode::BAD_REQUEST,
format!("failed to read {direction} body: {err}"),
));
}
};

if let Ok(body) = std::str::from_utf8(&bytes) {
tracing::debug!("{direction} {method} {uri} body = {body:?}");
}

Ok(bytes)
}