Architecture Overview

We will design an eCommerce system with microservices using FastAPI, PostgreSQL, and Redis. The architecture consists of:

User Service (Handles authentication, user profiles)

Product Service (Manages products, inventory)

Order Service (Processes orders, payments)

Payment Service (Handles transactions)

Cart Service (Manages user carts)

API Gateway (Single entry point for requests)

Message Queue (RabbitMQ/Kafka) (For asynchronous processing)

API Design

Each microservice exposes REST APIs:

User Service

POST /register → Register a new user

POST /login → Authenticate user

GET /users/{user_id} → Fetch user details

Product Service

GET /products → List all products

GET /products/{id} → Get product details

POST /products → Add a new product

PUT /products/{id} → Update product

DELETE /products/{id} → Delete product

Order Service

POST /orders → Create a new order

GET /orders/{order_id} → Get order details

Payment Service

POST /payments → Process payment

Cart Service

POST /cart/add → Add item to cart

GET /cart/{user_id} → View cart

Database Schema (PostgreSQL)

PostgreSQL for structured data storage.

User Table

In [None]:
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash TEXT NOT NULL
);

Product Table

In [None]:
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    price DECIMAL(10, 2) NOT NULL,
    stock INT NOT NULL
);

Orders Table

In [None]:
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INT REFERENCES users(id),
    total_price DECIMAL(10, 2) NOT NULL,
    status VARCHAR(50) DEFAULT 'Pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Microservices Implementation (FastAPI)

User Service (FastAPI)

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import bcrypt

app = FastAPI()

users_db = {}

class User(BaseModel):
    username: str
    email: str
    password: str

@app.post("/register")
def register(user: User):
    hashed_pw = bcrypt.hashpw(user.password.encode(), bcrypt.gensalt()).decode()
    
    if user.email in users_db:
        raise HTTPException(status_code=400, detail="User already exists")
    users_db[user.email] = {"username": user.username, "password": hashed_pw}
    return {"message": "User registered successfully"}


product service

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

products_db = {}

class Product(BaseModel):
    name: str
    description: str
    price: float
    stock: int

@app.post("/products")
def add_product(product: Product):
    product_id = len(products_db) + 1
    products_db[product_id] = product.dict()
    return {"id": product_id, "message": "Product added successfully"}

@app.get("/products")
def list_products():
    return products_db

Order Service

In [None]:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

orders_db = {}

class Order(BaseModel):
    user_id: int
    total_price: float

@app.post("/orders")
def create_order(order: Order):
    order_id = len(orders_db) + 1
    orders_db[order_id] = {"user_id": order.user_id, "total_price": order.total_price, "status": "Pending"}
    return {"order_id": order_id, "message": "Order placed successfully"}


Cart Service

In [None]:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

cart_db = {}

class CartItem(BaseModel):
    user_id: int
    product_id: int
    quantity: int

@app.post("/cart/add")
def add_to_cart(item: CartItem):
    cart_db.setdefault(item.user_id, []).append(item.dict())
    return {"message": "Item added to cart"}


API Gateway (FastAPI)

The API Gateway routes requests to the correct microservice.

In [None]:
from fastapi import FastAPI
import requests

app = FastAPI()

USER_SERVICE = "http://localhost:8001"
PRODUCT_SERVICE = "http://localhost:8002"

@app.post("/register")
def register(user: dict):
    response = requests.post(f"{USER_SERVICE}/register", json=user)
    return response.json()

@app.get("/products")
def get_products():
    response = requests.get(f"{PRODUCT_SERVICE}/products")
    return response.json()


Message Queue (RabbitMQ) for Async Processing

For order and payment processing, we use RabbitMQ.

Publishing Message (Order Service)

In [None]:
import pika, json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')

def place_order(order):
    channel.basic_publish(exchange='', routing_key='order_queue', body=json.dumps(order))


Consuming Message (Payment Service)

