Sentinel

AI-augmented news intelligence pipeline — ingest news from multiple sources, extract structured data with LLMs, serve via API.


What It Does

Sentinel monitors news sources (GDELT, RSS feeds), fetches articles, uses LLMs to extract structured data (entities, sentiment, topics), and serves clean data through a REST API.

GDELT ──┐                                      
        ├─► Redis Dedup ─► Kafka ─► Fetcher ─► LLM Parser ─► Kafka ─► Bronze Writer ─► Delta Lake ─► API
RSS ────┘                           (txn)        (txn)                                Bronze/Silver

Key features:

  • Fan-in ingestion — Multiple sources into unified pipeline
  • 4-level deduplication — Redis (L1/L2) → Bronze key → Silver MERGE
  • Exactly-once processing — Kafka transactions end-to-end
  • Pluggable LLM — Swap OpenAI ↔ Anthropic ↔ DeepSeek via env var or --provider flag
  • Stateful content versioning — MERGE tracks content changes, bumps version, stores previous hash
  • Data quality — Freshness status (fresh/old/stale), ingestion lag, completeness, data value scoring
  • Late event tracking — Flag late data with freshness_status, don't drop it

Architecture

┌──────────────┐
│    GDELT     │──┐    ┌─────────────┐     ┌─────────────┐
│   Producer   │  ├───►│ Redis Dedup │────►│sentinel.urls│
└──────────────┘  │    │  L1+L2 keys │     └──────┬──────┘
┌──────────────┐  │    └─────────────┘            │
│     RSS      │──┘                               ▼
│   Producer   │                        ┌─────────────────────┐
└──────────────┘                        │ Fetcher (txn)       │
                                        │ begin→fetch→commit  │
                                        └──────────┬──────────┘
                                                   ▼
                                        ┌──────────────────┐
                                        │sentinel.raw_html │
                                        └──────────┬───────┘
                                                   ▼
                                        ┌─────────────────────┐
                                        │ LLM Parser (txn)    │
                                        │ extract + score     │
                                        └──────────┬──────────┘
                      ┌──────────┐                 │
                      │   DLQ    │◄── failures     │
                      │ Replayer │                 │
                      └──────────┘                 ▼
                                      ┌────────────────────────┐
                                      │sentinel.parsed_articles│
                                      └───────────┬────────────┘
                                                  ▼
                                        ┌──────────────────┐
                                        │  Bronze Writer   │
                                        └─────────┬────────┘
                                                  ▼
                                            ┌─────────────┐
                                            │Delta Bronze │
                                            │ (CDF)       │
                                            └──────┬──────┘
                                                   ▼
                                            ┌─────────────┐
                                            │Delta Silver │
                                            │  (MERGE)    │
                                            └──────┬──────┘
                                                   ▼
                                            ┌─────────────┐
                                            │  FastAPI +  │
                                            │  Dashboard  │
                                            └─────────────┘

Quick Start

Prerequisites

  • Python 3.12
  • Java 17 (for PySpark)
  • Docker & Docker Compose
  • OpenAI, Anthropic, or DeepSeek API key

Setup

# Clone
git clone https://github.com/ayoabass777/sentinel.git
cd sentinel

# Virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
pip install -r requirements.txt

# Configure
cp .env.example .env
# Edit .env — add your LLM API key

# Start infrastructure
docker-compose up -d

# Verify (wait ~30s)
docker-compose ps  # All should be "healthy" or "running"

Run the Pipeline

# Terminal 1: Producer (ingests URLs)
python -m sentinel.producers.gdelt

# Terminal 2: Fetcher (downloads HTML)
python -m sentinel.consumers.fetcher 0

# Terminal 3: Parser (extracts with LLM)
python -m sentinel.consumers.llm_parser 0

# Terminal 4: Bronze Writer (persists to Delta Lake)
python -m sentinel.consumers.bronze_writer 0

Verify Data

# Check Bronze Delta table
python -c "from deltalake import DeltaTable; dt = DeltaTable('data/delta/bronze/articles'); print(f'Version: {dt.version()}, Rows: {len(dt.to_pyarrow_table())}')"

# Transform to Silver
python -m sentinel.transforms.bronze_to_silver

# Start API
uvicorn sentinel.api.main:app --reload --port 8000

# Query
curl http://localhost:8000/articles | jq .
curl http://localhost:8000/stats

Stop

docker-compose down      # Keep data
docker-compose down -v   # Wipe data

Configuration

Environment Variables

# Required
LLM_PROVIDER=openai              # 'openai', 'anthropic', or 'deepseek'
OPENAI_API_KEY=sk-...            # If using OpenAI
ANTHROPIC_API_KEY=sk-ant-...     # If using Anthropic
DEEPSEEK_API_KEY=sk-...          # If using DeepSeek

# Optional (defaults work for local)
KAFKA_BOOTSTRAP_SERVERS=localhost:9094
REDIS_HOST=localhost
REDIS_PORT=6379
LOG_LEVEL=INFO

LLM Provider

Switch providers without code changes:

# OpenAI (default)
LLM_PROVIDER=openai
OPENAI_MODEL=gpt-4o

# Anthropic
LLM_PROVIDER=anthropic
LLM_MODEL=claude-sonnet-4-20250514

# DeepSeek (cheapest)
LLM_PROVIDER=deepseek
DEEPSEEK_MODEL=deepseek-chat

# Or per-instance via CLI flag
python -m sentinel.consumers.llm_parser 0 --provider deepseek

Project Structure

sentinel/
├── docker-compose.yml          # Kafka + Redis + Postgres + UIs
├── requirements.txt
├── .env.example
├── Images/
│   ├── sentinel-architecture.svg   # Architecture diagram
│   └── Sentinel dashboard.png      # Dashboard screenshot
├── docs/
│   └── data_contracts.md           # Pydantic model documentation
├── src/sentinel/
│   ├── config.py               # All configuration
│   ├── contracts.py            # Pydantic models, dedup helpers
│   ├── producers/
│   │   ├── base.py             # Base producer (idempotent)
│   │   ├── gdelt.py            # GDELT news source
│   │   └── rss.py              # RSS feed source
│   ├── consumers/
│   │   ├── fetcher.py          # URL → HTML (transactional)
│   │   ├── llm_parser.py       # HTML → parsed_articles topic (transactional)
│   │   ├── bronze_writer.py    # parsed_articles → Delta Lake Bronze
│   │   └── dlq_replayer.py     # Exponential backoff replay
│   ├── llm/
│   │   ├── base.py             # Provider interface
│   │   ├── openai.py           # OpenAI implementation
│   │   ├── anthropic.py        # Anthropic implementation
│   │   ├── deepseek.py         # DeepSeek implementation
│   │   └── factory.py          # get_llm_provider()
│   ├── transforms/
│   │   ├── bronze_to_silver.py # PySpark + Delta MERGE (CDF incremental + full-scan)
│   │   └── migrate_jsonl_to_delta.py  # One-time JSONL → Delta migration
│   ├── dashboard/
│   │   └── index.html          # React dashboard
│   └── api/
│       └── main.py             # FastAPI (reads Silver Delta via PyArrow)
└── data/                       # Delta Lake tables

API Reference

| Endpoint | Method | Description |
|---|---|---|
| `/health` | GET | Health check |
| `/articles` | GET | List articles (filters: company, topic, sentiment, source, freshness) |
| `/articles/{url}` | GET | Get single article |
| `/companies/{name}` | GET | Articles mentioning company |
| `/search?q=` | GET | Keyword search |
| `/stats` | GET | Pipeline statistics |

Example

# All articles about "OpenAI"
curl "http://localhost:8000/companies/OpenAI"

# Filter by topic
curl "http://localhost:8000/articles?topic=funding"

# Search
curl "http://localhost:8000/search?q=artificial+intelligence"

How It Works

Deduplication (4 Levels)

| Level | Key | Where | TTL | Catches |
|---|---|---|---|---|
| L1 | `sentinel:src:{source}:{id}` | Redis | 24h | Same source record |
| L2 | `sentinel:url:{normalized}` | Redis | 6h | Cross-source same URL |
| L3 | `url + kafka_ts` | Bronze | — | Replay after commit |
| L4 | `url + content_hash` | Silver | — | Update only if changed |
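The two Redis levels can be implemented with a single atomic `SET NX EX` per key, so racing consumers cannot both claim the same URL. A sketch under the key formats and TTLs from the table — the `normalize_url` rules shown here are illustrative, not the project's actual normalization:

```python
from urllib.parse import urlsplit, urlunsplit


def normalize_url(url: str) -> str:
    """Illustrative normalization: lowercase host, drop fragment and trailing slash."""
    p = urlsplit(url)
    path = p.path.rstrip("/") or "/"
    return urlunsplit((p.scheme.lower(), p.netloc.lower(), path, p.query, ""))


def seen_before(redis_client, source: str, source_id: str, url: str) -> bool:
    """Return True if either dedup level already holds the key.

    redis-py's set(nx=True, ex=...) returns None when the key already
    existed, so each check-and-set is a single atomic round trip.
    """
    l1 = f"sentinel:src:{source}:{source_id}"           # L1: same source record, 24h
    l2 = f"sentinel:url:{normalize_url(url)}"           # L2: cross-source URL, 6h
    fresh_l1 = redis_client.set(l1, 1, nx=True, ex=24 * 3600)
    fresh_l2 = redis_client.set(l2, 1, nx=True, ex=6 * 3600)
    return not (fresh_l1 and fresh_l2)
```

L3 and L4 need no Redis at all: Bronze keeps every replayed row distinct via `kafka_ts`, and the Silver MERGE compares `content_hash` before updating.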

Exactly-Once Guarantees

Kafka transactions wrap the entire read-process-write cycle:

producer.init_transactions()   # once, at startup
producer.begin_transaction()
# Process message, produce results to the downstream topic / Bronze
producer.send_offsets_to_transaction(
    consumer.position(consumer.assignment()),   # confluent-kafka-python shown
    consumer.consumer_group_metadata(),
)
producer.commit_transaction()
# Atomic: outputs and consumed offsets commit together, or not at all

Data Quality Scoring

Each article gets a data_value_score (0-1):

| Dimension | Weight | Scoring |
|---|---|---|
| Freshness | 40% | 1.0 if <1h, 0.5 if <24h, 0.0 if >7d |
| Completeness | 40% | % of required fields filled |
| Accuracy | 20% | LLM confidence |

Articles with data_value_score < 0.3 are flagged in Silver.

Dead Letter Queue

Failed messages retry with exponential backoff:

1 min → 5 min → 30 min → dead.letter (manual review)
| Retryable | Non-retryable |
|---|---|
| timeout, rate_limit, http_5xx | http_4xx, schema_error, json_decode_error |

Monitoring

UIs (Local)

| Service | URL |
|---|---|
| Kafka UI | http://localhost:8082 |
| Redis Commander | http://localhost:8081 |
| API Docs | http://localhost:8000/docs |

Logs

# Kafka
docker-compose logs -f kafka

# Check topic lag
docker exec sentinel_kafka /opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group sentinel-fetcher

Production Considerations

| Local | Production |
|---|---|
| Docker Kafka | Amazon MSK |
| Docker Redis | ElastiCache |
| Local filesystem | S3 |
| Delta Lake (local) | Delta Lake on S3 |
| Manual runs | ECS Fargate + KEDA autoscaling |
| localhost API | ECS Fargate + ALB |

Scaling

  • Producers: Add more sources (RSS feeds, other APIs)
  • Fetcher: Scale horizontally with consumer groups (max = partition count, default 3)
  • Parser: Scale with LLM rate limits in mind (max = partition count)
  • Silver: Bronze → Silver runs incrementally via CDF, with full-scan reconciliation fallback

Tech Stack

  • Streaming: Apache Kafka (KRaft mode, 3 partitions default)
  • Dedup: Redis (L1 source key + L2 URL key)
  • Storage: Delta Lake (Bronze/Silver, CDF-enabled)
  • Processing: PySpark (Bronze → Silver MERGE)
  • API: FastAPI + deltalake + PyArrow (Spark-free reads)
  • LLM: OpenAI / Anthropic / DeepSeek (pluggable via --provider flag)
  • Validation: Pydantic

Related Projects

This is part of a data engineering portfolio:

  1. Ballistics — Batch pipeline (API → Airflow → S3 → Postgres → dbt)
  2. Pulse — Streaming pipeline (Kafka → Postgres → dbt → Metabase)
  3. Sentinel — AI-augmented pipeline (Kafka → LLM → Delta Lake → FastAPI)

License

MIT
