Skip to content

bbuckle1959/universal-bridge

Repository files navigation

Universal Bridge

Notice: Archived / As-Is Community Release > This repository is published as a free, unmonetized community service. It is functionally complete, stable, and fully open-source. However, it is not actively maintained. No issues or pull requests will be reviewed. You are highly encouraged to fork this repository and adapt it to your own needs!

Universal Bridge is an agnostic webhook relay: it accepts HTTP payloads from any source, transforms them with isolated JavaScript, and forwards the result to a configured destination. Pipelines are declared in YAML—no code changes required to add or change routes.

Incoming webhooks are persisted to a SQLite task queue before the HTTP response is returned, so work survives restarts and is processed by a throttled worker pool instead of fire-and-forget in-memory handlers.

Concept

Many integrations follow the same pattern: receive → queue → reshape → forward. Universal Bridge separates that into four stages:

  1. Ingress — A Fastify HTTP server exposes /ingress/*. Each path slug maps to a pipeline. Incoming requests are wrapped in an Envelope (execution id, source, timestamp, headers, raw payload). The envelope is written to disk as a pending task, then the caller receives 202 Accepted with the execution id.

  2. Task queueBridgeDatabase (data/bridge.db) stores tasks in SQLite (WAL mode). Workers claim work transactionally; successful runs delete the row; exhausted failures remain as failed for inspection or retry.

  3. Transformation — A ScriptRunner executes a .js file from transformations/ inside an isolated-vm isolate (128MB cap, 5s timeout). Scripts define transform(envelope) and return JSON-serializable output.

  4. Egress — An HttpDispatcher sends the transformed payload using fetch, with retries and exponential backoff (1s, 2s, 4s) on network errors and HTTP 5xx.

The BridgeEngine loads config/pipelines.yaml, starts the HTTP listener and worker loop, and logs one line per completed execution:

[bridge] executionId=<uuid> pipeline=<id> durationMs=<ms> status=success
flowchart LR
  Client -->|POST /ingress/slug| HttpListener
  HttpListener -->|persist pending| SQLite[(data/bridge.db)]
  HttpListener -->|202 + executionId| Client
  SQLite -->|claim task| BridgeEngine
  BridgeEngine --> ScriptRunner
  ScriptRunner --> HttpDispatcher
  HttpDispatcher --> Destination
  BridgeEngine -->|success: delete| SQLite
  BridgeEngine -->|fail: mark failed| SQLite
Loading

Installation

Prerequisites

  • Node.js 20+ (22 recommended)
  • npm
  • A C++ build toolchain may be required on some platforms to compile native addons (better-sqlite3, isolated-vm)
  • Node.js 20+ requires --no-node-snapshot when using isolated-vm (included in npm start)

Steps

  1. Clone or download the repository and open a terminal in the project root.

  2. Install dependencies:

    npm install
  3. Configure at least one pipeline in config/pipelines.yaml (see Configuration).

  4. Add or edit transformation scripts under transformations/.

  5. Start the bridge:

    npm start

    On first run, the data/ directory and data/bridge.db are created automatically.

    Optional environment variable:

    Variable Default Description
    PORT 3000 HTTP listen port

    Graceful shutdown: Ctrl+C (SIGINT) or SIGTERM stops the worker loop, waits for in-flight tasks, then closes the HTTP server and database.

Running the example

The repository includes a ready-made solar hardware alert pipeline. It simulates a device webhook, transforms it with transformations/solar_to_teams.js, and POSTs the result to httpbin.org so you can inspect the outbound payload without configuring a real Teams or Slack URL.

Example files

File Purpose
config/pipelines.yaml Active pipeline config loaded at startup
config/pipelines.example.yaml Same schema as a reference copy
examples/solar-hardware-payload.json Sample JSON body to POST to the bridge
transformations/solar_to_teams.js Transform script used by the example pipeline
data/bridge.db SQLite task store (created at runtime, gitignored)

Step-by-step

1. Install dependencies (from the project root):

npm install

2. Confirm configurationconfig/pipelines.yaml should define the solar-teams-alert pipeline with ingress slug solar-hardware. If you are setting up from scratch, copy the example config:

cp config/pipelines.example.yaml config/pipelines.yaml

On Windows (PowerShell):

Copy-Item config\pipelines.example.yaml config\pipelines.yaml

3. Start the bridge (leave this terminal open):

npm start

You should see Fastify listening on port 3000 (or your PORT value).

4. Send the example payload — in a second terminal, post the sample file to the ingress route:

curl -X POST http://localhost:3000/ingress/solar-hardware \
  -H "Content-Type: application/json" \
  -d @examples/solar-hardware-payload.json

Windows (PowerShell):

Invoke-RestMethod -Uri "http://localhost:3000/ingress/solar-hardware" `
  -Method POST `
  -ContentType "application/json" `
  -Body (Get-Content -Raw examples\solar-hardware-payload.json)

5. Verify the run

  • HTTP response (immediate)202 with a body like {"accepted":true,"id":"<uuid>"}. The task is already on disk at this point.

  • Bridge console (after workers finish) — a success line, for example:

    [bridge] executionId=<uuid> pipeline=solar-teams-alert durationMs=120 status=success
    
  • Transformed output — the script maps dev_id and val into a notification shape. With httpbin egress, inspect the POST at https://httpbin.org/post; the JSON body should resemble:

    {
      "title": "System Notification",
      "sourceDevice": "sensor-01",
      "status": "ALARM",
      "capturedMetrics": { "temp": 42.5 }
    }

If status=fail appears in the console, check network access to https://httpbin.org/post, that solar_to_teams.js exists under transformations/, and whether a row remains in data/bridge.db with status = failed (see Task queue).

Configuration

Pipelines are defined in config/pipelines.yaml:

pipelines:
  - id: solar-teams-alert
    ingress:
      path: /ingress/solar-hardware
    transformation:
      script: solar_to_teams.js
    egress:
      url: https://httpbin.org/post
      method: POST
Field Description
id Unique pipeline identifier (used as envelope source and in logs).
ingress.path Route slug after /ingress/ (e.g. /ingress/solar-hardware → slug solar-hardware).
ingress.method Allowed HTTP method for this pipeline (defaults to POST).
transformation.script Basename of a .js file in transformations/.
egress.url Outbound webhook or API URL.
egress.method HTTP method (defaults to POST).
egress.headers Optional static headers for outbound requests.

Usage

Send a webhook

curl -X POST http://localhost:3000/ingress/solar-hardware \
  -H "Content-Type: application/json" \
  -d '{"dev_id": "sensor-01", "val": 42.5}'

Immediate response (202):

{"accepted": true, "id": "<execution-uuid>"}

The id is the execution id stored in SQLite. Processing happens asynchronously via the worker pool.

Console (after workers complete the task):

[bridge] executionId=<execution-uuid> pipeline=solar-teams-alert durationMs=12 status=success

On failure, status=fail is logged with an error field, and the task row is marked failed in the database (see below).

Write a transformation script

Create transformations/my_transform.js:

function transform(envelope) {
  return {
    title: "Forwarded event",
    receivedAt: envelope.timestamp,
    data: envelope.payload,
  };
}

Reference it in YAML as script: my_transform.js. The function receives the full envelope; return any JSON-serializable object.

Task queue

Tasks are stored in data/bridge.db in the tasks table:

Column Description
id Execution id (same as envelope id and 202 response)
pipelineId Pipeline that received the webhook
envelope JSON-serialized Envelope
status pending, processing, or failed
retries Incremented each time processing fails after egress/transform errors
created_at UTC timestamp when the task was enqueued

Lifecycle

  1. Ingress inserts a row with status = pending and returns 202.
  2. A worker claims the task (processing) subject to concurrency limits.
  3. On success, the row is deleted.
  4. On failure (transform error, egress error after HTTP retries, or non-2xx response), the row is set to failed and retries is incremented.
  5. Workers may reclaim failed tasks while retries < 3 (same limit as configured in BridgeEngine).
  6. On startup, any processing rows (e.g. from a crash) are reset to pending.

Inspect failed work with any SQLite client, for example:

sqlite3 data/bridge.db "SELECT id, pipelineId, status, retries, created_at FROM tasks WHERE status = 'failed';"

Project layout

universal-bridge/
├── config/
│   ├── pipelines.yaml          # Active pipeline definitions
│   └── pipelines.example.yaml
├── data/
│   └── bridge.db               # SQLite task queue (runtime, gitignored)
├── examples/
│   └── solar-hardware-payload.json
├── transformations/            # Sandboxed .js transform scripts
├── src/
│   ├── core/
│   │   ├── database.ts           # SQLite task store
│   │   ├── engine.ts             # BridgeEngine + worker pool
│   │   └── types.ts              # Envelope & config types
│   ├── ingress/
│   │   └── httpListener.ts       # Fastify ingress
│   ├── transform/
│   │   └── scriptRunner.ts       # VM-based script execution
│   ├── egress/
│   │   └── httpDispatcher.ts     # Outbound HTTP with retries
│   └── index.ts                  # Application entry point
└── package.json

Behavior notes

  • Durability — The envelope is committed to SQLite before 202 is returned; a crash after accept still leaves a pending task to process on restart.
  • Worker concurrency — At most 5 tasks run globally and 2 per pipeline at once, so downstream endpoints are not flooded.
  • Egress retriesHttpDispatcher retries up to 3 times with 1s, 2s, and 4s backoff on fetch failures or HTTP 5xx. Client errors (4xx) are not retried.
  • Task-level retries — After a failed run, workers may retry failed tasks until retries reaches 3; beyond that, rows stay on disk for manual inspection or a future bulk re-run tool.
  • Script isolation — Transforms run in a fresh V8 isolate via isolated-vm (128MB memory limit, 5s timeout). The envelope is copied in with ExternalCopy; only envelopeReference is visible to user code.
  • Unknown slugs — Requests to unconfigured /ingress/* paths return 404 (nothing is enqueued).

License

MIT

About

A lightweight, zero-dependency, headless webhook relay and data transformation pipeline. Configure pipelines via YAML and map payloads with isolated JavaScript.

Topics

Resources

License

Contributing

Stars

Watchers

Forks

Contributors