# Broken Access Control:
In 2014, Snapchat leaked the phone numbers of 4.6 million users because attackers could query its API endpoints without limit and without proper authentication.

Code:

In [None]:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# FastAPI application object; the route defined below is registered on it.
app = FastAPI()

# Simulating a database: an in-memory dict keyed by user id. Each record
# carries the fields declared on UserProfile (id, username, email, is_admin).
users_db = {
    1: {"id": 1, "username": "alice", "email": "alice@example.com", "is_admin": False},
    2: {"id": 2, "username": "bob", "email": "bob@example.com", "is_admin": False},
    3: {"id": 3, "username": "admin", "email": "admin@example.com", "is_admin": True}
}

# Response schema for the user endpoint. Its fields mirror the keys of the
# records stored in users_db, so a record can be returned as-is.
class UserProfile(BaseModel):
    id: int  # key of the record in users_db
    username: str
    email: str
    is_admin: bool  # True only for the "admin" record in the sample data

# Vulnerable endpoint - No authentication check
@app.get("/api/users/{user_id}", response_model=UserProfile)
async def get_user_profile(user_id: int):
    # Intentionally vulnerable demo: the caller's identity is never checked,
    # so any client can read any profile just by changing user_id.
    record = users_db.get(user_id)
    if record is None:
        # Unknown id -> 404, same as a missing resource.
        raise HTTPException(status_code=404, detail="User not found")
    return record

This code lets anyone retrieve any user's information by brute force: because the endpoint performs no authentication or authorization, an attacker only has to iterate over user IDs.

To secure it, the fixed version adds:

- Authentication: API key authentication using FastAPI's security dependencies.
- Authorization: role-based access control.
- Principle of Least Privilege: users can only access their own profiles unless they are admins.

FIXED VERSION:

In [None]:
from fastapi import FastAPI, HTTPException, Depends, Security
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from typing import Optional

# FastAPI application object; the route defined below is registered on it.
app = FastAPI()

# Simulating a database: an in-memory dict keyed by user id. Each record
# carries the fields declared on UserProfile (id, username, email, is_admin).
users_db = {
    1: {"id": 1, "username": "alice", "email": "alice@example.com", "is_admin": False},
    2: {"id": 2, "username": "bob", "email": "bob@example.com", "is_admin": False},
    3: {"id": 3, "username": "admin", "email": "admin@example.com", "is_admin": True}
}

# Simulating an active sessions database: maps each API key to the id of the
# users_db record it authenticates as.
# NOTE(review): hard-coded, guessable keys are acceptable only for this demo.
api_keys = {
    "user1_api_key": 1,
    "user2_api_key": 2,
    "admin_api_key": 3
}

# Security scheme that extracts the API key from the "X-API-Key" request header.
api_key_header = APIKeyHeader(name="X-API-Key")

# Response schema for the user endpoint. Its fields mirror the keys of the
# records stored in users_db, so a record can be returned as-is.
class UserProfile(BaseModel):
    id: int  # key of the record in users_db
    username: str
    email: str
    is_admin: bool  # admins may read any profile; see get_user_profile

async def get_current_user(api_key: str = Depends(api_key_header)) -> dict:
    """Resolve the API key from the request header to its user record.

    Args:
        api_key: Value of the ``X-API-Key`` header, injected by FastAPI.

    Returns:
        The ``users_db`` record of the authenticated user.

    Raises:
        HTTPException: 401 if the key is unknown, or if it maps to a user id
            that no longer exists in ``users_db``.
    """
    user_id = api_keys.get(api_key)
    # A key pointing at a missing user is treated exactly like an unknown
    # key, so a stale entry yields 401 instead of an unhandled KeyError (500).
    user = users_db.get(user_id) if user_id is not None else None
    if user is None:
        raise HTTPException(
            status_code=401,
            detail="Invalid API Key"
        )
    return user

@app.get("/api/users/{user_id}", response_model=UserProfile)
async def get_user_profile(
    user_id: int,
    current_user: dict = Depends(get_current_user)
):
    # Authorization comes first: only the profile's owner or an admin may
    # proceed. Running this before the existence check means a non-admin
    # cannot probe which user ids exist.
    is_own_profile = current_user["id"] == user_id
    if not (is_own_profile or current_user["is_admin"]):
        raise HTTPException(status_code=403, detail="Access denied")

    profile = users_db.get(user_id)
    if profile is None:
        raise HTTPException(status_code=404, detail="User not found")

    return profile

Tools and processes that help prevent this:

- Use tools like OWASP ZAP or Burp Suite for security testing.
- Implement automated security testing in the CI/CD pipeline.

# Cryptographic Failures
Equifax in 2017 had a breach affecting 143 million users because of bad key rotation, weak encryption, and unpatched vulnerabilities.

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import base64  # Weak encryption method

# FastAPI application object; the routes defined below are registered on it.
app = FastAPI()

# Simulating a database: in-memory dict mapping username -> stored
# (base64-encoded) password, filled by /register and read by /login.
user_db = {}

# Request body shared by the /register and /login endpoints.
class User(BaseModel):
    username: str  # used as the key in user_db
    password: str  # plaintext as sent by the client

# VULNERABLE: Using base64 encoding instead of proper encryption/hashing
@app.post("/register")
async def register_user(user: User):
    # Refuse to overwrite an existing account.
    if user.username in user_db:
        raise HTTPException(status_code=400, detail="Username already exists")

    # VULNERABLE (kept on purpose for the demo): base64 is a reversible
    # encoding, not a hash, so anyone who reads user_db recovers the password.
    password_bytes = user.password.encode()
    user_db[user.username] = base64.b64encode(password_bytes).decode()
    return {"message": "User registered successfully"}

@app.post("/login")
async def login_user(user: User):
    # Part of the vulnerable demo: compares base64-encoded passwords.
    stored = user_db.get(user.username)
    if stored is None:
        raise HTTPException(status_code=400, detail="User not found")

    # Encode the attempt the same way /register did, then compare.
    attempt = base64.b64encode(user.password.encode()).decode()
    if attempt != stored:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    return {"message": "Login successful"}

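This code handles and stores passwords improperly. Base64 is an encoding, not encryption or hashing: it is trivially reversible, so anyone who obtains the stored values can recover every password. It also uses no salt, so identical passwords produce identical stored values.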


Here is the fixed version:

In [None]:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from passlib.context import CryptContext
import secrets

# FastAPI application object; the routes defined below are registered on it.
app = FastAPI()

# Proper password hashing configuration: bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Simulating a database: in-memory dict mapping username -> password hash,
# filled by /register and read by /login.
user_db = {}

# Request body shared by the /register and /login endpoints.
class User(BaseModel):
    username: str  # used as the key in user_db
    password: str  # plaintext as sent by the client; only its hash is stored

@app.post("/register")
async def register_user(user: User):
    # Refuse to overwrite an existing account.
    name = user.username
    if name in user_db:
        raise HTTPException(status_code=400, detail="Username already exists")

    # Secure: store only the salted hash produced by pwd_context, never the
    # plaintext password.
    user_db[name] = pwd_context.hash(user.password)
    return {"message": "User registered successfully"}

@app.post("/login")
async def login_user(user: User):
    """Check the submitted credentials against the stored password hash.

    Args:
        user: Username and plaintext password from the request body.

    Returns:
        A success message when the credentials are valid.

    Raises:
        HTTPException: 401 "Invalid credentials" for an unknown username
            and for a wrong password alike.
    """
    stored_password = user_db.get(user.username)
    if stored_password is None:
        # Spend roughly the time of a real verification so that response
        # timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
    elif pwd_context.verify(user.password, stored_password):
        return {"message": "Login successful"}

    # One identical error for both failure modes; a distinct "User not found"
    # response would let attackers enumerate registered usernames.
    raise HTTPException(status_code=401, detail="Invalid credentials")

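This version uses bcrypt, which generates a unique random salt for every password and is deliberately slow to compute, making brute-force and precomputed-table attacks impractical.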

Static analysis tools such as Bandit or SonarQube can detect weak cryptography like this during security scanning.



# Injection
In 2023 ResumeLooters successfully used SQL injection attacks to steal over 2 million user records across 65 different websites.


In [None]:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

# Flask application and its SQLAlchemy handle, configured for the SQLite
# file "users.db".
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)

@app.route('/users/<id>')
def get_user(id):
    """Return the rows of ``users`` matching ``id`` (vulnerable demo).

    The path segment is pasted into the SQL text unmodified, which is the
    injection flaw this example exists to demonstrate.
    """
    # Vulnerable SQL query: the statement is built by string interpolation.
    statement = f"SELECT * FROM users WHERE id = {id}"
    rows = db.engine.execute(statement)
    return {"users": list(map(dict, rows))}

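This interpolates user input directly into the SQL query string, so an attacker can inject arbitrary SQL. For example, requesting `/users/1 OR 1=1` turns the query into `SELECT * FROM users WHERE id = 1 OR 1=1`, which returns every row.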

Here is the fixed version:


In [None]:
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy

# Flask application and its SQLAlchemy handle, configured for the SQLite
# file "users.db".
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)

# ORM model for user rows; get_user below queries it by primary key.
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)  # integer primary key
    username = db.Column(db.String(80))  # string column, declared length 80

@app.route('/users/<int:id>')
def get_user(id):
    """Return the id and username of one user as JSON.

    Args:
        id: User primary key. The ``<int:id>`` converter guarantees it is an
            integer before this view runs.

    Returns:
        A JSON object with ``id`` and ``username``, or a JSON error object
        with HTTP status 404 when no such user exists.
    """
    # Safe parameterized query
    user = User.query.filter_by(id=id).first()
    if user is None:
        # .first() returns None when nothing matches; without this guard the
        # attribute access below would raise and surface as an HTTP 500.
        return jsonify({"error": "User not found"}), 404
    return jsonify({"id": user.id, "username": user.username})

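This version queries through the ORM, which sends the value as a bound parameter rather than as part of the SQL text, and the `<int:id>` route converter rejects non-integer IDs, so user input is never interpreted as SQL.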

Useful processes and tools here are input validation and sanitization. Implementing strict input validation and using parameterized queries prevents injection.
