Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion crates/api-snowflake-rest/src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::models::{
QueryRequest, QueryRequestBody,
};
use crate::server::error::Result;
use crate::server::helpers::handle_query_ok_result;
use crate::server::logic::{handle_login_request, handle_query_request};
use api_snowflake_rest_sessions::TokenizedSession;
use api_snowflake_rest_sessions::layer::Host;
Expand All @@ -12,9 +13,17 @@ use axum::Json;
use axum::extract::{ConnectInfo, Query, State};
use axum::http::HeaderMap;
use executor::RunningQueryId;
use serde::Deserialize;
use executor::models::{QueryContext, SessionMetadata, SessionMetadataAttr};
use executor::utils::DataSerializationFormat;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::net::SocketAddr;

const NATIVE_APP_FUNCTION_SESSION_ID: &str = "native-app-service-function";
const NATIVE_APP_FUNCTION_USER: &str = "SNOWFLAKE_SERVICE_FUNCTION";
const NATIVE_APP_FUNCTION_DATABASE_ENV: &str = "RUSTICE_SERVICE_FUNCTION_DATABASE";
const NATIVE_APP_FUNCTION_SCHEMA_ENV: &str = "RUSTICE_SERVICE_FUNCTION_SCHEMA";

#[derive(Debug, Deserialize)]
pub struct SessionQueryParams {
#[serde(default)]
Expand All @@ -25,6 +34,16 @@ pub struct SessionQueryParams {
request_guid: Option<String>,
}

#[derive(Debug, Deserialize)]
pub struct ServiceFunctionRequest {
data: Vec<Vec<Value>>,
}

#[derive(Debug, Serialize)]
pub struct ServiceFunctionResponse {
data: Vec<Vec<Value>>,
}

#[tracing::instrument(
name = "api_snowflake_rest::login",
level = "debug",
Expand Down Expand Up @@ -95,6 +114,92 @@ pub async fn abort(
Ok(Json(serde_json::value::Value::Null))
}

#[tracing::instrument(
name = "api_snowflake_rest::native_app_service_function_query",
level = "debug",
skip(state, request),
ret(level = tracing::Level::TRACE)
)]
pub async fn native_app_service_function_query(
State(state): State<AppState>,
Json(request): Json<ServiceFunctionRequest>,
) -> Json<ServiceFunctionResponse> {
let mut data = Vec::with_capacity(request.data.len());

for row in request.data {
let row_index = row.first().cloned().unwrap_or_else(|| json!(0));
let result = if let Some(sql_text) = row.get(1).and_then(Value::as_str) {
execute_native_app_query(&state, sql_text).await
} else {
service_function_error_response(
"Service function row must contain a SQL string argument",
)
};
data.push(vec![row_index, Value::String(result)]);
}

Json(ServiceFunctionResponse { data })
}

async fn execute_native_app_query(state: &AppState, sql_text: &str) -> String {
if let Err(error) = state
.execution_svc
.create_session(NATIVE_APP_FUNCTION_SESSION_ID)
.await
{
return service_function_error_response(&error.to_string());
}

let database = std::env::var(NATIVE_APP_FUNCTION_DATABASE_ENV).ok();
let schema = std::env::var(NATIVE_APP_FUNCTION_SCHEMA_ENV).ok();
let mut session_metadata = SessionMetadata::default();
session_metadata.set_attr(
SessionMetadataAttr::UserName,
NATIVE_APP_FUNCTION_USER.to_string(),
);
if let Some(database) = database.clone() {
session_metadata.set_attr(SessionMetadataAttr::Database, database);
}
if let Some(schema) = schema.clone() {
session_metadata.set_attr(SessionMetadataAttr::Schema, schema);
}

let query_context =
QueryContext::new(database, schema, None).with_session_metadata(Some(session_metadata));
let query_id = query_context.query_id;
let query_result = match state
.execution_svc
.query(NATIVE_APP_FUNCTION_SESSION_ID, sql_text, query_context)
.await
{
Ok(query_result) => query_result,
Err(error) => return service_function_error_response(&error.to_string()),
};

let response = match handle_query_ok_result(
sql_text,
query_id,
query_result,
DataSerializationFormat::Json,
) {
Ok(response) => response,
Err(error) => return service_function_error_response(&error.to_string()),
};

match serde_json::to_string(&response) {
Ok(response) => response,
Err(error) => service_function_error_response(&error.to_string()),
}
}

