From 26ea3d4af0c246fad952526599f17fbc3b5b1130 Mon Sep 17 00:00:00 2001 From: Gal Schlezinger Date: Wed, 22 Nov 2023 09:32:39 +0200 Subject: [PATCH] add callback information as a webhook (#10) --- .changeset/fuzzy-jars-compare.md | 10 +++ .changeset/serious-geese-drum.md | 5 ++ Cargo.lock | 45 +++++++++++-- Cargo.toml | 1 + crates/cli/Cargo.toml | 1 + crates/cli/src/main.rs | 5 +- crates/cli/src/task_loop.rs | 32 +++++----- crates/engine_postgres/src/inflight_task.rs | 70 ++++++++++++++------- crates/types/Cargo.toml | 10 +++ crates/types/src/lib.rs | 33 ++++++++++ crates/web_api/Cargo.toml | 2 + crates/web_api/src/lib.rs | 5 +- crates/web_api/src/openapi.rs | 44 +++++++++++++ crates/web_api/src/router.rs | 35 ++--------- packages/core/src/index.ts | 36 +++++++++-- packages/nextjs/src/index.ts | 57 +++++++++++++---- 16 files changed, 297 insertions(+), 94 deletions(-) create mode 100644 .changeset/fuzzy-jars-compare.md create mode 100644 .changeset/serious-geese-drum.md create mode 100644 crates/types/Cargo.toml create mode 100644 crates/types/src/lib.rs create mode 100644 crates/web_api/src/openapi.rs diff --git a/.changeset/fuzzy-jars-compare.md b/.changeset/fuzzy-jars-compare.md new file mode 100644 index 0000000..6ced01a --- /dev/null +++ b/.changeset/fuzzy-jars-compare.md @@ -0,0 +1,10 @@ +--- +"@pointguard/nextjs": patch +"@pointguard/core": patch +--- + +use the webhook openapi definition for types + +the Next.js adapter now returns 200 for all execution requests, +because we successfuly applied them. But the JSON might be an error +(that is managed by Pointguard). Errors are values, Exceptions are bugs. diff --git a/.changeset/serious-geese-drum.md b/.changeset/serious-geese-drum.md new file mode 100644 index 0000000..8f9edb8 --- /dev/null +++ b/.changeset/serious-geese-drum.md @@ -0,0 +1,5 @@ +--- +"@pointguard/cli": patch +--- + +add executeTask webhook with the request/response schemas diff --git a/Cargo.lock b/Cargo.lock index feff068..a38436a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,30 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-extra" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ab90e7b70bea63a153137162affb6a0bce26b584c24a4c7885509783e2cf30b" +dependencies = [ + "axum", + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "pin-project-lite", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -1298,6 +1322,7 @@ dependencies = [ "colored", "futures", "pointguard_engine_postgres", + "pointguard_types", "pointguard_web_api", "reqwest", "serde", @@ -1329,18 +1354,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "pointguard_types" +version = "0.1.0" +dependencies = [ + "chrono", + "schemars", + "serde", + "serde_json", +] + [[package]] name = "pointguard_web_api" version = "0.1.0" dependencies = [ "aide", "axum", + "axum-extra", "chrono", "flume", "futures", "http", "nanoid", "pointguard_engine_postgres", + "pointguard_types", "rust-embed", "schemars", "serde", @@ -1651,18 +1688,18 @@ checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index db7648b..6dbda0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,5 +4,6 @@ members = [ "crates/web_api", "crates/engine_postgres", "crates/cli", + "crates/types", ] resolver = "2" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 90be465..fc833b0 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" colored = "2.0.4" tracing-subscriber = "0.3.17" pointguard_engine_postgres = { path = "../engine_postgres" } +pointguard_types = { path = "../types" } tokio = { version = "1.33.0", features = [ "macros", "time", diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 91216de..3646ef4 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -87,6 +87,8 @@ impl Serve { .serve(termination); tokio::join!(task_loop, serving); + + tracing::info!("goodbye!"); } } @@ -108,7 +110,8 @@ struct OpenApiSpec { impl OpenApiSpec { fn call(self) { - let spec = pointguard_web_api::api_router().1; + let mut spec = pointguard_web_api::openapi::new(); + let _ = pointguard_web_api::api_router(&mut spec); if self.pretty { serde_json::to_writer_pretty(std::io::stdout(), &spec) .expect("writing OpenAPI spec to stdout"); diff --git a/crates/cli/src/task_loop.rs b/crates/cli/src/task_loop.rs index 1ad0034..1065dc6 100644 --- a/crates/cli/src/task_loop.rs +++ b/crates/cli/src/task_loop.rs @@ -1,21 +1,12 @@ use futures::Future; use pointguard_engine_postgres::{self as db, postgres::PgPool}; - -#[derive(Debug, serde::Serialize)] -#[serde(rename_all = "camelCase")] -struct InvokedTask<'a> { - job_name: &'a str, - input: &'a serde_json::Value, - retry_count: i32, - max_retries: i32, - created_at: &'a chrono::DateTime, -} +use pointguard_types::{InvokedTaskPayload, InvokedTaskResponse}; #[tracing::instrument(skip_all, fields(id = %task.id, endpoint = %task.endpoint))] async fn execute_task(http: reqwest::Client, task: db::InflightTask, db: PgPool) { let response = http .post(&task.endpoint) - .json(&InvokedTask { + .json(&InvokedTaskPayload { job_name: &task.job_name[..], input: &task.data, retry_count: task.retry_count, @@ -26,16 +17,25 @@ async fn execute_task(http: reqwest::Client, task: db::InflightTask, db: PgPool) .await .and_then(|res| res.error_for_status()); + let response = match response { + Err(err) => Err(err), + Ok(res) => res.json::().await, + } + .unwrap_or_else(|err| InvokedTaskResponse::Failure { + reason: err.to_string(), + retriable: true, + }); + match response { - Ok(_) => { + InvokedTaskResponse::Success {} => { tracing::info!("invocation completed"); task.done(&db).await; } - Err(err) => { - tracing::error!("invocation failed: {err}"); - task.failed(&db, &err.to_string()).await; + InvokedTaskResponse::Failure { reason, retriable } => { + tracing::error!("invocation failed: {reason}"); + task.failed(&db, &reason, retriable).await; } - } + }; } pub async fn run(db: db::postgres::PgPool, termination: impl Future) { diff --git a/crates/engine_postgres/src/inflight_task.rs b/crates/engine_postgres/src/inflight_task.rs index 29c5b52..4dde953 100644 --- a/crates/engine_postgres/src/inflight_task.rs +++ b/crates/engine_postgres/src/inflight_task.rs @@ -1,3 +1,5 @@ +use std::fmt::Display; + #[derive(Debug)] pub struct InflightTask { pub id: i64, @@ -13,6 +15,22 @@ pub struct InflightTask { cleaned_up: bool, } +enum RetryStatus { + Retry, + GiveUp, + Bailed, +} + +impl Display for RetryStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RetryStatus::Retry => write!(f, "retry"), + RetryStatus::GiveUp => write!(f, "giving up"), + RetryStatus::Bailed => write!(f, "bailed (not retriable)"), + } + } +} + impl InflightTask { pub async fn done(mut self, conn: &sqlx::PgPool) { let mut tx = conn.begin().await.expect("failed to start transaction"); @@ -39,10 +57,37 @@ impl InflightTask { self.cleaned_up = true; } - pub async fn failed(mut self, conn: &sqlx::PgPool, message: &str) { - if self.max_retries == self.retry_count { + pub async fn failed(mut self, conn: &sqlx::PgPool, message: &str, retriable: bool) { + let status = if retriable && self.max_retries > self.retry_count { + RetryStatus::Retry + } else if retriable { + RetryStatus::GiveUp + } else { + RetryStatus::Bailed + }; + + if let RetryStatus::Retry = status { + sqlx::query!( + " + UPDATE + tasks + SET + worker_id = NULL, + started_at = NULL, + run_at = now() + retry_delay, + updated_at = now(), + retry_count = retry_count + 1 + WHERE + id = $1 + ", + self.id, + ) + .execute(conn) + .await + .expect("failed to update task"); + } else { tracing::error!( - "task {} failed {} times, giving up", + "task {} failed {} times, {status}", self.id, self.retry_count + 1 ); @@ -71,25 +116,6 @@ impl InflightTask { .await .expect("failed to delete task"); tx.commit().await.expect("failed to commit transaction"); - } else { - sqlx::query!( - " - UPDATE - tasks - SET - worker_id = NULL, - started_at = NULL, - run_at = now() + retry_delay, - updated_at = now(), - retry_count = retry_count + 1 - WHERE - id = $1 - ", - self.id, - ) - .execute(conn) - .await - .expect("failed to update task"); } self.cleaned_up = true; diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml new file mode 100644 index 0000000..7d3c130 --- /dev/null +++ b/crates/types/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "pointguard_types" +version = "0.1.0" +edition = "2021" + +[dependencies] +chrono = { version = "0.4.31", features = ["serde"] } +schemars = "0.8.16" +serde = "1.0.193" +serde_json = "1.0.108" diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs new file mode 100644 index 0000000..6c54b38 --- /dev/null +++ b/crates/types/src/lib.rs @@ -0,0 +1,33 @@ +#[derive(Debug, serde::Serialize, schemars::JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct InvokedTaskPayload<'a> { + /// The job name to invoke + pub job_name: &'a str, + /// The input data of the task + pub input: &'a serde_json::Value, + /// The amount of times we retried this task + pub retry_count: i32, + /// The maximum amount of times we can retry this task + pub max_retries: i32, + /// The time when this task was enqueued at + pub created_at: &'a chrono::DateTime, +} + +const fn bool_true() -> bool { + true +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum InvokedTaskResponse { + /// A successful invocation + Success {}, + /// A failed invocation + Failure { + /// The reason why it failed + reason: String, + /// Whether or not this task is retriable + #[serde(default = "bool_true")] + retriable: bool, + }, +} diff --git a/crates/web_api/Cargo.toml b/crates/web_api/Cargo.toml index 7b5ded3..1776848 100644 --- a/crates/web_api/Cargo.toml +++ b/crates/web_api/Cargo.toml @@ -8,6 +8,7 @@ aide = { version = "0.12.0", features = ["axum", "redoc"] } schemars = { version = "0.8.15", features = ["url", "chrono"] } axum = "0.6.20" pointguard_engine_postgres = { path = "../engine_postgres" } +pointguard_types = { path = "../types" } serde = { version = "1.0.192", features = ["derive"] } nanoid = "0.4.0" serde_json = "1.0.108" @@ -21,3 +22,4 @@ http = "0.2.10" tower = "0.4.13" flume = "0.11.0" futures = "0.3.29" +axum-extra = { version = "0.8.0", features = ["json-lines"] } diff --git a/crates/web_api/src/lib.rs b/crates/web_api/src/lib.rs index 3da2d48..a923f30 100644 --- a/crates/web_api/src/lib.rs +++ b/crates/web_api/src/lib.rs @@ -1,5 +1,6 @@ mod admin; mod events; +pub mod openapi; mod router; use axum::Extension; @@ -26,9 +27,9 @@ pub struct Server { impl Server { pub async fn serve(self, shutdown_signal: impl Future) { let enqueue_tasks = EnqueuedTasks::from(flume::unbounded()); + let mut api = openapi::new(); - let (app, api) = api_router(); - let mut app = app + let mut app = api_router(&mut api) .with_state(AppState { db: self.pool }) .layer(Extension(api)) .layer(Extension(Arc::new(enqueue_tasks))); diff --git a/crates/web_api/src/openapi.rs b/crates/web_api/src/openapi.rs new file mode 100644 index 0000000..6808286 --- /dev/null +++ b/crates/web_api/src/openapi.rs @@ -0,0 +1,44 @@ +use aide::openapi::{Info, OpenApi, Operation, PathItem}; +use axum::Json; +use pointguard_types::{InvokedTaskPayload, InvokedTaskResponse}; +use schemars::JsonSchema; + +pub fn new() -> OpenApi { + let mut api = OpenApi { + info: Info { + description: Some("pointguard api".to_string()), + ..Info::default() + }, + ..OpenApi::default() + }; + + register_component::(&mut api, "InvokedTaskPayload"); + + let mut operation = Operation::default(); + let _ = aide::transform::TransformOperation::new(&mut operation) + .input::>() + .response::<200, Json>(); + + api.webhooks.insert( + "executeTask".to_string(), + aide::openapi::ReferenceOr::Item(PathItem { + post: Some(operation), + ..Default::default() + }), + ); + + api +} + +fn register_component(api: &mut OpenApi, name: &str) { + let mut components = api.components.take().unwrap_or_default(); + components.schemas.insert( + name.to_string(), + aide::openapi::SchemaObject { + json_schema: schemars::schema::Schema::Object(schemars::schema_for!(T).schema), + external_docs: None, + example: None, + }, + ); + api.components = Some(components); +} diff --git a/crates/web_api/src/router.rs b/crates/web_api/src/router.rs index dd76999..1fc3d3e 100644 --- a/crates/web_api/src/router.rs +++ b/crates/web_api/src/router.rs @@ -8,7 +8,7 @@ use aide::{ routing::{get, post}, ApiRouter, IntoApiResponse, }, - openapi::{Info, OpenApi}, + openapi::OpenApi, redoc::Redoc, }; use axum::{ @@ -86,33 +86,8 @@ async fn post_tasks( Json(id) } -pub fn api_router() -> (axum::Router, OpenApi) { - let mut api = OpenApi { - info: Info { - description: Some("pointguard api".to_string()), - ..Info::default() - }, - ..OpenApi::default() - }; - - // let mut components = aide::openapi::Components { - // ..Default::default() - // }; - - // components.schemas.insert( - // "hello".to_string(), - // aide::openapi::SchemaObject { - // json_schema: schemars::schema::Schema::Object( - // schemars::schema_for!(NewTaskBody).schema, - // ), - // external_docs: None, - // example: None, - // }, - // ); - - // api.components = Some(components); - - let app = ApiRouter::new() +pub fn api_router(api: &mut OpenApi) -> axum::Router { + ApiRouter::new() .route("/api", Redoc::new("/api/openapi.json").axum_route()) .route("/api/openapi.json", get(serve_api)) .nest("/", admin_routes()) @@ -121,9 +96,7 @@ pub fn api_router() -> (axum::Router, OpenApi) { .api_route("/api/v1/tasks/:id/cancel", post(cancel_task)) .api_route("/api/v1/tasks/enqueued", get(get_enqueued_tasks)) .api_route("/api/v1/tasks/finished", get(get_finished_tasks)) - .finish_api_with(&mut api, |api| api.default_response::()); - - (app, api) + .finish_api_with(api, |api| api.default_response::()) } fn generate_nanoid() -> String { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f49d5fb..71ec560 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,14 +1,40 @@ import * as Schema from "@effect/schema/Schema"; -import { Client } from "@pointguard/api-client"; +import { Client, components, paths } from "@pointguard/api-client"; + +export * from "@pointguard/api-client"; + +export const Retriable = Symbol.for("@pointguard/core/Retriable"); +const isNonRetriable = (e: unknown): boolean => + typeof e === "object" && + e !== null && + (("retry" in e && e.retry === false) || + (Retriable in e && e[Retriable] === false)); +export const isRetriable = (e: unknown): boolean => !isNonRetriable(e); + +export class RejectedError extends Error { + [Retriable] = false; + name = "RejectedJobError"; +} + +type AllEnqueueParameters = + paths["/api/v1/tasks"]["post"]["requestBody"]["content"]["application/json"]; const EnqueueOptions = Schema.struct({ - runAt: Schema.string.pipe(Schema.dateFromString, Schema.optional), + runAt: Schema.string.pipe( + Schema.dateFromString, + Schema.nullable, + Schema.optional + ), maxRetries: Schema.number.pipe( Schema.greaterThanOrEqualTo(0), + Schema.nullable, Schema.optional ), - name: Schema.string.pipe(Schema.optional), -}); + name: Schema.string.pipe(Schema.nullable, Schema.optional), +}) satisfies Schema.Schema< + Omit, + any +>; const encodeEnqueueOptions = Schema.encodeSync(EnqueueOptions); @@ -154,7 +180,7 @@ export const IncomingJob = Schema.struct({ retryCount: Schema.number, maxRetries: Schema.number, createdAt: Schema.string.pipe(Schema.dateFromString), -}); +}) satisfies Schema.Schema; export type DecodedIncomingJob = Schema.Schema.To; export type EncodedIncomingJob = Schema.Schema.From; diff --git a/packages/nextjs/src/index.ts b/packages/nextjs/src/index.ts index 7cc6b86..e827732 100644 --- a/packages/nextjs/src/index.ts +++ b/packages/nextjs/src/index.ts @@ -1,4 +1,13 @@ -import { Job, parseIncomingJob } from "@pointguard/core"; +import { + Job, + RejectedError, + isRetriable, + parseIncomingJob, + webhooks, +} from "@pointguard/core"; + +type ReturnValue = + webhooks["executeTask"]["post"]["responses"]["200"]["content"]["application/json"]; export { type DecodedIncomingJob, @@ -8,20 +17,27 @@ export { type Job, } from "@pointguard/core"; -export function createHandler(opts: { - jobs: Job[]; -}): (request: Request) => Promise { - const jobsByName = opts.jobs.reduce((acc, job) => { - acc.set(job.name, job); - return acc; - }, new Map>()); - - return async (request: Request) => { - const body = await request.json().then(parseIncomingJob); +async function handleIncomingJob( + jobsByName: Map>, + request: Request +): Promise { + try { + const body = await request + .json() + .catch((err) => { + throw new Error(`failed to parse request body`, { cause: err }); + }) + .then(parseIncomingJob) + .catch((err) => { + throw new RejectedError( + `request body doesn't match the expected format`, + { cause: err } + ); + }); const job = jobsByName.get(body.jobName); if (!job) { - return new Response("job not found", { status: 404 }); + throw new RejectedError(`job ${body.jobName} is not defined`); } await job.handler(body.input, { @@ -31,6 +47,21 @@ export function createHandler(opts: { jobName: body.jobName, }); - return new Response("ok"); + return { success: {} }; + } catch (e) { + return { failure: { reason: String(e), retriable: isRetriable(e) } }; + } +} + +export function createHandler(opts: { + jobs: Job[]; +}): (request: Request) => Promise { + const jobsByName = new Map>(); + for (const job of opts.jobs) { + jobsByName.set(job.name, job); + } + + return async (request: Request) => { + return Response.json(await handleIncomingJob(jobsByName, request)); }; }