
balasivapindra/python-data-api


Python Data API — FastAPI & Flask REST APIs with Kafka Integration


Production-grade REST APIs built with FastAPI and Flask that consume trade data from Apache Kafka, persist it to PostgreSQL, and expose endpoints for both real-time and historical data. Deployed on AWS ECS Fargate behind an Application Load Balancer and API Gateway.

Architecture

graph LR
    K[Kafka Cluster] -->|trades topic| CS[Consumer Service]
    CS -->|persist| PG[(PostgreSQL)]
    CS -->|broadcast| WS[WebSocket Clients]
    PG --> FA[FastAPI]
    PG --> FL[Flask]
    FA --> ALB[Application Load Balancer]
    FL --> ALB
    ALB --> APIGW[API Gateway]
    APIGW --> C[Clients]
    R[(Redis Cache)] -.->|hot data| FA

    style K fill:#231F20,color:#fff
    style PG fill:#4169E1,color:#fff
    style FA fill:#009688,color:#fff
    style FL fill:#333,color:#fff
    style ALB fill:#FF9900,color:#fff
    style APIGW fill:#FF9900,color:#fff
    style R fill:#DC382D,color:#fff
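The diagram shows Redis feeding hot data to the FastAPI service. A common way to use a cache in that position is the cache-aside pattern: read from the cache first, and on a miss load from PostgreSQL and store the result with an expiry. The sketch below is a hypothetical illustration, not this repository's code. `TTLCache` is an in-memory stand-in for Redis `GET` and `SET ... EX` so the example runs without a server, and `cached` and the `metrics:daily` key are illustrative names.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """In-memory stand-in for Redis GET and SET with EX (expiry in seconds)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # expired keys behave as misses
            return None
        return value

    def set(self, key: str, value: Any, ex: float) -> None:
        self._store[key] = (self._clock() + ex, value)


def cached(cache: TTLCache, key: str, ttl: float, loader: Callable[[], Any]) -> Any:
    """Cache-aside read: serve from cache, or load (e.g. from PostgreSQL) and populate."""
    value = cache.get(key)
    if value is None:  # None doubles as "miss", so loaders should not return None
        value = loader()
        cache.set(key, value, ex=ttl)
    return value
```

With redis-py the same shape applies using `Redis.get` and `Redis.set(key, value, ex=ttl)`, with values serialized (for example to JSON) because Redis stores them as strings.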

Features

  • FastAPI async REST API with Pydantic v2 models and dependency injection
  • Flask app with blueprints, Marshmallow serialization, and app factory pattern
  • Kafka consumer running as a background task ingesting trade events
  • WebSocket endpoint streaming live trades to connected clients
  • JWT and API key authentication with configurable middleware
  • Token bucket rate limiting per client IP
  • Redis caching for frequently accessed metrics
  • SQLAlchemy async with PostgreSQL for persistence
  • Cursor-based pagination for efficient large dataset traversal
  • Structured JSON logging with correlation IDs for distributed tracing
  • Docker multi-stage build for optimized container images
  • Terraform IaC for AWS ECS Fargate, ALB, API Gateway, ECR, and RDS
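Cursor-based pagination is usually implemented as keyset pagination: the cursor encodes the last key the client saw, and the next query seeks past it instead of using `OFFSET`. A minimal sketch, assuming trades are ordered by a positive integer `id` and using the standard-library `sqlite3` module in place of PostgreSQL so it runs anywhere; `list_trades`, `encode_cursor`, `decode_cursor`, and the cursor's JSON shape are hypothetical.

```python
import base64
import json
import sqlite3
from typing import List, Optional, Tuple


def encode_cursor(last_id: int) -> str:
    # Opaque to clients: URL-safe base64 of a small JSON document.
    raw = json.dumps({"after_id": last_id}).encode()
    return base64.urlsafe_b64encode(raw).decode()


def decode_cursor(cursor: str) -> int:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))["after_id"]


def list_trades(conn: sqlite3.Connection, limit: int,
                cursor: Optional[str] = None) -> Tuple[List[tuple], Optional[str]]:
    """Return one page of trades plus the cursor for the next page (None at the end)."""
    after_id = decode_cursor(cursor) if cursor else 0  # assumes ids start at 1
    rows = conn.execute(
        "SELECT id, symbol, price FROM trades WHERE id > ? ORDER BY id LIMIT ?",
        (after_id, limit + 1),  # fetch one extra row to detect whether more exist
    ).fetchall()
    page = rows[:limit]
    next_cursor = encode_cursor(page[-1][0]) if len(rows) > limit else None
    return page, next_cursor
```

Because the query filters on an indexed key, the database can start reading at the cursor position rather than scanning and discarding skipped rows, and inserting new rows does not cause already-seen rows to be repeated or skipped, as can happen with `OFFSET`. The same `WHERE id > :after_id ORDER BY id LIMIT :n` query carries over to SQLAlchemy against PostgreSQL.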

API Endpoints

FastAPI (port 8000)

Method     Path               Description
GET        /health            Liveness probe
GET        /ready             Readiness probe (checks DB, Kafka)
GET        /trades            List trades (cursor-paginated)
GET        /trades/{id}       Get trade by ID
POST       /trades            Create a new trade
GET        /metrics/daily     Daily aggregated metrics
GET        /metrics/realtime  Real-time metrics from Kafka
WebSocket  /ws/trades         Live trade stream
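The `/ready` endpoint is described as checking the database and Kafka. One way to structure such a probe is to run each dependency check concurrently under a timeout and report ready only if every check passes. This is a hedged sketch: `readiness`, `run_check`, and the check callables are hypothetical; in practice the checks might be a trivial query such as `SELECT 1` against PostgreSQL and a lightweight request to the Kafka cluster.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Check = Callable[[], Awaitable[None]]


async def run_check(check: Check, timeout: float) -> bool:
    """A check passes if it finishes within `timeout` seconds without raising."""
    try:
        await asyncio.wait_for(check(), timeout)
        return True
    except Exception:  # covers timeouts as well as connection errors
        return False


async def readiness(checks: Dict[str, Check], timeout: float = 1.0) -> dict:
    """Run all dependency checks concurrently and summarise the result."""
    names = list(checks)
    results = await asyncio.gather(*(run_check(checks[n], timeout) for n in names))
    return {"ready": all(results), "checks": dict(zip(names, results))}
```

A route could return this dictionary with status 200 when `ready` is true and 503 otherwise, since load balancers and orchestrators commonly look only at the status code when deciding whether to route traffic to a task.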

Flask (port 5000)

Method  Path                            Description
GET     /health                         Health check
GET     /api/v1/data/                   List datasets
POST    /api/v1/data/                   Create dataset
GET     /api/v1/data/{id}               Get dataset
GET     /api/v1/pipelines/              List pipelines
POST    /api/v1/pipelines/              Create pipeline
POST    /api/v1/pipelines/{id}/trigger  Trigger pipeline run

Quick Start

Local Development with Docker

git clone https://github.com/balasivapindra/python-data-api.git
cd python-data-api

# Start all services (FastAPI, Flask, PostgreSQL, Redis, Kafka)
docker-compose up -d

# FastAPI docs: http://localhost:8000/docs
# Flask API:    http://localhost:5000/api/v1/data/

Without Docker

python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Ensure PostgreSQL, Redis, and Kafka are running locally
uvicorn fastapi_app.main:app --reload --port 8000

Running Tests

pytest tests/ -v --cov=fastapi_app --cov=flask_app --cov=common

Authentication

All non-health endpoints require authentication via one of two methods:

API Key — include in the request header:

X-API-Key: your-api-key

JWT Bearer Token — include in the Authorization header:

Authorization: Bearer eyJhbGciOiJIUzI1NiIs...
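The example token's first segment is the base64url encoding of a header beginning `{"alg":"HS256",`, so the sketch below verifies HS256 tokens only. It is a minimal standard-library illustration of how middleware might accept either credential; `authenticate`, `verify_hs256`, and the in-memory set of API keys are hypothetical, and a real service would more likely rely on a maintained library such as PyJWT.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import AbstractSet, Mapping, Optional


def _b64url_decode(segment: str) -> bytes:
    # JWT segments are base64url without padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def verify_hs256(token: str, secret: bytes, now: Optional[float] = None) -> dict:
    """Check an HS256 JWT's signature and `exp` claim; raise ValueError if invalid."""
    header_b64, payload_b64, sig_b64 = token.split(".")  # ValueError if not 3 parts
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    signing_input = f"{header_b64}.{payload_b64}".encode()
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    exp = claims.get("exp")
    if exp is not None and (time.time() if now is None else now) >= exp:
        raise ValueError("token expired")
    return claims


def authenticate(headers: Mapping[str, str], api_keys: AbstractSet[str],
                 secret: bytes) -> dict:
    """Accept either an X-API-Key header or Authorization: Bearer <JWT>."""
    api_key = headers.get("X-API-Key")  # real HTTP header lookup is case-insensitive
    if api_key is not None:
        # compare_digest avoids timing differences that depend on where keys differ.
        if any(hmac.compare_digest(api_key.encode(), k.encode()) for k in api_keys):
            return {"auth": "api_key"}
        raise PermissionError("invalid API key")
    scheme, _, token = headers.get("Authorization", "").partition(" ")
    if scheme == "Bearer" and token:
        try:
            return {"auth": "jwt", "claims": verify_hs256(token, secret)}
        except ValueError as exc:  # also covers malformed base64 and JSON
            raise PermissionError(str(exc)) from exc
    raise PermissionError("missing credentials")
```

In a web framework the `PermissionError` would be translated into a 401 response.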

Rate Limiting

Requests are rate-limited using a token bucket algorithm:

  • 100 requests per 60-second window per client IP
  • Health check and documentation paths are exempt
  • Response headers include X-RateLimit-Limit and X-RateLimit-Remaining
  • Exceeding the limit returns 429 Too Many Requests with a Retry-After header
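A minimal token bucket sketch, assuming "100 requests per 60-second window" means a bucket of 100 tokens per client IP that refills continuously at 100/60 tokens per second. `TokenBucketLimiter` and its `check` method are hypothetical names, not this project's middleware; the injectable clock exists only to make the behaviour easy to test.

```python
import time
from typing import Callable, Dict, Tuple


class TokenBucketLimiter:
    """Per-client token bucket: `capacity` tokens, refilled continuously over `period` seconds."""

    def __init__(self, capacity: int = 100, period: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = capacity
        self.rate = capacity / period  # tokens added per second
        self.clock = clock
        # client_ip -> (tokens available, time of last update); never evicted here.
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def check(self, client_ip: str) -> Tuple[bool, int, float]:
        """Consume one token if possible; return (allowed, remaining, retry_after_seconds)."""
        now = self.clock()
        tokens, last = self._buckets.get(client_ip, (float(self.capacity), now))
        tokens = min(self.capacity, tokens + (now - last) * self.rate)  # refill
        if tokens >= 1:
            tokens -= 1
            self._buckets[client_ip] = (tokens, now)
            return True, int(tokens), 0.0
        self._buckets[client_ip] = (tokens, now)
        return False, 0, (1 - tokens) / self.rate  # time until one whole token exists
```

A middleware would map the result onto the headers listed above: `X-RateLimit-Limit` from `capacity`, `X-RateLimit-Remaining` from the second value, and on rejection a 429 with `Retry-After` set to `math.ceil(retry_after)`, since that header takes whole seconds. Bucket state here lives in one process's memory, so each container would enforce its own limit.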

WebSocket Live Data

Connect to /ws/trades for real-time trade streaming:

import asyncio
import websockets
import json

async def stream_trades():
    async with websockets.connect("ws://localhost:8000/ws/trades") as ws:
        # Optionally filter by symbols
        await ws.send(json.dumps({"symbols": ["AAPL", "GOOG"]}))

        async for message in ws:
            trade = json.loads(message)
            print(trade)

asyncio.run(stream_trades())
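The client above optionally sends a `{"symbols": [...]}` filter, but the server side of the stream is not shown. A hypothetical sketch of how the consumer service could fan trades out to connected clients: each connection gets a bounded `asyncio.Queue`, and `publish` (called from the Kafka consume loop) pushes JSON messages only to clients whose filter matches. `TradeBroadcaster` and its methods are illustrative names, and dropping the oldest message for a slow client is one assumed policy among several.

```python
import asyncio
import json
from typing import Dict, Optional, Set


class TradeBroadcaster:
    """Fan out trade events to per-client queues, honouring optional symbol filters."""

    def __init__(self, max_queue: int = 1000) -> None:
        self.max_queue = max_queue
        self._clients: Dict[asyncio.Queue, Optional[Set[str]]] = {}

    def subscribe(self, first_message: Optional[str] = None) -> asyncio.Queue:
        # A message like {"symbols": ["AAPL"]} narrows the stream; none means all symbols.
        symbols = None
        if first_message:
            requested = json.loads(first_message).get("symbols")
            symbols = set(requested) if requested else None
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue)
        self._clients[queue] = symbols
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        self._clients.pop(queue, None)

    def publish(self, trade: dict) -> None:
        message = json.dumps(trade)
        for queue, symbols in self._clients.items():
            if symbols is not None and trade.get("symbol") not in symbols:
                continue
            if queue.full():
                # Slow client: drop its oldest message instead of blocking ingestion.
                queue.get_nowait()
            queue.put_nowait(message)
```

In a FastAPI WebSocket route, each connection would pass any initial filter message (read with `WebSocket.receive_text`) to `subscribe`, loop on `await queue.get()`, forward messages with `WebSocket.send_text`, and call `unsubscribe` when the client disconnects.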

AWS Deployment

Infrastructure is managed with Terraform in infra/terraform/:

cd infra/terraform
terraform init
terraform plan -var-file=production.tfvars
terraform apply -var-file=production.tfvars

This provisions:

  • ECS Fargate cluster with the application containers
  • Application Load Balancer with HTTPS termination
  • API Gateway with Lambda authorizer
  • ECR repository for container images
  • RDS PostgreSQL with encryption and automated backups

See docs/deployment.md for the full deployment guide.

Project Structure

python-data-api/
├── fastapi_app/          # Async FastAPI application
│   ├── models/           # Pydantic v2 data models
│   ├── routes/           # API route handlers
│   ├── services/         # Business logic layer
│   ├── middleware/       # Auth, rate limiting, logging
│   ├── db/               # SQLAlchemy async + repositories
│   └── kafka/            # Confluent Kafka client wrappers
├── flask_app/            # Flask application
│   ├── blueprints/       # Route blueprints
│   └── services/         # Business logic
├── common/               # Shared utilities
├── infra/terraform/      # AWS infrastructure as code
├── tests/                # Comprehensive pytest suite
└── docs/                 # API reference and deployment guide
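The `middleware/` package is listed as handling logging, and the features list mentions structured JSON logs with correlation IDs. A minimal standard-library sketch of that idea: a `contextvars.ContextVar` holds the current request's ID, and a `logging.Formatter` subclass emits each record as one JSON object that includes it. `start_request`, `configure_logging`, and reusing an upstream ID when the caller supplies one are assumptions, not taken from this project.

```python
import contextvars
import json
import logging
import uuid
from typing import Optional

# The current request's correlation ID; "-" when logging outside a request.
correlation_id = contextvars.ContextVar("correlation_id", default="-")


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        })


def start_request(incoming_id: Optional[str] = None) -> str:
    """Reuse an upstream correlation ID if one was sent, otherwise mint a new one."""
    cid = incoming_id or str(uuid.uuid4())
    correlation_id.set(cid)
    return cid


def configure_logging(level: int = logging.INFO) -> None:
    """Route root-logger output through the JSON formatter."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logging.basicConfig(level=level, handlers=[handler])
```

Each asyncio task runs in a copy of the context it was created in, so a value set while handling one request does not leak into concurrent ones. An HTTP middleware would call `start_request` at the start of each request and could echo the ID back in a response header so clients can quote it when reporting problems.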

License

MIT


About

FastAPI and Flask REST APIs with Kafka integration, JWT auth, and WebSocket streaming on AWS ECS
