# **4. FastAPI**

---

## **4.1 Core Concepts**

### 1. Versioning Strategies

#### (a) Path-based versioning

```python
from fastapi import FastAPI

app = FastAPI(title="API Versioning Example")

@app.get("/v1/items")
def get_items_v1():
    return {"version": "v1", "items": ["apple", "banana"]}

@app.get("/v2/items")
def get_items_v2():
    return {"version": "v2", "items": [{"name": "apple"}, {"name": "banana"}]}
```

#### (b) Sub-application mounting

```python
from fastapi import FastAPI

v1 = FastAPI(title="API v1")
v2 = FastAPI(title="API v2")

@v1.get("/items")
def get_items_v1():
    return {"version": "v1", "items": ["apple", "banana"]}

@v2.get("/items")
def get_items_v2():
    return {"version": "v2", "items": [{"name": "apple"}, {"name": "banana"}]}

app = FastAPI(title="Versioned API")
app.mount("/v1", v1)
app.mount("/v2", v2)
```

---

### 2. Tags, Routes, and Metadata Handling

```python
from fastapi import FastAPI

tags_metadata = [
    {"name": "users", "description": "Operations related to users"},
    {"name": "items", "description": "Operations related to items"},
]

app = FastAPI(
    title="Tags Example API",
    version="1.0.0",
    description="API demonstrating tags and metadata",
    openapi_tags=tags_metadata,
)

@app.get("/users", tags=["users"])
def list_users():
    return ["Alice", "Bob"]

@app.get("/items", tags=["items"])
def list_items():
    return ["apple", "banana"]
```

---

### 3. Path Operations, Dependencies, and Response Models

#### (a) Path & Query Parameters

```python
from fastapi import FastAPI

app = FastAPI()

@app.get("/items/{item_id}")
def get_item(item_id: int, q: str | None = None):
    return {"item_id": item_id, "query": q}
```

#### (b) Dependencies

```python
from fastapi import FastAPI, Depends

app = FastAPI()

def pagination(skip: int = 0, limit: int = 10):
    return {"skip": skip, "limit": limit}

@app.get("/products")
def read_products(page: dict = Depends(pagination)):
    # `page` avoids shadowing the `pagination` dependency function
    return {"pagination": page, "products": ["laptop", "phone"]}
```

#### (c) Response Models

```python
from fastapi import FastAPI
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str

app = FastAPI()

@app.get("/users/{user_id}", response_model=User)
def get_user(user_id: int):
    return User(id=user_id, name="Alice", email="alice@example.com")
```

---

## **4.2 OpenAPI / Swagger**

### 1. Customizing Swagger UI

```python
from fastapi import FastAPI

app = FastAPI(
    title="Custom Swagger Example",
    description="Customized Swagger & ReDoc paths",
    version="1.0.0",
    docs_url="/docs-ui",       # Swagger UI
    redoc_url="/redoc-ui",     # ReDoc
    openapi_url="/api-schema.json"  # OpenAPI JSON
)

@app.get("/ping")
def ping():
    return {"message": "pong"}
```

---

### 2. Custom OpenAPI Schema

```python
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi

app = FastAPI()

@app.get("/items/{item_id}")
def read_item(item_id: int):
    return {"item_id": item_id}

# Override OpenAPI schema
def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    
    openapi_schema = get_openapi(
        title="My Custom API",
        version="2.0.0",
        description="Custom OpenAPI schema with branding",
        routes=app.routes,
    )
    openapi_schema["info"]["x-logo"] = {
        "url": "https://fastapi.tiangolo.com/img/logo-margin/logo-teal.png"
    }
    app.openapi_schema = openapi_schema
    return app.openapi_schema

app.openapi = custom_openapi
```

---

### 3. Adding Metadata, Descriptions, and Security Schemes

#### (a) Metadata & Descriptions

```python
from fastapi import FastAPI

tags_metadata = [
    {"name": "auth", "description": "Authentication & Authorization"},
    {"name": "users", "description": "Manage user accounts"},
]

app = FastAPI(
    title="Metadata Example API",
    description="API demonstrating metadata and tags",
    version="1.0.0",
    openapi_tags=tags_metadata,
)
```

#### (b) Security Schemes

```python
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer

app = FastAPI(title="Security Schemes Example")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.get("/secure-data")
def secure_data(token: str = Depends(oauth2_scheme)):
    return {"message": "You are authorized", "token": token}
```

---

# **4.3 Middlewares**

---

## 1. Logging, Timing, and CORS Middlewares

### (a) Request Logging + Timing

```python
import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.perf_counter()  # monotonic clock, suited to measuring durations
    response = await call_next(request)
    process_time = (time.perf_counter() - start_time) * 1000
    print(f"{request.method} {request.url} completed in {process_time:.2f} ms")
    response.headers["X-Process-Time"] = f"{process_time:.2f}"
    return response
```

✅ This logs every request and adds an `X-Process-Time` header with the elapsed time in milliseconds.

---

### (b) CORS Middleware

FastAPI provides built-in CORS support:

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

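app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # example origin; avoid "*" when credentials are allowed
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],   # list methods explicitly with credentials
    allow_headers=["Authorization", "Content-Type"],  # list headers explicitly with credentials
)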
```

---

## 2. Exception Handling Middleware

### (a) Custom Exception + Handler

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

class CustomException(Exception):
    def __init__(self, name: str):
        self.name = name

app = FastAPI()

@app.exception_handler(CustomException)
async def custom_exception_handler(request: Request, exc: CustomException):
    return JSONResponse(
        status_code=400,
        content={"error": f"Oops! Something went wrong with {exc.name}"},
    )

@app.get("/boom")
def trigger_error():
    raise CustomException(name="test")
```

---

### (b) Middleware-Level Exception Handling

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def exception_middleware(request: Request, call_next):
    try:
        return await call_next(request)
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
```

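✅ This catches exceptions that no exception handler has dealt with. `HTTPException` and other handled errors are turned into responses before they reach the middleware. Avoid returning `str(e)` to clients in production, since it can leak internal details.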

---

## 3. Custom Authentication / Authorization Middleware

### (a) Token-Based Authentication (Async)

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    if request.url.path.startswith("/public"):
        return await call_next(request)

    token = request.headers.get("Authorization")
    if token != "Bearer secrettoken123":
        return JSONResponse(status_code=401, content={"error": "Unauthorized"})
    
    return await call_next(request)

@app.get("/public")
def public_route():
    return {"message": "Anyone can access"}

@app.get("/private")
def private_route():
    return {"message": "Authorized users only"}
```

---

### (b) Role-Based Authorization (Sync Style inside Async Middleware)

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

fake_users = {
    "token-admin": {"role": "admin"},
    "token-user": {"role": "user"},
}

@app.middleware("http")
async def role_auth_middleware(request: Request, call_next):
    # Expect "Authorization: Bearer <token>" and look up the bare token
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    user = fake_users.get(token)
    
    if not user:
        return JSONResponse(status_code=401, content={"error": "Unauthorized"})
    
    # Sync-style logic inside async middleware
    if request.url.path.startswith("/admin") and user["role"] != "admin":
        return JSONResponse(status_code=403, content={"error": "Forbidden: Admins only"})
    
    return await call_next(request)

@app.get("/admin")
def admin_route():
    return {"message": "Welcome Admin"}

@app.get("/dashboard")
def dashboard():
    return {"message": "Welcome User"}
```

---

# 🔹 **Middlewares in FastAPI**

In FastAPI (built on Starlette), **middleware** is code that runs **before and after each request**.
It’s useful for **cross-cutting concerns** like logging, security, performance, etc.
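
To make the "before and after each request" idea concrete, here is a minimal sketch of a pure ASGI middleware, driven by hand without a server. The names `TimingMiddleware`, `hello_app`, and `call_once` are illustrative only, and `hello_app` is just a stand-in for a real FastAPI application.

```python
import asyncio
import time


class TimingMiddleware:
    """Pure ASGI middleware: wraps the next app and adds a timing header."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass lifespan/websocket traffic through untouched.
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()  # "before" phase

        async def send_with_timing(message):
            # "after" phase: runs when the inner app starts its response.
            if message["type"] == "http.response.start":
                elapsed_ms = (time.perf_counter() - start) * 1000
                headers = list(message.get("headers", []))
                headers.append((b"x-process-time", f"{elapsed_ms:.2f}".encode()))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_timing)


async def hello_app(scope, receive, send):
    """Hypothetical stand-in for the FastAPI app (any ASGI callable works)."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hello"})


async def call_once(app):
    """Drive one fake HTTP request through `app` and collect what it sends."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": "/"}, receive, send)
    return sent


messages = asyncio.run(call_once(TimingMiddleware(hello_app)))
print(messages[0]["headers"])
```

With a real application, a class like this would be registered via `app.add_middleware(TimingMiddleware)`.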

---

## ✅ Types of Middlewares

We can group them into:

### 1. **Built-in / Provided Middlewares**

FastAPI (via Starlette) already provides some common middlewares.

* **CORS Middleware** → Cross-Origin requests handling.
* **TrustedHost Middleware** → Restrict allowed hosts.
* **GZip Middleware** → Response compression.
* **HTTPSRedirect Middleware** → Redirect HTTP → HTTPS.
* **Session Middleware** → Cookie-based sessions.

---

### 2. **Custom Middlewares**

You can write your own:

* **Logging & timing middleware**
* **Exception handling middleware**
* **Authentication / Authorization middleware**
* **Rate limiting middleware**
* **Request/Response transformation middleware**

---

## 🚀 Code Snippets for Each

### (A) **CORS Middleware**

```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

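app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # example origin; avoid "*" when credentials are allowed
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    allow_credentials=True,
)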

@app.get("/")
def home():
    return {"message": "CORS Enabled!"}
```

---

### (B) **TrustedHost Middleware**

```python
from fastapi import FastAPI
from starlette.middleware.trustedhost import TrustedHostMiddleware

app = FastAPI()

app.add_middleware(
    TrustedHostMiddleware, allowed_hosts=["example.com", "*.example.com"]
)

@app.get("/")
def home():
    return {"message": "Allowed host!"}
```

---

### (C) **GZip Middleware**

```python
from fastapi import FastAPI
from starlette.middleware.gzip import GZipMiddleware

app = FastAPI()

app.add_middleware(GZipMiddleware, minimum_size=1000)  # compress responses >1KB

@app.get("/big-response")
def big_response():
    return {"data": "x" * 2000}
```

---

### (D) **HTTPS Redirect Middleware**

```python
from fastapi import FastAPI
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware

app = FastAPI()

app.add_middleware(HTTPSRedirectMiddleware)

@app.get("/")
def secure_home():
    return {"message": "Redirected to HTTPS"}
```

---

### (E) **Session Middleware**

```python
from fastapi import FastAPI, Request
from starlette.middleware.sessions import SessionMiddleware

app = FastAPI()

app.add_middleware(SessionMiddleware, secret_key="supersecret")

@app.get("/set-session")
def set_session(request: Request):
    request.session["user"] = "Alice"
    return {"message": "Session set"}

@app.get("/get-session")
def get_session(request: Request):
    user = request.session.get("user", "Guest")
    return {"user": user}
```

---

### (F) **Custom Logging & Timing Middleware**

```python
import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def log_time_middleware(request: Request, call_next):
    start = time.perf_counter()  # monotonic clock, suited to measuring durations
    response = await call_next(request)
    duration = (time.perf_counter() - start) * 1000
    print(f"{request.method} {request.url} took {duration:.2f} ms")
    response.headers["X-Process-Time"] = f"{duration:.2f}ms"
    return response
```

---

### (G) **Custom Exception Middleware**

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def exception_middleware(request: Request, call_next):
    try:
        return await call_next(request)
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
```

---

### (H) **Custom Authentication Middleware**

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    if request.url.path.startswith("/public"):
        return await call_next(request)

    token = request.headers.get("Authorization")
    if token != "Bearer secrettoken123":
        return JSONResponse(status_code=401, content={"error": "Unauthorized"})
    
    return await call_next(request)

@app.get("/public")
def public_route():
    return {"message": "Public Access"}

@app.get("/private")
def private_route():
    return {"message": "Private Access"}
```

---

# ✅ Summary

📌 Types of Middlewares:

1. **Built-in** (CORS, GZip, TrustedHost, HTTPSRedirect, Session).
2. **Custom** (Logging, Exception Handling, Auth, Rate Limiting, etc.).

📌 Use cases:

* **Security** → Auth, TrustedHost, HTTPSRedirect.
* **Performance** → Logging, Timing, GZip compression.
* **Cross-Origin** → CORS.
* **State management** → Sessions.
* **Reliability** → Exception handling.




# **4.4 Advanced Features**

---

## 1. Background Tasks

Use FastAPI’s `BackgroundTasks` for lightweight tasks that run in the same process after the response has been sent.

```python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", "a") as f:
        f.write(f"{message}\n")

@app.post("/send-email/")
async def send_email(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"Email sent to {email}")
    return {"message": f"Email to {email} scheduled"}
```

---

## 2. Event Handlers (Startup & Shutdown)

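Useful for DB connections, cache initialization, etc. Note that `on_event` is deprecated in recent FastAPI releases in favor of the `lifespan` parameter; the handlers below still work.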

```python
from fastapi import FastAPI

app = FastAPI()

@app.on_event("startup")
async def on_startup():
    print("🚀 App started")

@app.on_event("shutdown")
async def on_shutdown():
    print("🛑 App shutting down")
```

---

## 3. Dependency Overrides

Handy for testing and mocking.

```python
from fastapi import FastAPI, Depends

app = FastAPI()

def get_db():
    return "real_database"

@app.get("/data")
def read_data(db=Depends(get_db)):
    return {"db": db}

# Override for testing
def fake_db():
    return "fake_database"

app.dependency_overrides[get_db] = fake_db
```

---

## 4. Async Streaming Responses (large CSV/JSON)

Great for large datasets.

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import csv
import io
import asyncio

app = FastAPI()

# Sync example
@app.get("/csv-sync")
def get_csv_sync():
    def generate():
        buffer = io.StringIO()
        writer = csv.writer(buffer)
        writer.writerow(["id", "name"])
        for i in range(5):
            writer.writerow([i, f"user_{i}"])
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)
    return StreamingResponse(generate(), media_type="text/csv")

# Async example
@app.get("/csv-async")
async def get_csv_async():
    async def generate():
        for i in range(5):
            yield f"{i},user_{i}\n"
            await asyncio.sleep(0.5)
    return StreamingResponse(generate(), media_type="text/csv")
```

---

## 5. WebSockets (Chat, Real-time Notifications)

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"You said: {data}")
    except WebSocketDisconnect:
        # Client closed the connection; leave the loop cleanly
        pass
```

---

## 6. Server-Sent Events (SSE)

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

@app.get("/sse")
async def sse():
    async def event_stream():
        for i in range(5):
            yield f"data: Message {i}\n\n"
            await asyncio.sleep(1)
    return StreamingResponse(event_stream(), media_type="text/event-stream")
```

---

## 7. Background Tasks with **Celery**

For heavy background jobs (instead of `BackgroundTasks`).

```python
# worker.py
from celery import Celery

celery_app = Celery("tasks", broker="redis://localhost:6379/0")

@celery_app.task
def send_email_task(email: str):
    print(f"📧 Sending email to {email}")
    return f"Email sent to {email}"
```

```python
# main.py
from fastapi import FastAPI
from worker import send_email_task

app = FastAPI()

@app.post("/send-email/{email}")
def send_email(email: str):
    task = send_email_task.delay(email)
    return {"task_id": task.id, "status": "queued"}
```

---

## 8. Rate Limiting & Throttling

Using **slowapi** (Flask-Limiter style).

```python
from fastapi import FastAPI, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.responses import JSONResponse

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter

@app.exception_handler(RateLimitExceeded)
def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    return JSONResponse(
        status_code=429,
        content={"error": "Too Many Requests. Try again later."},
    )

@app.get("/limited")
@limiter.limit("5/minute")
def limited_endpoint(request: Request):  # slowapi needs the `request` argument to identify the client
    return {"message": "This is a rate-limited endpoint"}
```
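
As a rough illustration of what a limit such as `5/minute` means, the sketch below implements a fixed-window counter in plain Python. It is not a description of how slowapi works internally; `FixedWindowLimiter` and its parameters are hypothetical names, and the in-memory dictionary is an assumption that only suits a single process.

```python
import time
from collections import defaultdict


class FixedWindowLimiter:
    """Allow at most `limit` hits per key in each window of `window_seconds`."""

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self._counts = defaultdict(int)  # (key, window index) -> hits so far

    def allow(self, key, now=None):
        now = time.time() if now is None else now
        bucket = (key, int(now // self.window))
        if self._counts[bucket] >= self.limit:
            return False  # caller would answer with HTTP 429
        self._counts[bucket] += 1
        return True


limiter = FixedWindowLimiter(limit=5, window_seconds=60)

# Six requests from the same client inside one window: the sixth is rejected.
results = [limiter.allow("203.0.113.7", now=t) for t in range(6)]
print(results)

# A new window starts at t=60, so the same client is allowed again.
print(limiter.allow("203.0.113.7", now=60))
```

Old windows are never evicted here; a shared store with key expiry (for example Redis) would take care of that in a multi-process deployment.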

---

✅ With these, you now have:

1. Background tasks (lightweight).
2. Event handlers (startup/shutdown).
3. Dependency overrides.
4. Sync & async streaming responses.
5. WebSockets (chat).
6. Server-Sent Events (SSE).
7. Heavy background tasks (Celery).
8. Rate limiting (SlowAPI).

---

Two common choices for heavy background jobs:

* **Celery** → robust, widely used, advanced features (retries, schedules).
* **RQ (Redis Queue)** → lightweight, simpler, quick to set up.

---

# **4.4 Advanced Features (Extended)**

---

## 7b. Background Tasks with **RQ (Redis Queue)**

### Install RQ

```bash
pip install rq redis
```

---

### Worker (runs separately)

```python
# worker.py
import time
from rq import Worker, Queue
import redis

redis_conn = redis.Redis(host="localhost", port=6379, db=0)
queue = Queue(connection=redis_conn)

def send_email_task(email: str):
    print(f"📧 Sending email to {email}...")
    time.sleep(2)  # simulate delay
    print(f"✅ Email sent to {email}")
    return f"Email sent to {email}"

if __name__ == "__main__":
    # Pass the connection explicitly instead of relying on the
    # `Connection` context manager, which newer RQ releases no longer provide
    worker = Worker([queue], connection=redis_conn)
    worker.work()
```

---

### FastAPI App

```python
# main.py
from fastapi import FastAPI
from rq import Queue
import redis
from worker import send_email_task

app = FastAPI()

redis_conn = redis.Redis(host="localhost", port=6379, db=0)
task_queue = Queue(connection=redis_conn)

@app.post("/send-email/{email}")
def send_email(email: str):
    job = task_queue.enqueue(send_email_task, email)
    return {"job_id": job.id, "status": "queued"}
```

---

### Running

1. Start Redis server.
2. Start RQ worker:

   ```bash
   python worker.py
   ```
3. Run FastAPI:

   ```bash
   uvicorn main:app --reload
   ```
4. Call `/send-email/{email}` → job gets queued and executed by RQ worker.

---




# 1) JWT Tokens: HS256 vs RS256

```python
# Install: pip install fastapi uvicorn pyjwt cryptography
from fastapi import FastAPI, HTTPException, Depends
import jwt   # PyJWT
from datetime import datetime, timedelta, timezone
from typing import Dict

app = FastAPI()

# HS256 (shared secret)
HS_SECRET = "super-secret-key"  # demo value; load from configuration in production

def create_hs256(payload: Dict, expires_minutes: int = 15) -> str:
    payload = payload.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
    payload["exp"] = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    return jwt.encode(payload, HS_SECRET, algorithm="HS256")

def verify_hs256(token: str) -> Dict:
    try:
        return jwt.decode(token, HS_SECRET, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(401, "Invalid token")

# RS256 (asymmetric) - generate keys externally or via cryptography
# Example PEM strings (in production load from files)
RSA_PRIVATE_PEM = """-----BEGIN RSA PRIVATE KEY-----
...your private key...
-----END RSA PRIVATE KEY-----"""
RSA_PUBLIC_PEM = """-----BEGIN PUBLIC KEY-----
...your public key...
-----END PUBLIC KEY-----"""

def create_rs256(payload: Dict, expires_minutes: int = 15) -> str:
    payload = payload.copy()
    payload["exp"] = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    # Signed with the private key; anyone holding the public key can verify
    return jwt.encode(payload, RSA_PRIVATE_PEM, algorithm="RS256")

def verify_rs256(token: str):
    try:
        return jwt.decode(token, RSA_PUBLIC_PEM, algorithms=["RS256"])
    except jwt.PyJWTError:
        raise HTTPException(401, "Invalid token")

# Usage sample endpoints
@app.get("/token-hs")
def token_hs():
    return {"token": create_hs256({"sub": "user:1"})}

@app.get("/token-rs")
def token_rs():
    return {"token": create_rs256({"sub": "user:1"})}
```
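
To show what the symmetric HS256 case boils down to, the sketch below builds and checks a token with only the standard library. It is an illustration of the format (base64url header and payload, HMAC-SHA256 signature), not a replacement for PyJWT: `sign_hs256` and `verify_hs256_manual` are hypothetical helpers, and claims such as `exp` are deliberately not validated here.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256_manual(token: str, secret: str) -> dict:
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    # Constant-time comparison of the recomputed and the presented signature
    if not hmac.compare_digest(expected, b64url_decode(signature)):
        raise ValueError("bad signature")
    return json.loads(b64url_decode(signing_input.split(".")[1]))


token = sign_hs256({"sub": "user:1"}, "demo-secret")
print(token)
print(verify_hs256_manual(token, "demo-secret"))
```

Because the same secret both signs and verifies, every service that can check an HS256 token can also mint one; RS256 avoids that by verifying with a public key.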

---

# 2) Authentication vs Authorization (short note + snippet)

**Note:**

* **Authentication** = prove who you are (login).
* **Authorization** = what you are allowed to do (roles/permissions).
* Typical pattern: authenticate (issue token) → authorize (check token claims + roles).

```python
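# Example: token contains a "roles" claim; the route checks permissions
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

bearer_scheme = HTTPBearer()

def get_current_claims(creds: HTTPAuthorizationCredentials = Depends(bearer_scheme)) -> dict:
    # Authentication: verify signature and expiry, return the token claims
    return verify_hs256(creds.credentials)

def authorize_roles(required_roles: set, token_claims: dict):
    # Authorization: the caller needs at least one of the required roles
    user_roles = set(token_claims.get("roles", []))
    if not (user_roles & required_roles):
        raise HTTPException(403, "Forbidden")

@app.get("/admin-only")
def admin_only(claims: dict = Depends(get_current_claims)):
    authorize_roles({"admin"}, claims)
    return {"msg": "Welcome admin"}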
```

---

# 3) Basic Auth

```python
# pip install fastapi[all]
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets
security = HTTPBasic()

@app.get("/basic")
def basic_auth(creds: HTTPBasicCredentials = Depends(security)):
    # Constant-time compare on bytes (compare_digest rejects non-ASCII str)
    correct_username = secrets.compare_digest(creds.username.encode("utf8"), b"alice")
    correct_password = secrets.compare_digest(creds.password.encode("utf8"), b"s3cr3t")
    if not (correct_username and correct_password):
        # The WWW-Authenticate header makes browsers prompt for credentials again
        raise HTTPException(
            401, "Invalid credentials", headers={"WWW-Authenticate": "Basic"}
        )
    return {"user": creds.username}
```

---

# 4) OAuth2

## 4.1 OAuth2 Password Flow (FastAPI built-in helper)

```python
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.post("/token")
def token(form_data: OAuth2PasswordRequestForm = Depends()):
    # validate user/password (lookup DB)
    if form_data.username != "alice" or form_data.password != "secret":
        raise HTTPException(400, "Incorrect username or password")
    access = create_hs256({"sub": f"user:{form_data.username}", "scope": "user"}, expires_minutes=30)
    refresh = create_hs256({"sub": f"user:{form_data.username}", "type": "refresh"}, expires_minutes=60*24*7)
    return {"access_token": access, "token_type": "bearer", "refresh_token": refresh}
```

## 4.2 OAuth2 Authorization Code Flow (simplified)

```python
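# This is a minimal illustrative flow: /authorize -> redirect to client -> /callback
import secrets
from urllib.parse import urlencode
from fastapi import Form
from fastapi.responses import RedirectResponse

CLIENT_ID = "myclient"
CLIENT_REDIRECT = "https://myclient/callback"

@app.get("/authorize")
def authorize(response_type: str, client_id: str, redirect_uri: str, scope: str = "", state: str = ""):
    # validate client_id and redirect_uri against DB (here: the single demo client)
    if client_id != CLIENT_ID or redirect_uri != CLIENT_REDIRECT:
        raise HTTPException(400, "Unknown client or redirect_uri")
    # present login/consent UI (skipped)
    # create short-lived, single-use authorization code (store server-side)
    auth_code = secrets.token_urlsafe(32)  # store & map to user + client
    # echo `state` back so the client can tie the callback to its own request
    return RedirectResponse(f"{redirect_uri}?{urlencode({'code': auth_code, 'state': state})}")

@app.post("/token_from_code")
def token_from_code(code: str = Form(...), client_id: str = Form(...), client_secret: str = Form(...)):
    # validate code and client, then exchange for tokens
    # (form fields keep the client secret out of the URL and access logs)
    return {"access_token": create_hs256({"sub": "user:1"}), "token_type": "bearer"}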
```

---

# 5) API Key

```python
from fastapi.security.api_key import APIKeyHeader
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

API_KEYS = {"key123": {"owner": "service-A", "scopes": ["read"]}}

def get_api_key(api_key: str = Depends(api_key_header)):
    if not api_key or api_key not in API_KEYS:
        raise HTTPException(403, "Invalid API Key")
    return API_KEYS[api_key]

@app.get("/internal")
def internal_service(api_key=Depends(get_api_key)):
    return {"ok": True, "owner": api_key["owner"]}
```

---

# 6) Token management: Access token, Refresh token, Revoke flows

```python
# Use Redis for token blacklisting / revocation
# pip install redis
import redis, uuid
r = redis.Redis()

def issue_tokens(user_id: int):
    access = create_hs256({"sub": f"user:{user_id}", "jti": str(uuid.uuid4())}, expires_minutes=15)
    refresh_id = str(uuid.uuid4())
    refresh = create_hs256({"sub": f"user:{user_id}", "jti": refresh_id, "type": "refresh"}, expires_minutes=60*24*7)
    # store refresh in redis so it can be revoked server-side
    r.set(f"refresh:{refresh_id}", user_id, ex=60*60*24*7)
    return access, refresh

def revoke_refresh(refresh_jti: str):
    r.delete(f"refresh:{refresh_jti}")

def is_blacklisted(jti: str):
    return r.exists(f"blk:{jti}")

# On logout: add access jti to blacklist with TTL equal to token remaining lifetime:
def blacklist_access(jti: str, ttl_seconds: int):
    r.set(f"blk:{jti}", 1, ex=ttl_seconds)
```

---

# 7) Device / Session handling

```python
# Model idea: sessions table (id, user_id, device_info, jti, created_at, last_seen, revoked)
# Example pseudo-code to register a device session
def create_session(user_id, device_info):
    session_id = str(uuid.uuid4())
    access, refresh = issue_tokens(user_id)
    # Insert session entry into DB with device_info and refresh_jti
    # return tokens + session metadata
    return {"access": access, "refresh": refresh, "session_id": session_id}

# Device fingerprinting (very basic):
def fingerprint_from_headers(request):
    return f"{request.client.host}|{request.headers.get('user-agent')}"
```

---

# 8) Token Blacklisting (Redis example — covered above)

* Add access token `jti` to Redis blacklist on logout.
* On every authenticated request, check `if is_blacklisted(claims['jti']) -> 401`.
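
The two bullets above can be sketched without a Redis server. In this hedged example `InMemoryBlacklist` is a hypothetical stand-in for Redis keys with a TTL, `claims` stands for an already decoded token whose `exp` is in epoch seconds, and the `now` parameters exist only to make the behaviour reproducible.

```python
import time


class InMemoryBlacklist:
    """Hypothetical stand-in for Redis: maps jti -> time the entry expires."""

    def __init__(self):
        self._expires_at = {}

    def add(self, jti, ttl_seconds, now=None):
        now = time.time() if now is None else now
        self._expires_at[jti] = now + ttl_seconds

    def contains(self, jti, now=None):
        now = time.time() if now is None else now
        expires_at = self._expires_at.get(jti)
        if expires_at is None:
            return False
        if expires_at <= now:
            # The entry outlived the token it blocked; Redis TTLs do this automatically.
            del self._expires_at[jti]
            return False
        return True


def remaining_ttl(claims, now=None):
    """Seconds until the token's `exp`; 0 if it has already expired."""
    now = time.time() if now is None else now
    return max(0, int(claims["exp"] - now))


def logout(blacklist, claims, now=None):
    """Blacklist the access token for exactly as long as it would still be valid."""
    ttl = remaining_ttl(claims, now)
    if ttl > 0:
        blacklist.add(claims["jti"], ttl, now)


def check_request(blacklist, claims, now=None):
    """Return the HTTP status an authenticated request should receive."""
    return 401 if blacklist.contains(claims["jti"], now) else 200


blacklist = InMemoryBlacklist()
claims = {"sub": "user:1", "jti": "abc", "exp": 1_000_900}

status_before = check_request(blacklist, claims, now=1_000_000)
logout(blacklist, claims, now=1_000_000)
status_after = check_request(blacklist, claims, now=1_000_100)
print(status_before, status_after)
```

Once `exp` has passed, the blacklist entry disappears, which is safe because token verification rejects the expired token on its own.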

---

# 9) OAuth2 + OpenID Connect (note + sample claims)

**Note:** OIDC is OAuth2 + ID Token (JWT) carrying standardized claims (`iss`, `sub`, `aud`, `exp`, `nonce`, `email`, `email_verified`).

```python
# Create ID token (OIDC-like)
def create_id_token(user_id: int, client_id: str):
    claims = {
        "iss": "https://auth.example.com",
        "sub": f"user:{user_id}",
        "aud": client_id,
    }
    # create_rs256 sets "exp" itself, so pass the lifetime rather than an "exp" claim
    return create_rs256(claims, expires_minutes=5)  # RS256 recommended for ID tokens
```
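
On the receiving side, a client that has already verified the ID token's signature still has to check the standardized claims from the note above. The following is a minimal sketch under that assumption; `validate_id_token_claims` and its parameters are hypothetical, and a real deployment would normally rely on an OIDC library for this.

```python
import time


def validate_id_token_claims(claims, expected_issuer, client_id,
                             expected_nonce=None, now=None, leeway=0):
    """Return a list of problems; an empty list means the claims are acceptable."""
    now = time.time() if now is None else now
    problems = []

    if claims.get("iss") != expected_issuer:
        problems.append("iss mismatch")

    # `aud` may be a single string or a list of audiences
    aud = claims.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]
    if client_id not in audiences:
        problems.append("aud mismatch")

    exp = claims.get("exp")
    if exp is None or exp + leeway <= now:
        problems.append("expired or missing exp")

    if not claims.get("sub"):
        problems.append("missing sub")

    # The nonce ties the ID token to the authentication request the client sent
    if expected_nonce is not None and claims.get("nonce") != expected_nonce:
        problems.append("nonce mismatch")

    return problems


good_claims = {
    "iss": "https://auth.example.com",
    "sub": "user:1",
    "aud": "myclient",
    "exp": 2000,
    "nonce": "n-1",
}
ok = validate_id_token_claims(
    good_claims, "https://auth.example.com", "myclient", expected_nonce="n-1", now=1000
)
bad = validate_id_token_claims(
    {**good_claims, "aud": "other"}, "https://auth.example.com", "myclient", now=3000
)
print(ok, bad)
```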

---

# 10) Refresh token rotation

```python
# On use of a refresh token, invalidate the old refresh token (delete from DB/Redis),
# issue a fresh refresh token and store it.
def rotate_refresh(old_refresh_jti, user_id):
    # delete() returns the number of keys removed, so this checks and revokes
    # in one step and a refresh token cannot be redeemed twice
    if not r.delete(f"refresh:{old_refresh_jti}"):
        raise HTTPException(401, "Invalid refresh")
    # issue a new access/refresh pair (issue_tokens stores the new refresh id)
    new_access, new_refresh = issue_tokens(user_id)
    return new_access, new_refresh
```

---

# 11) Password policies & reset flows

```python
# pip install passlib[bcrypt]
from passlib.context import CryptContext
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(plain: str):
    return pwd_ctx.hash(plain)

def verify_password(plain: str, hashed: str):
    return pwd_ctx.verify(plain, hashed)

# Password policy check (example)
import re
def validate_password(pw: str):
    if len(pw) < 10: return False
    if not re.search(r"[A-Z]", pw): return False
    if not re.search(r"[a-z]", pw): return False
    if not re.search(r"[0-9]", pw): return False
    if not re.search(r"[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>\/?]", pw): return False
    return True

# Reset flow (email with token)
def create_password_reset_token(user_id):
    token = create_hs256({"sub": f"user:{user_id}", "action": "pwd_reset"}, expires_minutes=30)
    # send email with link /reset?token=...
    return token
```

---

# 12) 2FA / MFA integration (TOTP with pyotp)

```python
# pip install pyotp qrcode
import pyotp

def generate_totp_secret():
    secret = pyotp.random_base32()
    return secret

def get_totp_uri(secret, username, issuer="MyApp"):
    return pyotp.totp.TOTP(secret).provisioning_uri(name=username, issuer_name=issuer)

# Verify TOTP code
def verify_totp(secret, code):
    t = pyotp.TOTP(secret)
    return t.verify(code, valid_window=1)

# Example: during login, after password verified, ask for TOTP and call verify_totp
```
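
For reference, this is roughly what a TOTP library computes. The sketch follows RFC 4226 (HOTP) and RFC 6238 (TOTP) with SHA-1, 6 digits, and a 30-second step, using only the standard library; the helper names `hotp`, `totp`, and `verify_totp_code` are illustrative, and production code should keep using `pyotp`.

```python
import base64
import hashlib
import hmac
import struct
import time


def hotp(key: bytes, counter: int, digits: int = 6) -> str:
    """RFC 4226: HMAC-SHA1 over the 8-byte counter, then dynamic truncation."""
    mac = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = mac[-1] & 0x0F
    code = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def totp(secret_b32: str, for_time=None, step: int = 30, digits: int = 6) -> str:
    """RFC 6238: HOTP where the counter is the number of elapsed time steps."""
    key = base64.b32decode(secret_b32, casefold=True)
    now = time.time() if for_time is None else for_time
    return hotp(key, int(now // step), digits)


def verify_totp_code(secret_b32, code, for_time=None, window=1, step=30):
    """Accept codes from `window` steps before or after the current one."""
    now = time.time() if for_time is None else for_time
    return any(
        hmac.compare_digest(totp(secret_b32, now + drift * step, step).encode(), code.encode())
        for drift in range(-window, window + 1)
    )


# Test secret from the RFCs, encoded the way authenticator apps expect it
demo_secret = base64.b32encode(b"12345678901234567890").decode()
code_at_59 = totp(demo_secret, for_time=59)
print(code_at_59)
print(verify_totp_code(demo_secret, code_at_59, for_time=89))
```

The `window` argument plays the same role as `valid_window=1` in the `pyotp` call above: it tolerates small clock drift between server and device.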

---

# 13) Anomaly detection (login attempts / brute force)

```python
# Very simple rate-limit style using Redis counters
def check_bruteforce(ip: str, user: str):
    # Call once per failed login attempt; `user` is unused here but could be added to the key
    key = f"fail:{ip}"
    fails = r.incr(key)
    if fails == 1:
        r.expire(key, 60*15)  # TTL 15 min
    if fails > 10:
        # mark suspicious; require captcha / block
        raise HTTPException(429, "Too many attempts")
```

In production, complement these counters with behavioral signals (impossible travel, device changes, login velocity), whether through dedicated libraries or ML models. Store login history so it can be analyzed.
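
As one example of such a signal, an "impossible travel" check compares the distance between two consecutive logins with the time between them. The sketch below assumes the login coordinates are already known (for instance from a geo-IP lookup, which is not shown); `Login`, `is_impossible_travel`, and the 900 km/h threshold are illustrative choices.

```python
import math
from dataclasses import dataclass


@dataclass
class Login:
    lat: float
    lon: float
    timestamp: float  # epoch seconds


def haversine_km(a: Login, b: Login) -> float:
    """Great-circle distance between two logins in kilometres."""
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(a.lat), math.radians(b.lat)
    dphi = phi2 - phi1
    dlmb = math.radians(b.lon - a.lon)
    h = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * earth_radius_km * math.asin(math.sqrt(h))


def is_impossible_travel(prev: Login, curr: Login, max_speed_kmh: float = 900.0) -> bool:
    """Flag the pair if covering the distance would need more than `max_speed_kmh`."""
    hours = max((curr.timestamp - prev.timestamp) / 3600, 1e-9)
    return haversine_km(prev, curr) / hours > max_speed_kmh


london = Login(51.5074, -0.1278, timestamp=0)
new_york_1h = Login(40.7128, -74.0060, timestamp=3600)
new_york_10h = Login(40.7128, -74.0060, timestamp=36000)

print(is_impossible_travel(london, new_york_1h))
print(is_impossible_travel(london, new_york_10h))
```

A flagged login does not have to be blocked outright; it can instead trigger a step-up such as the TOTP check from section 12.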

---

# 14) API security: HMAC, signed URLs, IP whitelist/blacklist

## HMAC example for API payloads

```python
import hmac, hashlib, base64

def sign_payload(secret: str, payload: bytes) -> str:
    sig = hmac.new(secret.encode(), payload, hashlib.sha256).digest()
    return base64.b64encode(sig).decode()

def verify_signature(secret: str, payload: bytes, signature_b64: str) -> bool:
    expected = sign_payload(secret, payload)
    return hmac.compare_digest(expected, signature_b64)
```

## Signed (expiring) URL

```python
import time
from urllib.parse import urlencode, urljoin

def create_signed_url(base_url, path, secret, expires_seconds=300):
    # time.time() is a real Unix timestamp; a naive utcnow().timestamp() is
    # shifted by the server's UTC offset
    expires = int(time.time()) + expires_seconds
    data = f"{path}|{expires}"
    sig = hmac.new(secret.encode(), data.encode(), hashlib.sha256).hexdigest()
    qs = urlencode({"expires": expires, "sig": sig})
    return f"{urljoin(base_url,path)}?{qs}"

def validate_signed_url(path, expires, sig, secret):
    if int(expires) < int(time.time()):
        return False
    expected = hmac.new(secret.encode(), f"{path}|{expires}".encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, sig)
```

## IP whitelist middleware (very simple)

```python
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

class IPWhitelistMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, allowed_ips):
        super().__init__(app)
        self.allowed = set(allowed_ips)

    async def dispatch(self, request, call_next):
        # Behind a reverse proxy this may be the proxy's address, not the caller's
        client = request.client.host
        if client not in self.allowed:
            return JSONResponse({"detail": "Forbidden"}, status_code=403)
        return await call_next(request)

app.add_middleware(IPWhitelistMiddleware, allowed_ips=["127.0.0.1"])
```

---

# 15) CSRF

**Notes:** For APIs that authenticate with `Authorization: Bearer` tokens, CSRF is much less of a concern, because browsers do not attach that header automatically. If you authenticate with cookies, implement CSRF tokens.

```python
# Example: SameSite cookies + CSRF header check (double-submit pattern)
import secrets
from fastapi import HTTPException, Request

def generate_csrf():
    return secrets.token_urlsafe(32)

# When rendering forms -> embed CSRF token in page and set cookie with SameSite=Lax/Strict
# On POST endpoints:
def validate_csrf(request: Request):
    csrf_cookie = request.cookies.get("csrf")
    csrf_header = request.headers.get("x-csrf-token")
    if not csrf_cookie or not csrf_header or not secrets.compare_digest(csrf_cookie, csrf_header):
        raise HTTPException(403, "CSRF validation failed")
```

---

# 16) HTTPS enforcement & HSTS

```python
# Enforce HSTS header with middleware

@app.middleware("http")
async def hsts_middleware(request, call_next):
    resp = await call_next(request)
    resp.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains; preload"
    return resp

# When deploying: always terminate TLS at reverse proxy (NGINX, Caddy, Cloudflare).
# Set cookies with Secure=True and SameSite options.
```

---

### Final notes / recommended libs & infra

* Use **RS256** for ID tokens / when third parties verify tokens; **HS256** is simpler for internal services.
* Use **Redis** for short-lived state: rate limits, blacklists, refresh token store.
* Use **pyotp** for TOTP MFA; use an authenticator app (Google Authenticator / Authy).
* Consider a full OAuth2 / OIDC provider (Keycloak, Auth0, Okta, ORY Hydra) if you need standards compliance.
* Always run behind a reverse proxy (NGINX/Caddy) for TLS, CORS, rate-limiting.
* Log suspicious events for later analysis and build an alerting pipeline.
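
The Redis recommendation above is easiest to picture with rate limiting. The sketch below is an in-memory stand-in that assumes a single process: a fixed-window counter keyed by client ID. The class name `FixedWindowLimiter` and its parameters are illustrative and not taken from any library; with Redis the same idea is usually expressed with `INCR` plus `EXPIRE` on a per-window key.

```python
import time


class FixedWindowLimiter:
    """In-memory fixed-window rate limiter (hypothetical helper, single process only)."""

    def __init__(self, limit: int, window_seconds: int, clock=time.time):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock      # injectable so the limiter can be tested without sleeping
        self.counters = {}      # {(client_id, window_index): request_count}

    def allow(self, client_id: str) -> bool:
        window_index = int(self.clock() // self.window)
        # Drop counters from earlier windows so the dict does not grow without bound
        self.counters = {k: v for k, v in self.counters.items() if k[1] == window_index}
        key = (client_id, window_index)
        count = self.counters.get(key, 0) + 1
        self.counters[key] = count
        return count <= self.limit


limiter = FixedWindowLimiter(limit=3, window_seconds=60)
print([limiter.allow("127.0.0.1") for _ in range(4)])
```

A fixed window is the simplest scheme; it allows short bursts around window boundaries, which sliding-window or token-bucket variants smooth out.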

---




## **JWT Tokens: HS256 vs RS256**

### **1️⃣ HS256 (HMAC + SHA256) Example**

```python
"""
HS256 JWT Example
- Symmetric algorithm: same secret key for signing and verification
- Header, payload, and optional claims included
"""

from datetime import datetime, timedelta
import jwt  # PyJWT library

# ----------------------
# Secret Key (must be kept safe)
# ----------------------
SECRET_KEY = "my_super_secret_key"

# ----------------------
# Function to create a JWT token
# ----------------------
def create_hs256_token(
    data: dict,           # Payload data (custom claims)
    expires_in_minutes: int = 15  # Token expiry
):
    """
    Create HS256 JWT token
    """
    # Payload: registered claims (all optional per RFC 7519) + custom claims
    payload = data.copy()
    payload.update({
        "exp": datetime.utcnow() + timedelta(minutes=expires_in_minutes),  # Expiration time (recommended; PyJWT rejects expired tokens)
        "iat": datetime.utcnow(),  # Optional: issued at time
        "nbf": datetime.utcnow(),  # Optional: not valid before
        "iss": "my_app",           # Optional: issuer
        "aud": "my_users"          # Optional: audience
    })

    # Header (PyJWT sets default: {"alg": "HS256", "typ": "JWT"})
    headers = {
        "alg": "HS256",  # Signing algorithm
        "typ": "JWT"     # Type of token
    }

    # Encode token
    token = jwt.encode(payload, SECRET_KEY, algorithm="HS256", headers=headers)
    return token

# ----------------------
# Function to verify JWT token
# ----------------------
def verify_hs256_token(token: str):
    """
    Verify HS256 JWT token
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"], audience="my_users", issuer="my_app")
        return payload
    except jwt.ExpiredSignatureError:
        return "Token expired"
    except jwt.InvalidAudienceError:
        return "Invalid audience"
    except jwt.InvalidIssuerError:
        return "Invalid issuer"
    except jwt.InvalidTokenError:
        return "Invalid token"

# ----------------------
# Usage Example
# ----------------------
data = {"user_id": 123, "role": "admin"}  # Custom payload
token = create_hs256_token(data)
print("HS256 Token:", token)

decoded = verify_hs256_token(token)
print("Decoded Payload:", decoded)
```

---

### **2️⃣ RS256 (RSA + SHA256) Example**

```python
"""
RS256 JWT Example
- Asymmetric algorithm: private key signs, public key verifies
- Header, payload, and optional claims included
"""

from datetime import datetime, timedelta
import jwt

# ----------------------
# RSA Keys (keep private key secret)
# ----------------------
PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAv...your_private_key_here...IDAQAB
-----END RSA PRIVATE KEY-----"""

PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAv...your_public_key_here...IDAQAB
-----END PUBLIC KEY-----"""

# ----------------------
# Function to create RS256 JWT token
# ----------------------
def create_rs256_token(
    data: dict,
    expires_in_minutes: int = 15
):
    """
    Create RS256 JWT token
    """
    # Payload: registered claims (all optional per RFC 7519) + custom claims
    payload = data.copy()
    payload.update({
        "exp": datetime.utcnow() + timedelta(minutes=expires_in_minutes),  # Recommended
        "iat": datetime.utcnow(),  # Optional
        "nbf": datetime.utcnow(),  # Optional
        "iss": "my_app",           # Optional
        "aud": "my_users"          # Optional
    })

    # Header
    headers = {
        "alg": "RS256",
        "typ": "JWT"
    }

    # Encode token
    token = jwt.encode(payload, PRIVATE_KEY, algorithm="RS256", headers=headers)
    return token

# ----------------------
# Function to verify RS256 JWT token
# ----------------------
def verify_rs256_token(token: str):
    """
    Verify RS256 JWT token
    """
    try:
        payload = jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"], audience="my_users", issuer="my_app")
        return payload
    except jwt.ExpiredSignatureError:
        return "Token expired"
    except jwt.InvalidAudienceError:
        return "Invalid audience"
    except jwt.InvalidIssuerError:
        return "Invalid issuer"
    except jwt.InvalidTokenError:
        return "Invalid token"

# ----------------------
# Usage Example
# ----------------------
data = {"user_id": 456, "role": "user"}
token = create_rs256_token(data)
print("RS256 Token:", token)

decoded = verify_rs256_token(token)
print("Decoded Payload:", decoded)
```

---

### **📌 Notes for Jupyter Notebook**

* **Header**

  * `alg`: signing algorithm (`HS256` or `RS256`) ✅ required
  * `typ`: token type (`JWT`) ⬜ optional, default `"JWT"`

* **Payload (Claims)**

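  * `exp`: expiration time ⬜ optional in RFC 7519, but always set it; PyJWT rejects expired tokens and can require the claim with `options={"require": ["exp"]}`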
  * `iat`: issued at time ⬜ optional
  * `nbf`: not valid before ⬜ optional
  * `iss`: issuer ⬜ optional
  * `aud`: audience ⬜ optional
  * **Custom claims**: `user_id`, `role`, etc.

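* **HS256**: simple; one secret is shared between services, so any service that can verify a token can also mint one.

* **RS256**: better suited to distributed systems; only the issuer holds the private key, while verifiers need just the public key.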
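
To make the header / payload / signature structure concrete, here is a minimal sketch that builds and checks an HS256 token with only the standard library. It is for illustration: the helper names (`b64url`, `b64url_decode`, `sign_hs256`, `verify_hs256`) are made up for this example, and it checks the signature only. Real code should keep using PyJWT, which also validates `exp`, `aud`, and `iss`.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    # A JWT is base64url(header) + "." + base64url(payload) + "." + base64url(signature)
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: bytes) -> dict:
    """Check the signature only; exp/aud/iss are NOT validated in this sketch."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(signing_input.split(".")[1]))


token = sign_hs256({"user_id": 123, "role": "admin"}, b"demo-secret")
print(token)
print(verify_hs256(token, b"demo-secret"))
```

Note that the payload is only encoded, not encrypted: anyone holding the token can read the claims, so never put secrets in them.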

---

## **FastAPI JWT Example: HS256**

```python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
import jwt

app = FastAPI()

# Secret key for HS256
SECRET_KEY = "my_super_secret_key"

# Reads the bearer token from the Authorization header; tokenUrl points at the login route
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

# Sample user database
fake_users_db = {
    "alice": {"username": "alice", "password": "secret123", "role": "admin"},
    "bob": {"username": "bob", "password": "mypassword", "role": "user"}
}

# ----------------------
# JWT Functions
# ----------------------
def create_hs256_token(data: dict, expires_in_minutes: int = 15):
    payload = data.copy()
    payload.update({
        "exp": datetime.utcnow() + timedelta(minutes=expires_in_minutes),
        "iat": datetime.utcnow(),
        "iss": "my_app",
        "aud": "my_users"
    })
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")

def verify_hs256_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"], audience="my_users", issuer="my_app")
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# ----------------------
# Dependency to get current user
# ----------------------
def get_current_user(token: str = Depends(oauth2_scheme)):
    # The token comes from the "Authorization: Bearer <token>" header, not from a login form
    return verify_hs256_token(token)

# ----------------------
# Login route
# ----------------------
@app.post("/login")
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = fake_users_db.get(form_data.username)
    if not user or user["password"] != form_data.password:
        raise HTTPException(status_code=400, detail="Invalid credentials")
    
    token = create_hs256_token({"user_id": user["username"], "role": user["role"]})
    return {"access_token": token, "token_type": "bearer"}

# ----------------------
# Protected route
# ----------------------
@app.get("/protected")
def protected_route(current_user: dict = Depends(get_current_user)):
    return {"message": f"Hello {current_user['user_id']}! You have access.", "role": current_user["role"]}
```

---

## **FastAPI JWT Example: RS256**

```python
# Only the JWT functions change; FastAPI routes remain mostly the same
PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAv...your_private_key_here...IDAQAB
-----END RSA PRIVATE KEY-----"""

PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAv...your_public_key_here...IDAQAB
-----END PUBLIC KEY-----"""

def create_rs256_token(data: dict, expires_in_minutes: int = 15):
    payload = data.copy()
    payload.update({
        "exp": datetime.utcnow() + timedelta(minutes=expires_in_minutes),
        "iat": datetime.utcnow(),
        "iss": "my_app",
        "aud": "my_users"
    })
    return jwt.encode(payload, PRIVATE_KEY, algorithm="RS256")

def verify_rs256_token(token: str):
    try:
        payload = jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"], audience="my_users", issuer="my_app")
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")
```

---

### **📌 Key Points**

* **Login flow**: validate user → generate JWT → return token
* **Protected route**: dependency verifies JWT → grants access
* **Token claims**: `exp`, `iat`, `iss`, `aud` + custom claims (`user_id`, `role`)
* **HS256 vs RS256**: only change signing/verification method and keys

---




## **Authentication vs Authorization**

### **1️⃣ What is Authentication?**

**Definition:**
Authentication is the process of **verifying who a user is**. It answers the question:

> “Are you really who you say you are?”

**Key Points:**

* Deals with **identity verification**
* Happens **before authorization**
* Usually involves **credentials**: username/password, OTP, biometric, tokens

**Common Use Cases:**

* Login to a website or app
* API access using API keys or JWTs
* Mobile apps using social login (Google, Facebook)

**Types of Authentication:**

| Type                               | Description                                   | Example             |
| ---------------------------------- | --------------------------------------------- | ------------------- |
| **Basic Auth**                     | Username & password encoded in header         | REST APIs           |
| **OAuth2 Password Flow**           | Users provide credentials to get access token | Mobile apps         |
| **OAuth2 Authorization Code Flow** | Redirect user to auth server for consent      | Google login        |
| **API Key**                        | Pre-shared key in headers                     | Public APIs         |
| **JWT Tokens**                     | JSON Web Tokens to prove identity             | SPA & Microservices |
| **2FA / MFA**                      | Two-factor / multi-factor authentication      | OTP, TOTP apps, SMS |

**Example:**

* Logging in to Gmail with email + password
* Entering a Google Authenticator code

---

### **2️⃣ What is Authorization?**

**Definition:**
Authorization is the process of **determining what an authenticated user is allowed to do**. It answers the question:

> “What can you access or perform?”

**Key Points:**

* Deals with **permissions**
* Happens **after authentication**
* Usually based on **roles, policies, or claims**

**Common Use Cases:**

* Role-based access control (RBAC)
* Feature toggles for specific users
* Admin vs user permissions in dashboards
* Accessing APIs with scopes (`read:user`, `write:post`)

**Types of Authorization:**

| Type                                      | Description                      | Example                               |
| ----------------------------------------- | -------------------------------- | ------------------------------------- |
| **Role-Based Access Control (RBAC)**      | Access based on user role        | Admin/User roles in HRMS app          |
| **Attribute-Based Access Control (ABAC)** | Access based on attributes       | Department, location, clearance level |
| **Policy-Based Access Control**           | Access based on defined policies | AWS IAM policies                      |
| **Scope-Based Access**                    | Access based on OAuth2 scopes    | `read:user`, `write:files`            |

**Example:**

* In HRMS, only **HR managers** can update employee salary
* In GitHub, a collaborator may **read a repo** but **cannot delete it**

---

### **3️⃣ Key Differences: Authentication vs Authorization**

| Feature           | Authentication                      | Authorization                       |
| ----------------- | ----------------------------------- | ----------------------------------- |
| Question Answered | Who are you?                        | What can you do?                    |
| Purpose           | Verify identity                     | Grant or restrict access            |
| Happens When      | User logs in                        | After user is authenticated         |
| Data Required     | Credentials, OTP, biometric         | Role, permissions, policies, scopes |
| Example           | Login page, social login, JWT token | Admin dashboard, file permissions   |

---

### **4️⃣ Practical Example: Login + Role Access**

```python
# FastAPI Example
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Sample user database
fake_users_db = {
    "alice": {"username": "alice", "password": "1234", "role": "admin"},
    "bob": {"username": "bob", "password": "5678", "role": "user"},
}

# Authentication function
def authenticate_user(username: str, password: str):
    user = fake_users_db.get(username)
    if not user or user["password"] != password:
        return None
    return user

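# Resolve the current user from the bearer token (authentication);
# the role check in the route below is the authorization step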
def get_current_user_role(token: str = Depends(oauth2_scheme)):
    # For demo: token is username
    user = fake_users_db.get(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid authentication")
    return user

# Protected route with authorization
@app.get("/admin-area")
def admin_area(current_user: dict = Depends(get_current_user_role)):
    if current_user["role"] != "admin":
        raise HTTPException(status_code=403, detail="Not authorized")
    return {"message": f"Welcome {current_user['username']}! You have admin access."}
```

**Explanation:**

* **Authentication:** Check if username/password is valid
* **Authorization:** Check if authenticated user has `admin` role

---

### **5️⃣ Summary for Notebook**

1. **Authentication** = verify identity
2. **Authorization** = grant access based on permissions
3. **Order matters:** Authenticate → Authorize
4. **JWTs** often carry both:

   * Identity info → Authentication
   * Role/permissions → Authorization

---

# **1️⃣ Authentication Types**

### **A. Basic Authentication**

```python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
import secrets

app = FastAPI()
security = HTTPBasic()

fake_users_db = {
    "alice": "password123",
    "bob": "mypassword"
}

@app.get("/basic-auth")
def basic_auth(credentials: HTTPBasicCredentials = Depends(security)):
    correct_password = fake_users_db.get(credentials.username)
    # Compare as bytes: secrets.compare_digest raises TypeError on non-ASCII str input
    if not correct_password or not secrets.compare_digest(
        credentials.password.encode("utf-8"), correct_password.encode("utf-8")
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Basic"},  # tells the client which scheme to retry with
        )
    return {"message": f"Hello {credentials.username}, authenticated via Basic Auth!"}
```

---

### **B. OAuth2 Password Flow (Username + Password)**

```python
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
import jwt

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "mysecret"

fake_users_db = {"alice": {"username": "alice", "password": "password123"}}

def authenticate_user(username: str, password: str):
    user = fake_users_db.get(username)
    if not user or user["password"] != password:
        return None
    return user

@app.post("/token")
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    payload = {"sub": user["username"], "exp": datetime.utcnow() + timedelta(minutes=30)}
    token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    return {"access_token": token, "token_type": "bearer"}

@app.get("/protected")
def protected_route(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError:  # also covers ExpiredSignatureError
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return {"message": f"Hello {payload['sub']}! Authenticated with OAuth2 Password Flow"}
```

---

### **C. OAuth2 Authorization Code Flow**

*(FastAPI itself doesn’t handle the full OAuth2 redirect flow; you usually integrate with an external provider like Google, GitHub)*

```python
# Example snippet using OAuth2 with Google
from urllib.parse import urlencode

from fastapi import FastAPI
from fastapi.responses import RedirectResponse

app = FastAPI()

CLIENT_ID = "your_google_client_id"
CLIENT_SECRET = "your_google_client_secret"
REDIRECT_URI = "http://localhost:8000/auth/callback"

@app.get("/auth/google")
def login_google():
    # urlencode escapes the scope and redirect URI so the query string stays valid
    params = {
        "client_id": CLIENT_ID,
        "response_type": "code",
        "scope": "openid email",
        "redirect_uri": REDIRECT_URI,
    }
    return RedirectResponse(f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}")

@app.get("/auth/callback")
def auth_callback(code: str):
    # Exchange code for tokens via Google API (requests.post)
    return {"code_received": code}
```
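
The callback above leaves out two steps. The sketch below covers both with the standard library: issuing a `state` value that binds the callback to the login attempt that started it, and building the form body for the code-for-token exchange. The helper names and the in-memory `pending_states` set are assumptions for illustration; the token endpoint is Google's `https://oauth2.googleapis.com/token`, and the actual POST (for example with `requests`) is left out.

```python
import secrets

CLIENT_ID = "your_google_client_id"
CLIENT_SECRET = "your_google_client_secret"
REDIRECT_URI = "http://localhost:8000/auth/callback"
TOKEN_ENDPOINT = "https://oauth2.googleapis.com/token"

pending_states = set()  # demo only; tie the state to the user's session in a real app


def new_state() -> str:
    """Create a random value to send as the `state` query parameter of the authorization URL."""
    state = secrets.token_urlsafe(16)
    pending_states.add(state)
    return state


def build_token_request(code: str, state: str) -> dict:
    """Validate the returned state, then build the form body to POST to TOKEN_ENDPOINT."""
    # Reject callbacks whose state we did not issue, or that was already used
    if state not in pending_states:
        raise ValueError("unknown or reused state")
    pending_states.discard(state)
    return {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "redirect_uri": REDIRECT_URI,
    }


state = new_state()
form = build_token_request("code-from-callback", state)
print(TOKEN_ENDPOINT, sorted(form))
```

In the callback route, `state` would arrive as a second query parameter next to `code`.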

---

### **D. API Key Authentication**

```python
import secrets

from fastapi import FastAPI, Security, HTTPException
from fastapi.security.api_key import APIKeyHeader

app = FastAPI()
API_KEY = "supersecretapikey"
api_key_header = APIKeyHeader(name="X-API-KEY")

@app.get("/api-key-protected")
def api_key_auth(api_key: str = Security(api_key_header)):
    # Constant-time comparison; bytes avoid a TypeError on non-ASCII input
    if not secrets.compare_digest(api_key.encode(), API_KEY.encode()):
        raise HTTPException(status_code=403, detail="Invalid API Key")
    return {"message": "Authenticated via API Key"}
```

---

### **E. JWT Token Authentication**

*(We already did HS256 and RS256 earlier; this is a practical route example)*

```python
# Depends on previous JWT creation functions
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.get("/jwt-protected")
def jwt_protected_route(token: str = Depends(oauth2_scheme)):
    payload = verify_hs256_token(token)  # or verify_rs256_token
    return {"message": f"Hello {payload['user_id']}! JWT verified."}
```

---

# **2️⃣ Authorization Types**

### **A. Role-Based Access Control (RBAC)**

```python
from fastapi import Depends, HTTPException

fake_users_db = {
    "alice": {"username": "alice", "role": "admin"},
    "bob": {"username": "bob", "role": "user"}
}

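def get_current_user(token: str):
    # For demo: the token is the username. As a plain `str` parameter, FastAPI reads it
    # from the query string (?token=alice); a real app would use Depends(oauth2_scheme).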
    user = fake_users_db.get(token)
    if not user:
        raise HTTPException(status_code=401, detail="Unauthorized")
    return user

def require_role(role: str):
    def role_checker(user: dict = Depends(get_current_user)):
        if user["role"] != role:
            raise HTTPException(status_code=403, detail="Forbidden: insufficient role")
        return user
    return role_checker

@app.get("/admin-area")
def admin_area(user: dict = Depends(require_role("admin"))):
    return {"message": f"Welcome {user['username']}! You are an admin."}
```

---

### **B. Attribute-Based Access Control (ABAC)**

```python
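# Assumes user records carry a "department" attribute; the demo fake_users_db above
# has none, so every request gets 403 until you add one (e.g. "department": "finance").
def require_department(dept: str):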
    def dept_checker(user: dict = Depends(get_current_user)):
        if user.get("department") != dept:
            raise HTTPException(status_code=403, detail="Forbidden: wrong department")
        return user
    return dept_checker

@app.get("/finance-data")
def finance_data(user: dict = Depends(require_department("finance"))):
    return {"message": f"{user['username']} can access finance department data."}
```

---

### **C. Policy-Based Access Control**

```python
# Policies can be functions or dict-based rules
POLICIES = {
    "delete_user": lambda user: user["role"] == "admin",
    "read_reports": lambda user: user.get("department") in ["finance", "admin"]
}

def check_policy(action: str):
    def policy_checker(user: dict = Depends(get_current_user)):
        if not POLICIES.get(action, lambda u: False)(user):
            raise HTTPException(status_code=403, detail=f"Action '{action}' not allowed")
        return user
    return policy_checker

@app.delete("/user/{user_id}")
def delete_user(user_id: str, user: dict = Depends(check_policy("delete_user"))):
    return {"message": f"{user['username']} deleted user {user_id}"}
```

---

### **D. Scope-Based Authorization (OAuth2 Scopes)**

```python
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", scopes={"read": "Read access", "write": "Write access"})

def require_scope(required_scope: str):
    def scope_checker(token: str = Depends(oauth2_scheme)):
        # Decode the JWT and check its 'scopes' claim (a list in this demo;
        # many providers issue a space-delimited 'scope' string instead)
        payload = verify_hs256_token(token)
        token_scopes = payload.get("scopes", [])
        if required_scope not in token_scopes:
            raise HTTPException(status_code=403, detail=f"Scope '{required_scope}' required")
        return payload
    return scope_checker

@app.get("/read-data")
def read_data(payload: dict = Depends(require_scope("read"))):
    return {"message": "You have read access"}
```

---

✅ **Notes for Jupyter Notebook**

* Authentication: Verify **identity** → Basic Auth, OAuth2, API Key, JWT
* Authorization: Verify **permissions** → RBAC, ABAC, policies, scopes
* **JWT tokens** can carry **identity + roles/scopes**, allowing combined auth & authorization



# **3️⃣ Token Management, Session Handling & Advanced Security**

---

## **A. Access Token, Refresh Token, and Revoke Flows**

```python
from fastapi import FastAPI, Depends, HTTPException
from datetime import datetime, timedelta
from fastapi.security import OAuth2PasswordBearer
import jwt

app = FastAPI()
SECRET_KEY = "supersecretkey"
REFRESH_SECRET_KEY = "refreshsupersecret"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# In-memory storage for revoked tokens
revoked_tokens = set()

# ----------------------
# Create Access Token
# ----------------------
def create_access_token(user_id: str, expires_minutes=15):
    payload = {"sub": user_id, "exp": datetime.utcnow() + timedelta(minutes=expires_minutes)}
    token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
    return token

# ----------------------
# Create Refresh Token
# ----------------------
def create_refresh_token(user_id: str, expires_days=7):
    payload = {"sub": user_id, "exp": datetime.utcnow() + timedelta(days=expires_days)}
    token = jwt.encode(payload, REFRESH_SECRET_KEY, algorithm="HS256")
    return token

# ----------------------
# Verify Token & Check Revoked
# ----------------------
def verify_token(token: str, refresh=False):
    key = REFRESH_SECRET_KEY if refresh else SECRET_KEY
    if token in revoked_tokens:
        raise HTTPException(status_code=401, detail="Token revoked")
    try:
        return jwt.decode(token, key, algorithms=["HS256"])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# ----------------------
# Revoke Token
# ----------------------
def revoke_token(token: str):
    revoked_tokens.add(token)
    return True

# ----------------------
# Example Routes
# ----------------------
# Demo only: issues a token for any user_id without checking credentials
@app.post("/token/access")
def get_access_token(user_id: str):
    return {"access_token": create_access_token(user_id)}

@app.post("/token/refresh")
def refresh_token(refresh_token: str):
    payload = verify_token(refresh_token, refresh=True)
    new_access = create_access_token(payload["sub"])
    return {"access_token": new_access}

@app.post("/token/revoke")
def revoke(token: str):
    revoke_token(token)
    return {"status": "revoked"}
```
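
One limitation of `revoked_tokens` above is that it stores whole tokens and never forgets them. A common alternative, sketched here under the assumption that each token carries a unique `jti` claim alongside `exp`, is to denylist only the `jti` and drop the entry once the token would have expired anyway. The class `JtiDenylist` is a hypothetical in-memory stand-in; in Redis the same effect comes from storing the key with a TTL.

```python
import time


class JtiDenylist:
    """Remember revoked token IDs only until the token's own expiry (hypothetical helper)."""

    def __init__(self, clock=time.time):
        self.clock = clock      # injectable so expiry can be tested without sleeping
        self._revoked = {}      # {jti: exp as a Unix timestamp}

    def revoke(self, jti: str, exp: float) -> None:
        self._revoked[jti] = exp

    def is_revoked(self, jti: str) -> bool:
        self._purge()
        return jti in self._revoked

    def _purge(self) -> None:
        now = self.clock()
        # An expired token fails the exp check anyway, so its entry can be forgotten
        self._revoked = {j: e for j, e in self._revoked.items() if e > now}

    def __len__(self) -> int:
        return len(self._revoked)


denylist = JtiDenylist()
denylist.revoke("token-id-1", exp=time.time() + 900)
print(denylist.is_revoked("token-id-1"), denylist.is_revoked("token-id-2"))
```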

---

## **B. Device Fingerprinting & Multiple Active Sessions**

```python
# Simple example: track sessions per user/device
user_sessions = {}  # {user_id: [device_id1, device_id2, ...]}

def add_session(user_id: str, device_id: str):
    devices = user_sessions.setdefault(user_id, [])
    if device_id not in devices:  # avoid duplicate entries on repeated logins
        devices.append(device_id)

def remove_session(user_id: str, device_id: str):
    devices = user_sessions.get(user_id, [])
    if device_id in devices:
        devices.remove(device_id)
    user_sessions[user_id] = devices

@app.post("/login-session")
def login_session(user_id: str, device_id: str):
    add_session(user_id, device_id)
    return {"message": f"Device {device_id} logged in", "active_devices": user_sessions[user_id]}
```
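
The routes above accept `device_id` from the client as-is. One way to derive it on the server, sketched here as an assumption rather than a recommendation, is to hash a few request attributes. The chosen inputs (User-Agent, Accept-Language, client IP) and the function name `fingerprint_device` are illustrative; such fingerprints are easy to spoof and change whenever the browser or network changes, so treat them as a hint, not an identity.

```python
import hashlib


def fingerprint_device(user_agent: str, accept_language: str, client_ip: str) -> str:
    """Return a short, stable identifier for a combination of request attributes."""
    # Newlines cannot appear inside header values, so the joined string is unambiguous
    raw = "\n".join([user_agent, accept_language, client_ip])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]


device_id = fingerprint_device("Mozilla/5.0", "en-US", "203.0.113.7")
print(device_id)
```

Inside a route, the three inputs would come from `request.headers` and `request.client.host`.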

---

## **C. Refresh Token Rotation**

```python
# Each refresh generates a new refresh token and revokes the old
@app.post("/token/rotate")
def rotate_refresh(refresh_token: str):
    payload = verify_token(refresh_token, refresh=True)
    revoke_token(refresh_token)
    new_refresh = create_refresh_token(payload["sub"])
    new_access = create_access_token(payload["sub"])
    return {"access_token": new_access, "refresh_token": new_refresh}
```

---

## **D. Password Policies & Reset Flow**

```python
import re
from fastapi import Form

# Example password validation
def validate_password(password: str):
    if len(password) < 8:
        raise HTTPException(status_code=400, detail="Password too short")
    if not re.search(r"[A-Z]", password):
        raise HTTPException(status_code=400, detail="Password must contain uppercase")
    if not re.search(r"[0-9]", password):
        raise HTTPException(status_code=400, detail="Password must contain a number")
    return True

@app.post("/reset-password")
def reset_password(username: str = Form(...), new_password: str = Form(...)):
    validate_password(new_password)
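    # Demo only: a real reset flow must first verify a one-time reset token
    # (or the current password) before updating the password in the DB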
    return {"message": f"Password for {username} updated successfully"}
```
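
The `/reset-password` route above changes a password for any username without proving that the caller owns the account. A reset flow normally issues a one-time token first. The sketch below shows one way to do that with the standard library: store only a hash of the token together with an expiry, and consume it on use. The names `issue_reset_token`, `consume_reset_token`, and the in-memory `reset_tokens` dict are assumptions for illustration.

```python
import hashlib
import secrets
import time
from typing import Optional

RESET_TTL_SECONDS = 15 * 60
reset_tokens = {}  # {sha256(token): (username, expires_at)}; use a database in practice


def _digest(token: str) -> str:
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def issue_reset_token(username: str, now: Optional[float] = None) -> str:
    """Create a token to send to the user (e.g. by email); only its hash is stored."""
    now = time.time() if now is None else now
    token = secrets.token_urlsafe(32)
    reset_tokens[_digest(token)] = (username, now + RESET_TTL_SECONDS)
    return token


def consume_reset_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Return the username if the token is known and unexpired; tokens are single-use."""
    now = time.time() if now is None else now
    entry = reset_tokens.pop(_digest(token), None)
    if entry is None:
        return None
    username, expires_at = entry
    return username if now <= expires_at else None


token = issue_reset_token("alice")
print(consume_reset_token(token))  # first use returns the username
print(consume_reset_token(token))  # second use returns None: the token was consumed
```

The reset route would then call `consume_reset_token` and run `validate_password` only when a username comes back.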

---

## **E. 2FA / MFA Integration (TOTP Example)**

```python
import pyotp

# Generate secret per user
user_secrets = {}  # {username: secret}

@app.post("/generate-2fa")
def generate_2fa(username: str):
    secret = pyotp.random_base32()
    user_secrets[username] = secret
    otp_uri = pyotp.totp.TOTP(secret).provisioning_uri(name=username, issuer_name="MyApp")
    return {"otp_uri": otp_uri, "secret": secret}

@app.post("/verify-2fa")
def verify_2fa(username: str, token: str):
    secret = user_secrets.get(username)
    if not secret:
        raise HTTPException(status_code=400, detail="2FA not set")
    totp = pyotp.TOTP(secret)
    if totp.verify(token):
        return {"status": "verified"}
    else:
        raise HTTPException(status_code=400, detail="Invalid token")
```
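
For reference, the code that `pyotp.TOTP(secret)` checks can be sketched with the standard library. This follows RFC 6238 with the common defaults (HMAC-SHA1, 30-second step, 6 digits); the function name `totp_at` is made up for this example, and production code should keep using `pyotp`.

```python
import base64
import hashlib
import hmac
import struct


def totp_at(secret_b32: str, unix_time: int, step: int = 30, digits: int = 6) -> str:
    """Compute the TOTP code for a Base32 secret at a given Unix time (RFC 6238)."""
    key = base64.b32decode(secret_b32, casefold=True)
    counter = struct.pack(">Q", unix_time // step)   # 8-byte big-endian time-step counter
    mac = hmac.new(key, counter, hashlib.sha1).digest()
    offset = mac[-1] & 0x0F                          # dynamic truncation (RFC 4226)
    code = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


# RFC 6238 test secret: the ASCII bytes "12345678901234567890", Base32-encoded
secret = base64.b32encode(b"12345678901234567890").decode("ascii")
print(totp_at(secret, 59, digits=8))  # RFC 6238 test vector: 94287082
```

Verification is then a constant-time comparison of the submitted code against `totp_at` for the current time step, often also accepting the adjacent steps to tolerate clock drift.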

---

## **F. API Security: HMAC, Signed URLs, IP Whitelisting**

```python
import hmac, hashlib
from fastapi import Request

API_SECRET = b"myapisecret"

# HMAC Signature Example
def generate_hmac(message: str):
    return hmac.new(API_SECRET, message.encode(), hashlib.sha256).hexdigest()

@app.get("/signed-url")
def signed_url(token: str, request: Request):
    expected = generate_hmac(request.url.path)
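    # Constant-time comparison, so response timing does not leak how much of the signature matched
    if not hmac.compare_digest(token.encode(), expected.encode()):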
        raise HTTPException(status_code=403, detail="Invalid signature")
    return {"message": "HMAC verified"}

# IP Whitelist Example
WHITELISTED_IPS = ["127.0.0.1"]

@app.get("/restricted-ip")
def restricted_ip(request: Request):
    client_ip = request.client.host
    if client_ip not in WHITELISTED_IPS:
        raise HTTPException(status_code=403, detail="IP not allowed")
    return {"message": "Access granted"}
```
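
The `/signed-url` check above signs only the path, so a leaked link works forever. A sketch of an expiring variant follows, signing `path|expires` so that neither value can be changed without invalidating the signature. The function names `sign_url` and `is_valid` and the optional `now` parameter (there to make the expiry testable) are illustrative.

```python
import hashlib
import hmac
import time
from typing import Optional

API_SECRET = b"myapisecret"


def sign_url(path: str, ttl_seconds: int, now: Optional[float] = None) -> str:
    """Return path?expires=...&sig=... that stays valid for ttl_seconds."""
    now = time.time() if now is None else now
    expires = int(now) + ttl_seconds
    sig = hmac.new(API_SECRET, f"{path}|{expires}".encode(), hashlib.sha256).hexdigest()
    return f"{path}?expires={expires}&sig={sig}"


def is_valid(path: str, expires: str, sig: str, now: Optional[float] = None) -> bool:
    now = time.time() if now is None else now
    try:
        expires_at = int(expires)
    except ValueError:
        return False
    if expires_at < int(now):
        return False  # link has expired
    expected = hmac.new(API_SECRET, f"{path}|{expires_at}".encode(), hashlib.sha256).hexdigest()
    # Constant-time comparison on bytes; str input with non-ASCII characters would raise
    return hmac.compare_digest(expected.encode(), sig.encode())


print(sign_url("/files/report.pdf", ttl_seconds=300))
```

`time.time()` is used for the clock because it returns seconds since the Unix epoch regardless of the server's local time zone.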

---

## **G. CSRF Protection (FastAPI Example)**

```python
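# Double-submit check: the token in the form must match the token in the cookie
import secrets

from fastapi import Form, Cookie

@app.post("/submit-form")
def submit_form(csrf_token: str = Form(...), csrf_cookie: str = Cookie(...)):
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str input)
    if not secrets.compare_digest(csrf_token.encode(), csrf_cookie.encode()):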
        raise HTTPException(status_code=403, detail="CSRF token mismatch")
    return {"message": "Form submitted safely"}
```

---

## **H. HTTPS Enforcement / HSTS**

```python
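# FastAPI runs on an ASGI server (like uvicorn), usually behind a reverse proxy
# In production, enforce HTTPS at the reverse proxy (NGINX, Caddy)
# If the proxy terminates TLS, the app sees plain HTTP: make the server trust the proxy's
# X-Forwarded-Proto header (uvicorn: --proxy-headers), or the redirect below will loop
# Example: redirect to HTTPS and set the HSTS header in middleware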
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response

app.add_middleware(HTTPSRedirectMiddleware)  # Redirect HTTP -> HTTPS

class HSTSMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response: Response = await call_next(request)
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        return response

app.add_middleware(HSTSMiddleware)
```

---

### ✅ **Notebook Notes**

* **Token Management:** Access/Refresh, Revocation, Rotation
* **Session Handling:** Multiple devices, device fingerprinting
* **Password Security:** Policies, reset flow, MFA/TOTP
* **API Security:** HMAC, signed URLs, IP filtering
* **Web Security:** CSRF, HTTPS, HSTS