fn service_function_error_response(message: &str) -> String {
json!({
"success": false,
"message": message,
})
.to_string()
}

#[tracing::instrument(
name = "api_snowflake_rest::session",
level = "debug",
Expand Down
6 changes: 5 additions & 1 deletion crates/api-snowflake-rest/src/server/router.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::handlers::{abort, heartbeat, login, query, session};
use super::handlers::{abort, heartbeat, login, native_app_service_function_query, query, session};
use super::layer::require_auth;
use super::state::AppState;
use api_snowflake_rest_sessions::layer::Host;
Expand All @@ -14,6 +14,10 @@ pub fn create_auth_router() -> Router<AppState> {
.route("/session/v1/login-request", post(login))
.route("/session/heartbeat", post(heartbeat))
.route("/session", post(session))
.route(
"/snowflake-function/query",
post(native_app_service_function_query),
)
}

pub fn create_router() -> Router<AppState> {
Expand Down
25 changes: 25 additions & 0 deletions crates/api-snowflake-rest/src/tests/test_gzip_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,31 @@ mod tests {
assert!(query_response.code.is_none()); // no code set on success
}

#[tokio::test]
async fn test_native_app_service_function_query() {
let addr = run_test_rest_api_server(Some(rest_default_cfg("json")), None).await;
let client = reqwest::Client::new();
let function_url = format!("http://127.0.0.1:{}/snowflake-function/query", addr.port());

let res = client
.request(Method::POST, function_url)
.json(&json!({
"data": [
[0, "SELECT 1"]
]
}))
.send()
.await
.unwrap();
assert_eq!(http::StatusCode::OK, res.status());

let body: serde_json::Value = res.json().await.unwrap();
let response = body["data"][0][1].as_str().unwrap();
let response: JsonResponse = serde_json::from_str(response).unwrap();
assert!(response.success);
assert_eq!(response.data.unwrap().total, Some(1));
}

#[tokio::test]
async fn test_spcs_trusted_ingress_login_skips_demo_password() {
let rest_cfg = rest_default_cfg("json").with_trust_spcs_ingress(true);
Expand Down
22 changes: 19 additions & 3 deletions deploy/native-app/CONSUMER_QUICKSTART.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ There are two SQL paths:

- Snowflake UI and regular `snow sql` manage the Native App, create/check
Snowflake-managed Iceberg tables, approve external access, and inspect service
status.
status. After the app service is running, they can also run simple Rustice
smoke queries through `APP_PUBLIC.RUSTICE_QUERY(...)`.
- `embucket-snow` sends SQL to the Rustice container through the SPCS public
ingress endpoint. Use this path when you want Rustice, not a Snowflake
warehouse, to execute the query.
ingress endpoint. Use this path for normal Rustice SQL work, dbt, and result
rendering that behaves like a Snowflake-compatible CLI.

For a copy-paste Snowflake Worksheet, use
[`customer_quickstart.sql`](customer_quickstart.sql). Run it first, then use the
Expand Down Expand Up @@ -262,6 +263,21 @@ CALL RUSTICE_NATIVE_APP.APP_PUBLIC.SERVICE_LOGS(100);
`SERVICE_ENDPOINTS()` returns the public SPCS ingress host. Copy the host
without `https://`.

Run a simple query through Rustice from Snowsight Worksheets or regular
Snowflake CLI:

```sql
SELECT PARSE_JSON(
RUSTICE_NATIVE_APP.APP_PUBLIC.RUSTICE_QUERY(
'SELECT * FROM rustice_spcs.public.smoke ORDER BY id'
)
);
```

`RUSTICE_QUERY(...)` returns the Snowflake REST-style Rustice response as JSON.
It is intended for smoke tests and simple UI checks. For normal SQL workflows,
use `embucket-snow`.

Stop the service when finished:

```sql
Expand Down
6 changes: 5 additions & 1 deletion deploy/native-app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ CALL <installed_app>.APP_PUBLIC.REGISTER_REFERENCE(
);
CALL <installed_app>.APP_PUBLIC.START_APP();
CALL <installed_app>.APP_PUBLIC.SERVICE_ENDPOINTS();
SELECT PARSE_JSON(<installed_app>.APP_PUBLIC.RUSTICE_QUERY('SELECT 1'));
CALL <installed_app>.APP_PUBLIC.SERVICE_LOGS(100);
CALL <installed_app>.APP_PUBLIC.SERVICE_PREVIOUS_LOGS(100);
```
Expand All @@ -278,7 +279,10 @@ when `START_APP()` is called, so compute changes should be followed by
`START_APP()`.

The consumer then points `embucket-snow` or dbt at the returned public ingress
host. For dbt Snowplow, use the runbook in
host for normal Rustice SQL workflows. `APP_PUBLIC.RUSTICE_QUERY(...)` is a
small Snowflake UI / `snow sql` smoke-test wrapper that returns the Rustice
Snowflake REST-style response as JSON text; it is not intended to replace the
patched connector for dbt or regular CLI usage. For dbt Snowplow, use the runbook in
`../test-dbt-snowplow-web/README.md` with:

```bash
Expand Down
5 changes: 3 additions & 2 deletions deploy/native-app/SECURITY.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ Containers:
Public endpoints:

- One SPCS public ingress endpoint named `main` on port `3000`.
- The endpoint serves the Snowflake-compatible REST API and `/health` readiness
endpoint.
- The endpoint serves the Snowflake-compatible REST API, `/health` readiness
endpoint, and `/snowflake-function/query` for the app-owned
`APP_PUBLIC.RUSTICE_QUERY(...)` service function.
- Public access is protected by Snowflake SPCS ingress authentication.

External integrations:
Expand Down
4 changes: 4 additions & 0 deletions deploy/native-app/app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ Then start the service:
CALL <app_name>.APP_PUBLIC.START_APP();
CALL <app_name>.APP_PUBLIC.SERVICE_STATUS();
CALL <app_name>.APP_PUBLIC.SERVICE_ENDPOINTS();
SELECT PARSE_JSON(<app_name>.APP_PUBLIC.RUSTICE_QUERY('SELECT 1'));
CALL <app_name>.APP_PUBLIC.SERVICE_LOGS(100);
CALL <app_name>.APP_PUBLIC.SERVICE_PREVIOUS_LOGS(100);
```

Use the endpoint host returned by `SERVICE_ENDPOINTS()` with the
`embucket-snow` CLI or dbt Snowflake adapter patch.
`RUSTICE_QUERY(...)` is a Snowflake UI / `snow sql` smoke-test wrapper. It
returns the Rustice Snowflake REST-style response as JSON text; use
`embucket-snow` for normal Rustice SQL workflows.

The service mounts the bound secret as a file and points Rustice to it with
`ICEBERG_REST_CREDENTIAL_FILE`; Rustice uses that credential to exchange and
Expand Down
13 changes: 12 additions & 1 deletion deploy/native-app/app/setup_script.sql
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ DECLARE
auto_suspend_secs NUMBER;
create_pool_sql STRING;
create_service_sql STRING;
create_query_function_sql STRING;
BEGIN
SELECT
MAX(IFF(key = 'eai_name', value, NULL)),
Expand Down Expand Up @@ -320,6 +321,7 @@ BEGIN
' horizon_database => ' || CHR(39) || TO_JSON(TO_VARIANT(horizon_database)) || CHR(39) || ',' ||
' horizon_scope => ' || CHR(39) || TO_JSON(TO_VARIANT('session:role:' || horizon_role)) || CHR(39) || ',' ||
' client_database => ' || CHR(39) || TO_JSON(TO_VARIANT(client_database)) || CHR(39) || ',' ||
' client_schema => ' || CHR(39) || TO_JSON(TO_VARIANT(client_schema)) || CHR(39) || ',' ||
' horizon_schemas => ' || CHR(39) || TO_JSON(TO_VARIANT(horizon_schemas)) || CHR(39) || ',' ||
' horizon_tables => ' || CHR(39) || TO_JSON(TO_VARIANT(COALESCE(horizon_tables, ''))) || CHR(39) || ',' ||
' horizon_eager_load => ' || CHR(39) || TO_JSON(TO_VARIANT('0')) || CHR(39) || ',' ||
Expand All @@ -334,7 +336,16 @@ BEGIN
EXECUTE IMMEDIATE create_service_sql;
GRANT SERVICE ROLE core.rustice_service!rustice_user TO APPLICATION ROLE app_user;

RETURN 'Rustice service created. Call APP_PUBLIC.SERVICE_STATUS() and APP_PUBLIC.SERVICE_ENDPOINTS().';
create_query_function_sql := 'CREATE OR REPLACE FUNCTION app_public.rustice_query(sql_text STRING)' ||
' RETURNS STRING' ||
' SERVICE = core.rustice_service' ||
' ENDPOINT = main' ||
' MAX_BATCH_ROWS = 1' ||
' AS ''/snowflake-function/query''';
EXECUTE IMMEDIATE create_query_function_sql;
GRANT USAGE ON FUNCTION app_public.rustice_query(STRING) TO APPLICATION ROLE app_user;

RETURN 'Rustice service created. Call APP_PUBLIC.SERVICE_STATUS(), APP_PUBLIC.SERVICE_ENDPOINTS(), or SELECT PARSE_JSON(APP_PUBLIC.RUSTICE_QUERY(''SELECT 1'')).';
END;
$$;

Expand Down
9 changes: 9 additions & 0 deletions deploy/native-app/customer_quickstart.sql
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ CALL RUSTICE_NATIVE_APP.APP_PUBLIC.SERVICE_STATUS();
CALL RUSTICE_NATIVE_APP.APP_PUBLIC.SERVICE_ENDPOINTS();
CALL RUSTICE_NATIVE_APP.APP_PUBLIC.SERVICE_LOGS(100);

-- Simple Rustice smoke query from Snowflake UI / regular snow sql.
-- RUSTICE_QUERY returns the Snowflake REST-style Rustice response as JSON text.
-- Use embucket-snow for normal Rustice SQL workflows and dbt.
SELECT PARSE_JSON(
RUSTICE_NATIVE_APP.APP_PUBLIC.RUSTICE_QUERY(
'SELECT * FROM rustice_spcs.public.smoke ORDER BY id'
)
);

-- Stop the service when testing is finished.
--
-- CALL RUSTICE_NATIVE_APP.APP_PUBLIC.SUSPEND_APP();
2 changes: 2 additions & 0 deletions deploy/native-app/service/rustice_spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ spec:
ICEBERG_REST_TABLES: {{ horizon_tables }}
ICEBERG_REST_EAGER_LOAD: {{ horizon_eager_load }}
ICEBERG_REST_CREDENTIAL_FILE: "/etc/rustice/horizon_credential/secret_string"
RUSTICE_SERVICE_FUNCTION_DATABASE: {{ client_database }}
RUSTICE_SERVICE_FUNCTION_SCHEMA: {{ client_schema }}
AWS_REGION: {{ s3_region }}
AWS_DEFAULT_REGION: {{ s3_region }}
secrets:
Expand Down
Loading