In [None]:
import pika, json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='order_queue')

def process_payment(ch, method, properties, body):
    order = json.loads(body)
    print(f"Processing payment for order: {order}")

channel.basic_consume(queue='order_queue', on_message_callback=process_payment, auto_ack=True)
channel.start_consuming()


Database Schema for Reviews

In [None]:
CREATE TABLE reviews (
    id SERIAL PRIMARY KEY,
    user_id INT REFERENCES users(id),
    product_id INT REFERENCES products(id),
    rating INT CHECK (rating BETWEEN 1 AND 5),
    comment TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Implementing the New Services

Review Service (FastAPI + PostgreSQL)

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import psycopg2

app = FastAPI()

conn = psycopg2.connect("dbname=ecommerce user=postgres password=secret")

class Review(BaseModel):
    user_id: int
    product_id: int
    rating: int
    comment: str

@app.post("/reviews")
def add_review(review: Review):
    cur = conn.cursor()
    cur.execute("INSERT INTO reviews (user_id, product_id, rating, comment) VALUES (%s, %s, %s, %s)",
                (review.user_id, review.product_id, review.rating, review.comment))
    conn.commit()
    return {"message": "Review added successfully"}

@app.get("/reviews/{product_id}")
def get_reviews(product_id: int):
    cur = conn.cursor()
    cur.execute("SELECT * FROM reviews WHERE product_id = %s", (product_id,))
    reviews = cur.fetchall()
    return {"reviews": reviews}


Search Service (FastAPI + Elasticsearch)

In [None]:
from fastapi import FastAPI
from elasticsearch import Elasticsearch

app = FastAPI()
es = Elasticsearch("http://localhost:9200")

@app.get("/search")
def search_products(q: str):
    query = {"query": {"multi_match": {"query": q, "fields": ["name", "description"]}}}
    res = es.search(index="products", body=query)
    return res["hits"]["hits"]


Recommendation Service (FastAPI + Machine Learning)

In [None]:
from fastapi import FastAPI
import random

app = FastAPI()

# Dummy ML-based recommendations
@app.get("/recommendations/{user_id}")
def get_recommendations(user_id: int):
    recommended_product_ids = random.sample(range(1, 100), 5)
    return {"user_id": user_id, "recommended_products": recommended_product_ids}


Analytics Service (Kafka + ClickHouse)

Kafka Producer (Event Logging in API Gateway)

In [None]:
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda v: json.dumps(v).encode('utf-8'))

def log_event(user_id, event_type, metadata):
    event = {"user_id": user_id, "event_type": event_type, "metadata": metadata}
    producer.send("analytics_events", event)


Kafka Consumer (Saving to ClickHouse)

In [None]:
from kafka import KafkaConsumer
import json
import clickhouse_driver

consumer = KafkaConsumer("analytics_events", bootstrap_servers="localhost:9092", value_deserializer=lambda x: json.loads(x.decode('utf-8')))
client = clickhouse_driver.Client(host="localhost")

for message in consumer:
    event = message.value
    client.execute("INSERT INTO analytics (user_id, event_type, metadata) VALUES", [(event["user_id"], event["event_type"], str(event["metadata"]))])


Updating API Gateway to Route Requests

In [None]:
from fastapi import FastAPI
import requests

app = FastAPI()

REVIEW_SERVICE = "http://localhost:8004"
SEARCH_SERVICE = "http://localhost:8005"
RECOMMENDATION_SERVICE = "http://localhost:8006"

@app.post("/reviews")
def add_review(review: dict):
    return requests.post(f"{REVIEW_SERVICE}/reviews", json=review).json()

@app.get("/search")
def search_products(q: str):
    return requests.get(f"{SEARCH_SERVICE}/search?q={q}").json()

@app.get("/recommendations/{user_id}")
def get_recommendations(user_id: int):
    return requests.get(f"{RECOMMENDATION_SERVICE}/recommendations/{user_id}").json()
