Skip to content

Commit

Permalink
add callback information as a webhook (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
Schniz committed Nov 22, 2023
1 parent eaccaec commit 26ea3d4
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 94 deletions.
10 changes: 10 additions & 0 deletions .changeset/fuzzy-jars-compare.md
@@ -0,0 +1,10 @@
---
"@pointguard/nextjs": patch
"@pointguard/core": patch
---

use the webhook openapi definition for types

the Next.js adapter now returns 200 for all execution requests,
because we successfuly applied them. But the JSON might be an error
(that is managed by Pointguard). Errors are values, Exceptions are bugs.
5 changes: 5 additions & 0 deletions .changeset/serious-geese-drum.md
@@ -0,0 +1,5 @@
---
"@pointguard/cli": patch
---

add executeTask webhook with the request/response schemas
45 changes: 41 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -4,5 +4,6 @@ members = [
"crates/web_api",
"crates/engine_postgres",
"crates/cli",
"crates/types",
]
resolver = "2"
1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
colored = "2.0.4"
tracing-subscriber = "0.3.17"
pointguard_engine_postgres = { path = "../engine_postgres" }
pointguard_types = { path = "../types" }
tokio = { version = "1.33.0", features = [
"macros",
"time",
Expand Down
5 changes: 4 additions & 1 deletion crates/cli/src/main.rs
Expand Up @@ -87,6 +87,8 @@ impl Serve {
.serve(termination);

tokio::join!(task_loop, serving);

tracing::info!("goodbye!");
}
}

Expand All @@ -108,7 +110,8 @@ struct OpenApiSpec {

impl OpenApiSpec {
fn call(self) {
let spec = pointguard_web_api::api_router().1;
let mut spec = pointguard_web_api::openapi::new();
let _ = pointguard_web_api::api_router(&mut spec);
if self.pretty {
serde_json::to_writer_pretty(std::io::stdout(), &spec)
.expect("writing OpenAPI spec to stdout");
Expand Down
32 changes: 16 additions & 16 deletions crates/cli/src/task_loop.rs
@@ -1,21 +1,12 @@
use futures::Future;
use pointguard_engine_postgres::{self as db, postgres::PgPool};

#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
struct InvokedTask<'a> {
job_name: &'a str,
input: &'a serde_json::Value,
retry_count: i32,
max_retries: i32,
created_at: &'a chrono::DateTime<chrono::Utc>,
}
use pointguard_types::{InvokedTaskPayload, InvokedTaskResponse};

#[tracing::instrument(skip_all, fields(id = %task.id, endpoint = %task.endpoint))]
async fn execute_task(http: reqwest::Client, task: db::InflightTask, db: PgPool) {
let response = http
.post(&task.endpoint)
.json(&InvokedTask {
.json(&InvokedTaskPayload {
job_name: &task.job_name[..],
input: &task.data,
retry_count: task.retry_count,
Expand All @@ -26,16 +17,25 @@ async fn execute_task(http: reqwest::Client, task: db::InflightTask, db: PgPool)
.await
.and_then(|res| res.error_for_status());

let response = match response {
Err(err) => Err(err),
Ok(res) => res.json::<InvokedTaskResponse>().await,
}
.unwrap_or_else(|err| InvokedTaskResponse::Failure {
reason: err.to_string(),
retriable: true,
});

match response {
Ok(_) => {
InvokedTaskResponse::Success {} => {
tracing::info!("invocation completed");
task.done(&db).await;
}
Err(err) => {
tracing::error!("invocation failed: {err}");
task.failed(&db, &err.to_string()).await;
InvokedTaskResponse::Failure { reason, retriable } => {
tracing::error!("invocation failed: {reason}");
task.failed(&db, &reason, retriable).await;
}
}
};
}

pub async fn run(db: db::postgres::PgPool, termination: impl Future<Output = ()>) {
Expand Down
70 changes: 48 additions & 22 deletions crates/engine_postgres/src/inflight_task.rs
@@ -1,3 +1,5 @@
use std::fmt::Display;

#[derive(Debug)]
pub struct InflightTask {
pub id: i64,
Expand All @@ -13,6 +15,22 @@ pub struct InflightTask {
cleaned_up: bool,
}

enum RetryStatus {
Retry,
GiveUp,
Bailed,
}

impl Display for RetryStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RetryStatus::Retry => write!(f, "retry"),
RetryStatus::GiveUp => write!(f, "giving up"),
RetryStatus::Bailed => write!(f, "bailed (not retriable)"),
}
}
}

impl InflightTask {
pub async fn done(mut self, conn: &sqlx::PgPool) {
let mut tx = conn.begin().await.expect("failed to start transaction");
Expand All @@ -39,10 +57,37 @@ impl InflightTask {
self.cleaned_up = true;
}

pub async fn failed(mut self, conn: &sqlx::PgPool, message: &str) {
if self.max_retries == self.retry_count {
pub async fn failed(mut self, conn: &sqlx::PgPool, message: &str, retriable: bool) {
let status = if retriable && self.max_retries > self.retry_count {
RetryStatus::Retry
} else if retriable {
RetryStatus::GiveUp
} else {
RetryStatus::Bailed
};

if let RetryStatus::Retry = status {
sqlx::query!(
"
UPDATE
tasks
SET
worker_id = NULL,
started_at = NULL,
run_at = now() + retry_delay,
updated_at = now(),
retry_count = retry_count + 1
WHERE
id = $1
",
self.id,
)
.execute(conn)
.await
.expect("failed to update task");
} else {
tracing::error!(
"task {} failed {} times, giving up",
"task {} failed {} times, {status}",
self.id,
self.retry_count + 1
);
Expand Down Expand Up @@ -71,25 +116,6 @@ impl InflightTask {
.await
.expect("failed to delete task");
tx.commit().await.expect("failed to commit transaction");
} else {
sqlx::query!(
"
UPDATE
tasks
SET
worker_id = NULL,
started_at = NULL,
run_at = now() + retry_delay,
updated_at = now(),
retry_count = retry_count + 1
WHERE
id = $1
",
self.id,
)
.execute(conn)
.await
.expect("failed to update task");
}

self.cleaned_up = true;
Expand Down
10 changes: 10 additions & 0 deletions crates/types/Cargo.toml
@@ -0,0 +1,10 @@
[package]
name = "pointguard_types"
version = "0.1.0"
edition = "2021"

[dependencies]
chrono = { version = "0.4.31", features = ["serde"] }
schemars = "0.8.16"
serde = "1.0.193"
serde_json = "1.0.108"
33 changes: 33 additions & 0 deletions crates/types/src/lib.rs
@@ -0,0 +1,33 @@
#[derive(Debug, serde::Serialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct InvokedTaskPayload<'a> {
/// The job name to invoke
pub job_name: &'a str,
/// The input data of the task
pub input: &'a serde_json::Value,
/// The amount of times we retried this task
pub retry_count: i32,
/// The maximum amount of times we can retry this task
pub max_retries: i32,
/// The time when this task was enqueued at
pub created_at: &'a chrono::DateTime<chrono::Utc>,
}

const fn bool_true() -> bool {
true
}

#[derive(Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "camelCase")]
pub enum InvokedTaskResponse {
/// A successful invocation
Success {},
/// A failed invocation
Failure {
/// The reason why it failed
reason: String,
/// Whether or not this task is retriable
#[serde(default = "bool_true")]
retriable: bool,
},
}
2 changes: 2 additions & 0 deletions crates/web_api/Cargo.toml
Expand Up @@ -8,6 +8,7 @@ aide = { version = "0.12.0", features = ["axum", "redoc"] }
schemars = { version = "0.8.15", features = ["url", "chrono"] }
axum = "0.6.20"
pointguard_engine_postgres = { path = "../engine_postgres" }
pointguard_types = { path = "../types" }
serde = { version = "1.0.192", features = ["derive"] }
nanoid = "0.4.0"
serde_json = "1.0.108"
Expand All @@ -21,3 +22,4 @@ http = "0.2.10"
tower = "0.4.13"
flume = "0.11.0"
futures = "0.3.29"
axum-extra = { version = "0.8.0", features = ["json-lines"] }
5 changes: 3 additions & 2 deletions crates/web_api/src/lib.rs
@@ -1,5 +1,6 @@
mod admin;
mod events;
pub mod openapi;
mod router;

use axum::Extension;
Expand All @@ -26,9 +27,9 @@ pub struct Server {
impl Server {
pub async fn serve(self, shutdown_signal: impl Future<Output = ()>) {
let enqueue_tasks = EnqueuedTasks::from(flume::unbounded());
let mut api = openapi::new();

let (app, api) = api_router();
let mut app = app
let mut app = api_router(&mut api)
.with_state(AppState { db: self.pool })
.layer(Extension(api))
.layer(Extension(Arc::new(enqueue_tasks)));
Expand Down

0 comments on commit 26ea3d4

Please sign in to comment.