Self-healing data pipelines — AI detects anomalies and fixes them autonomously using safe database forks. Zero human intervention. Zero production risk.
Built for the Deep Agents Hackathon @ RSAC 2026 (AWS Builder Loft, San Francisco).
┌─────────────────────────────────────────────────────────────────┐
│                     DataGuard Architecture                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  DATA SOURCES              PIPELINE            STORAGE          │
│  ┌──────────────┐      ┌──────────────┐    ┌──────────────┐     │
│  │ GitHub API   │──┐   │              │    │              │     │
│  │ Stripe       │──┤   │   Airbyte    │───▶│   Ghost DB   │     │
│  │ Salesforce   │──┤──▶│  (Pipeline   │    │ (PostgreSQL) │     │
│  │ Segment      │──┤   │   Manager)   │    │              │     │
│  │ Snowflake CDC│──┘   │              │    │  raw_events  │     │
│  └──────────────┘      └──────────────┘    │  dq_metrics  │     │
│                                            │ remediations │     │
│                                            └──────┬───────┘     │
│                                                   │             │
│                        ┌──────────────────────────┘             │
│                        ▼                                        │
│                 ┌──────────────┐                                │
│                 │  DataGuard   │                                │
│                 │    Agent     │                                │
│                 │  (5s loop)   │                                │
│                 └──────┬───────┘                                │
│                        │                                        │
│             ┌──────────┼──────────┐                             │
│             ▼          ▼          ▼                             │
│     ┌──────────┐ ┌──────────┐ ┌──────────┐                      │
│     │Aerospike │ │  Claude  │ │ Ghost DB │                      │
│     │          │ │ (Sonnet) │ │ Forking  │                      │
│     │•baselines│ │          │ │          │                      │
│     │•patterns │ │•diagnosis│ │ Fork ──▶ │                      │
│     │•metrics  │ │•fix SQL  │ │ Test ──▶ │                      │
│     │•vectors  │ │•chatbot  │ │ Promote  │                      │
│     └──────────┘ └──────────┘ └──────────┘                      │
│         <1ms         ~2s          ~5s                           │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                Real-Time Dashboard (SSE)                │    │
│  │       3D Viz · Anomaly Log · Metrics · AI Chatbot       │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘
FLOW: Detect Anomaly → Search Aerospike Cache → Hit?  Reuse Fix
                                              → Miss? Ask Claude
      → Fork Ghost DB → Apply Fix → Validate → Promote or Rollback
Every 5 seconds, the agent runs this loop:
- Ingest — Airbyte syncs records from 5 upstream sources (GitHub, Stripe, Salesforce, Segment, Snowflake CDC) into Ghost DB
- Check — Runs quality checks: null rates, duplicate rates, row counts, schema drift
- Detect — Compares current metrics against Aerospike-stored baselines using z-score anomaly detection (threshold: z > 2.5)
- Match — Searches Aerospike for similar past anomalies using 64-dimensional vector embeddings with cosine similarity
- Reason — New pattern? Claude analyzes the data, diagnoses root cause, generates fix SQL. Known pattern? Reuses cached fix instantly
- Fix Safely — Forks the entire Ghost DB, applies fix on the fork, validates, promotes fork as new primary. Production is never directly modified
- Learn — Stores anomaly fingerprint + fix in Aerospike. Repeat incidents are resolved in <1ms without any AI call
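The Detect step can be sketched as a plain z-score comparison. This is a minimal illustration, not the code in quality_checks.ts or agent.ts: the `Baseline` field names and the example baseline numbers are assumptions, and only the 2.5 threshold comes from the list above.

```typescript
// Baseline profile per column. The field names are assumptions for this
// sketch; the README only says baselines hold mean, stddev, and sample count.
interface Baseline {
  mean: number;
  stddev: number;
  sampleCount: number;
}

// Threshold stated in the Detect step.
const Z_THRESHOLD = 2.5;

// Standard score of `current` relative to the baseline. A baseline with zero
// spread yields 0 here so the division never produces Infinity or NaN.
function zScore(current: number, baseline: Baseline): number {
  if (baseline.stddev === 0) return 0;
  return (current - baseline.mean) / baseline.stddev;
}

// One-sided check (z > 2.5), matching the threshold as written above.
function isAnomalous(current: number, baseline: Baseline): boolean {
  return zScore(current, baseline) > Z_THRESHOLD;
}

// Example: a 24.2% null rate against a hypothetical 5% +/- 2% baseline.
const nullRateBaseline: Baseline = { mean: 0.05, stddev: 0.02, sampleCount: 40 };
console.log(isAnomalous(0.242, nullRateBaseline)); // true
```

A one-sided test only flags increases. A metric such as row count, where a sudden drop matters too, would need `Math.abs(zScore(...))` instead.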
| Tool | Role | Remove it and... |
|---|---|---|
| Ghost DB | Cloud PostgreSQL with instant forking. Stores all data + enables safe fork-test-promote remediation | No safe remediation. Fixes hit production directly |
| Aerospike | Sub-millisecond key-value store. Caches baselines, pattern-matches past anomalies, stores metrics | No anomaly detection baselines. No fix reuse. Every anomaly needs a Claude call |
| Airbyte | Pipeline orchestration. Manages data sync from 5 sources, monitors pipeline health | No data flows in. Nothing to monitor |
| Claude API | AI reasoning engine. Diagnoses anomalies, generates fix SQL, powers dashboard chatbot | No intelligent diagnosis. No fix generation. No chatbot |
| Anomaly | Detection | Fix | Result |
|---|---|---|---|
| NULL spike — 24.2% of event_type was NULL (threshold: 15%) | Z-score against Aerospike baseline | UPDATE raw_events SET event_type = 'unknown' WHERE event_type IS NULL | Fork → fix → validate → promoted |
| Second NULL spike — 22.5% NULL (repeat pattern) | Aerospike pattern match found cached fix | Reused fix — no Claude call needed | Instant fix from cache |
| Schema drift — value column dropped | Schema diff detected missing column | ALTER TABLE raw_events ADD COLUMN value DOUBLE PRECISION DEFAULT 0 | Fork → fix → validate → promoted |
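The "Fork → fix → validate → promoted" result in the table can be read as the control flow below. This is a sketch under stated assumptions: `ForkableDb`, `ForkHandle`, and their method names are hypothetical stand-ins and are not the Ghost DB API or the contents of ghost_manager.ts.

```typescript
// Hypothetical fork handle: runs SQL against the fork only.
interface ForkHandle {
  id: string;
  execute(sql: string): Promise<void>;
}

// Hypothetical database client. Method names are assumptions for this sketch.
interface ForkableDb {
  fork(): Promise<ForkHandle>;
  promote(forkId: string): Promise<void>;
  discard(forkId: string): Promise<void>;
}

interface RemediationResult {
  forkId: string;
  status: "promoted" | "failed";
}

// Applies the fix on a fork, validates it there, and promotes the fork only
// if validation passes. The primary is never written to directly.
async function forkAndFix(
  db: ForkableDb,
  fixSql: string,
  validate: (fork: ForkHandle) => Promise<boolean>,
): Promise<RemediationResult> {
  const fork = await db.fork();
  let valid = false;
  try {
    await fork.execute(fixSql);
    valid = await validate(fork);
  } catch {
    valid = false; // a failing fix or validation counts as a failed remediation
  }
  if (valid) {
    await db.promote(fork.id);
    return { forkId: fork.id, status: "promoted" };
  }
  await db.discard(fork.id); // rollback: drop the fork, keep the primary as-is
  return { forkId: fork.id, status: "failed" };
}
```

The `validate` callback would typically re-run the quality check that raised the anomaly, this time against the fork.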
raw_events — Pipeline data
| Column | Type | Description |
|---|---|---|
| id | SERIAL | Primary key |
| source | TEXT | Origin platform (github-api, stripe-webhooks, etc.) |
| event_type | TEXT | Event name (user.created, payment.processed, etc.) |
| payload | JSONB | Event data (userId, amount, region, timestamp) |
| value | DOUBLE PRECISION | Numeric value for aggregation |
| ingested_at | TIMESTAMPTZ | Ingestion timestamp |
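A schema drift check over this table can be as simple as diffing the expected column list against what the database currently reports. The sketch below assumes hypothetical helper names (`findMissingColumns`, `buildAddColumnSql`); the column list comes from the table above and the `DEFAULT 0` for value follows the fix shown in the anomaly table.

```typescript
interface ColumnSpec {
  name: string;
  sqlType: string;
  defaultSql?: string; // literal SQL default, only set where the README shows one
}

// Expected raw_events columns, copied from the table above.
const RAW_EVENTS_COLUMNS: ColumnSpec[] = [
  { name: "id", sqlType: "SERIAL" },
  { name: "source", sqlType: "TEXT" },
  { name: "event_type", sqlType: "TEXT" },
  { name: "payload", sqlType: "JSONB" },
  { name: "value", sqlType: "DOUBLE PRECISION", defaultSql: "0" },
  { name: "ingested_at", sqlType: "TIMESTAMPTZ" },
];

// Returns the expected columns that are absent from the live table.
function findMissingColumns(expected: ColumnSpec[], actualNames: string[]): ColumnSpec[] {
  const present = new Set(actualNames);
  return expected.filter((col) => !present.has(col.name));
}

// Builds the repair statement. Identifiers are interpolated as-is, so this
// must only be fed trusted names such as the constants above.
function buildAddColumnSql(table: string, col: ColumnSpec): string {
  const defaultClause = col.defaultSql !== undefined ? ` DEFAULT ${col.defaultSql}` : "";
  return `ALTER TABLE ${table} ADD COLUMN ${col.name} ${col.sqlType}${defaultClause}`;
}
```

In practice the `actualNames` argument could come from a query against information_schema.columns on the PostgreSQL side.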
dq_metrics — Quality check results
| Column | Type | Description |
|---|---|---|
| check_name | TEXT | null_rate, duplicate_rate, row_count, schema_drift |
| table_name | TEXT | Table checked |
| metric_value | DOUBLE PRECISION | Measured value |
| threshold | DOUBLE PRECISION | Anomaly threshold |
| is_anomaly | BOOLEAN | Whether threshold was exceeded |
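To show how one row of dq_metrics might be produced, here is a minimal null-rate check. The function name `checkNullRate` is hypothetical, and comparing against a fixed threshold is a simplification of whatever quality_checks.ts does; the field names mirror the columns above.

```typescript
// Mirrors the dq_metrics columns listed above.
interface DqMetric {
  check_name: string;
  table_name: string;
  metric_value: number;
  threshold: number;
  is_anomaly: boolean;
}

// Computes the fraction of null values in one column and flags it when the
// fraction exceeds the threshold. An empty sample is reported as rate 0.
function checkNullRate(
  tableName: string,
  columnValues: Array<string | null>,
  threshold: number,
): DqMetric {
  const nullCount = columnValues.filter((v) => v === null).length;
  const rate = columnValues.length === 0 ? 0 : nullCount / columnValues.length;
  return {
    check_name: "null_rate",
    table_name: tableName,
    metric_value: rate,
    threshold,
    is_anomaly: rate > threshold,
  };
}
```

For example, `checkNullRate("raw_events", eventTypes, 0.15)` uses the 15% threshold quoted in the anomaly table.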
remediations — Fix audit log
| Column | Type | Description |
|---|---|---|
| action | TEXT | fork_and_fix |
| fork_db_id | TEXT | Ghost DB fork ID used |
| status | TEXT | pending, promoted, failed |
| fix_sql | TEXT | SQL that was applied |
| diagnosis | TEXT | Claude's root cause analysis |
| Set | Records | Purpose |
|---|---|---|
| metrics | Time-series | Every quality check result with timestamps |
| baselines | Per-column | Statistical profiles: mean, stddev, sample count |
| anomaly_fingerprints | Per-incident | Pattern cache: 64-dim vector embedding, fix SQL, success/fail |
| agent_state | Singleton | Agent status, cycle count, uptime, remediation counts |
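The pattern match over anomaly_fingerprints relies on cosine similarity between embeddings. The sketch below is an in-process brute-force version for illustration only: it does not call Aerospike, the `AnomalyFingerprint` field names are assumptions, and the 0.9 cutoff is a placeholder since the README does not state one.

```typescript
// Field names are assumptions; the set stores an embedding, fix SQL, and outcome.
interface AnomalyFingerprint {
  id: string;
  embedding: number[]; // 64 dimensions in DataGuard; any length works here
  fixSql: string;
}

// Cosine similarity: dot(a, b) / (|a| * |b|). Returns 0 for a zero vector.
function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) throw new Error("embedding dimensions differ");
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0;
  return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

// Returns the most similar stored fingerprint, or undefined when nothing
// reaches the cutoff (the "Miss? Ask Claude" branch of the flow).
function findBestMatch(
  query: number[],
  candidates: AnomalyFingerprint[],
  minSimilarity = 0.9, // placeholder cutoff, not taken from the project
): { fingerprint: AnomalyFingerprint; similarity: number } | undefined {
  let best: { fingerprint: AnomalyFingerprint; similarity: number } | undefined;
  for (const fingerprint of candidates) {
    const similarity = cosineSimilarity(query, fingerprint.embedding);
    if (similarity >= minSimilarity && (best === undefined || similarity > best.similarity)) {
      best = { fingerprint, similarity };
    }
  }
  return best;
}
```

A hit returns the cached `fixSql`, which is what lets a repeat incident skip the Claude call.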
| Source | Simulates |
|---|---|
| github-api | Developer activity events |
| stripe-webhooks | Payment processing events |
| salesforce-events | CRM data changes |
| segment-tracking | User analytics events |
| snowflake-cdc | Data warehouse change capture |
├── src/
│ ├── agent.ts # Core agent loop — detect, reason, fix
│ ├── ghost_manager.ts # Ghost DB client — create, fork, promote, delete
│ ├── aerospike_manager.ts # Aerospike client — metrics, baselines, vector search
│ ├── airbyte_manager.ts # Airbyte pipeline — sync, health checks
│ ├── claude_client.ts # Claude API — anomaly diagnosis + fix generation
│ ├── quality_checks.ts # Statistical checks — nulls, dupes, schema drift
│ ├── api_server.ts # REST API + SSE streaming + chat endpoint
│ ├── config.ts # Environment config
│ └── logger.ts # Colored terminal logging
├── demo/
│ ├── demo.ts # Full demo runner — 4 phases with anomaly injection
│ └── data_generator.ts # Synthetic data + anomaly injectors
├── frontend/
│ ├── index.html # Dashboard — 3D viz, metrics, chatbot (single file)
│ └── serve.ts # Standalone frontend server
├── docs/ # Design specs and PRD
├── .env.example # Environment template
├── package.json
└── tsconfig.json
- Node.js 18+
- Ghost CLI installed and logged in
- Aerospike running (Docker or Codespace)
- Anthropic API key
# Clone
git clone https://github.com/Kush614/DataGuard.git
cd DataGuard
# Install dependencies
npm install
# Configure environment
cp .env.example .env
# Edit .env with your API keys
# Install and login to Ghost CLI
# Download from https://ghost.build
ghost login
# Start Aerospike (Docker)
npm run docker:up
# Run the full demo
npm run demo
# Full demo with anomaly injection + dashboard
npm run demo
# Dashboard opens at http://localhost:3334
# Development mode with hot reload
npm run dev
# Type check
npm run typecheck

| Variable | Description |
|---|---|
| ANTHROPIC_API_KEY | Claude API key from console.anthropic.com |
| GHOST_DB_ID | (Optional) Existing Ghost DB ID to reuse |
| GHOST_DB_CONN | (Optional) Existing Ghost DB connection string |
| AEROSPIKE_HOST | Aerospike server host (default: 127.0.0.1) |
| AEROSPIKE_PORT | Aerospike server port (default: 3000) |
| AEROSPIKE_REST_PORT | Aerospike REST gateway port (default: 8081) |
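One way to read these variables with the documented defaults is sketched below. It is not the contents of config.ts: `DataGuardConfig`, `loadConfig`, and `parsePort` are hypothetical names, and treating a missing ANTHROPIC_API_KEY as a hard error is an assumption.

```typescript
interface DataGuardConfig {
  anthropicApiKey: string;
  ghostDbId?: string;
  ghostDbConn?: string;
  aerospikeHost: string;
  aerospikePort: number;
  aerospikeRestPort: number;
}

// Parses a port from an env value, falling back to the documented default.
function parsePort(raw: string | undefined, fallback: number, label: string): number {
  if (raw === undefined || raw === "") return fallback;
  const port = Number(raw);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`${label} must be a valid port, got "${raw}"`);
  }
  return port;
}

// Takes the environment as a plain object so it can be tested without Node globals.
function loadConfig(env: Record<string, string | undefined>): DataGuardConfig {
  const anthropicApiKey = env["ANTHROPIC_API_KEY"];
  if (!anthropicApiKey) throw new Error("ANTHROPIC_API_KEY is required");
  return {
    anthropicApiKey,
    ghostDbId: env["GHOST_DB_ID"] || undefined,
    ghostDbConn: env["GHOST_DB_CONN"] || undefined,
    aerospikeHost: env["AEROSPIKE_HOST"] || "127.0.0.1",
    aerospikePort: parsePort(env["AEROSPIKE_PORT"], 3000, "AEROSPIKE_PORT"),
    aerospikeRestPort: parsePort(env["AEROSPIKE_REST_PORT"], 8081, "AEROSPIKE_REST_PORT"),
  };
}
```

In a Node.js process this would be called as `loadConfig(process.env)` after the .env file has been loaded.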
The real-time dashboard at http://localhost:3334 includes:
- Overview — 3D visualization of data flowing between Ghost DB, Aerospike, and Airbyte nodes. Live metrics cards, sparklines, data quality gauges
- Pipeline — Airbyte pipeline flow diagram, sync history, source/destination config
- Database — Ghost DB primary instance, row counts, schema viewer, fork timeline with status (active/promoted/deleted)
- Anomalies — Full anomaly list with severity, Claude's diagnosis, fix SQL, resolution status
- Metrics — Aerospike operations count, baselines table, anomaly fingerprint cache
- AI Chatbot — Floating chat panel powered by Claude. Ask natural language questions about the live system state. 8 pinned quick-start questions
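The dashboard receives its live updates over Server-Sent Events. As a rough illustration of the wire format only, the helper below serializes one event frame; the function name and the `anomaly` event name are hypothetical and are not taken from api_server.ts.

```typescript
// Serializes one SSE frame: an "event:" line, a "data:" line, and a blank
// line that terminates the frame. JSON.stringify escapes newlines inside
// strings, so the payload always fits on a single data line.
// `payload` is assumed to be JSON-serializable.
function formatSseEvent(eventName: string, payload: unknown): string {
  return `event: ${eventName}\ndata: ${JSON.stringify(payload)}\n\n`;
}

// Example frame for a detected anomaly (field names are illustrative).
const frame = formatSseEvent("anomaly", { check: "null_rate", value: 0.242 });
console.log(frame);
```

A server would write such frames to a response opened with the `Content-Type: text/event-stream` header, and the browser side would consume them through `EventSource`.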
The demo runner (npm run demo) executes 4 phases:
- Infrastructure Setup — Creates Ghost DB, initializes schema, connects to Aerospike, sets up Airbyte pipeline
- Baseline Collection — Syncs 200 clean records across 4 batches. Agent builds statistical baselines in Aerospike
- Anomaly Injection — Injects NULL spike (80 rows with NULL event_type) + schema drift (drops the value column). Agent detects and auto-remediates via fork-test-promote
- Steady State — Continues monitoring. Second NULL spike injected — resolved instantly from Aerospike pattern cache (no Claude call)
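The baseline and injection phases can be approximated with two small generators. This is a sketch, not the code in demo/data_generator.ts: the function names, the fixed `user.created` event type, and the round-robin source assignment are all assumptions made to keep the output deterministic.

```typescript
interface SyntheticEvent {
  source: string;
  event_type: string | null;
  value: number;
}

// The five simulated sources listed earlier in this README.
const SOURCES: string[] = [
  "github-api",
  "stripe-webhooks",
  "salesforce-events",
  "segment-tracking",
  "snowflake-cdc",
];

// Clean records: every event_type is populated, sources rotate round-robin.
function generateCleanEvents(count: number): SyntheticEvent[] {
  return Array.from({ length: count }, (_, i) => ({
    source: SOURCES[i % SOURCES.length],
    event_type: "user.created",
    value: i,
  }));
}

// NULL spike: same shape, but event_type is null on every row.
function generateNullSpike(count: number): SyntheticEvent[] {
  return generateCleanEvents(count).map((event) => ({ ...event, event_type: null }));
}

// Sizes taken from the demo description: 200 clean records, 80 NULL rows.
const baselineBatch = generateCleanEvents(200);
const spikeBatch = generateNullSpike(80);
console.log(baselineBatch.length, spikeBatch.length); // 200 80
```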
| Operation | Latency |
|---|---|
| Aerospike metric read/write | < 1ms |
| Aerospike pattern search | < 2ms |
| Ghost DB fork creation | ~5s |
| Claude anomaly diagnosis | ~2s |
| Full detection-to-fix cycle | < 30s |
| Detection time vs industry avg | 5s vs 8-12 hours |
TypeScript, Node.js, Ghost DB, Aerospike, Airbyte, Claude API (Anthropic), PostgreSQL, Three.js, HTML/CSS, GitHub Codespaces, Docker
MIT