1. **Enterprise LLM Architecture** – Overview of how large language models are deployed in real-world systems, including serving, orchestration, monitoring, CI/CD, and security.

2. **Production vLLM Server (hands-on)** – Practical setup of a production-ready LLM server using vLLM and FastAPI, including request handling, streaming, and metrics collection.

3. **Kubernetes Deployment** – Running the LLM server on Kubernetes for scalability, load balancing, health checks, and GPU resource management.

4. **Load Testing & Performance** – Testing the server under high traffic, measuring response times, throughput (tokens/sec), and plotting performance metrics.

5. **Security & Rate Limiting** – Protecting the server with authentication, JWT tokens, request limits, Redis caching, and content safety filters.


### 0: `FastAPI imports` (used in multiple sections)

In [1]:
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse, Response
from contextlib import asynccontextmanager
import uvicorn
import json
import time
import logging
from typing import AsyncGenerator, Dict, Any
import asyncio

### 1: `Load Testing & Performance`

**Purpose:** Test how many users your LLM server can handle before crashing or slowing down.

**What it does:**
- Sends **50 fake user requests** to your server at once
- Measures **response time** and **speed**
- Shows **graphs** of performance
- Finds the **breaking point** (how many users it can handle)

**For you:** Run this to see if your server can handle **5, 50, or 500 users** before failing.
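A minimal sketch of the idea above, using only the standard library. `fake_request` is a hypothetical stand-in for a real HTTP call to your server (it just sleeps and returns a made-up token count), and `run_load_test` and `timed_call` are illustrative names, not part of any library. Plotting is left out to keep the sketch short.

```python
import asyncio
import statistics
import time


async def fake_request(prompt: str) -> int:
    """Hypothetical stand-in for one HTTP call to the LLM server.

    Sleeps briefly to imitate latency and returns a made-up token count.
    Replace it with a real HTTP client call to test an actual server.
    """
    await asyncio.sleep(0.01)
    return len(prompt.split())


async def timed_call(send, prompt: str) -> tuple:
    """Run one request and return (latency_seconds, tokens_returned)."""
    start = time.perf_counter()
    tokens = await send(prompt)
    return time.perf_counter() - start, tokens


async def run_load_test(send=fake_request, users: int = 50) -> dict:
    """Fire `users` requests concurrently and summarize the results."""
    start = time.perf_counter()
    results = await asyncio.gather(
        *(timed_call(send, f"test prompt number {i}") for i in range(users))
    )
    elapsed = time.perf_counter() - start
    latencies = [latency for latency, _ in results]
    total_tokens = sum(tokens for _, tokens in results)
    return {
        "requests": len(results),
        "mean_latency_s": statistics.mean(latencies),
        "max_latency_s": max(latencies),
        "tokens_per_sec": total_tokens / elapsed,
    }


if __name__ == "__main__":
    print(asyncio.run(run_load_test()))
```

Inside a notebook an event loop is already running, so call `await run_load_test()` instead of `asyncio.run(...)`. To look for the breaking point, rerun with `users=5`, `50`, and `500` and watch how the latency numbers change.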

### 2: `Kubernetes`

**Purpose:** Deploy your LLM server at **enterprise scale** using Kubernetes.

**What it does:**
1. **Runs 2 copies** of your server (`replicas: 2`) for reliability
2. **Auto-heals** if a server crashes (Kubernetes restarts it)
3. **Load balances** traffic between servers
4. **Scales automatically** based on demand (when a HorizontalPodAutoscaler is configured)
5. **Monitors health** (`/health` endpoint)
6. **Reserves GPUs** for each pod (`nvidia.com/gpu: 2`)
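
The settings named in the list can be sketched as a Deployment manifest. To stay in Python, this builds the manifest as a dictionary and prints it as JSON, which `kubectl apply -f` accepts alongside YAML. The app name, image, port, and probe timings are placeholders assumed for illustration. Load balancing would need a separate Service, and the GPU limit assumes the NVIDIA device plugin is installed on the cluster; neither is shown here.

```python
import json

# Hypothetical values: adjust the name, image, and port to your own setup.
APP_NAME = "llm-server"
IMAGE = "example.com/llm-server:latest"  # placeholder image, not a real one
PORT = 8000

deployment = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": {"name": APP_NAME},
    "spec": {
        "replicas": 2,  # two copies of the server
        "selector": {"matchLabels": {"app": APP_NAME}},
        "template": {
            "metadata": {"labels": {"app": APP_NAME}},
            "spec": {
                "containers": [
                    {
                        "name": APP_NAME,
                        "image": IMAGE,
                        "ports": [{"containerPort": PORT}],
                        # GPUs reserved for this container
                        "resources": {"limits": {"nvidia.com/gpu": 2}},
                        # Kubernetes restarts the container if /health fails
                        "livenessProbe": {
                            "httpGet": {"path": "/health", "port": PORT},
                            "initialDelaySeconds": 60,  # assumed model load time
                            "periodSeconds": 10,
                        },
                    }
                ]
            },
        },
    },
}

manifest_json = json.dumps(deployment, indent=2)
print(manifest_json)
```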

**When you need this:**
- **Thousands of users** using your agent
- **24/7 uptime** required
- **Multiple GPUs/servers** available
- **Team management** needed

**For now:** Your **single FastAPI server** is enough. Kubernetes is for **large-scale production**.

### 3: `Security & Rate Limiting`

**Purpose:** Protect your LLM server from **abuse and attacks**.

**What it adds:**
1. **Rate limiting** → Users can't spam (target: max 100 requests/minute; the demo code below uses 10)
2. **Authentication** → Users need login tokens
3. **Content filtering** → Blocks harmful prompts
4. ~~**Redis caching**~~ → ~~Speeds up repeated requests~~

**Why needed:**  
If your server goes public, people could:
- **Crash it** with too many requests
- **Run up bills** (if using paid APIs)
- **Generate bad content**

**For now:** Optional. Add when you **deploy publicly**.

In [34]:
# Rate limiting → Users can't spam (demo limit: 10 requests/minute)

In [1]:
import time
from collections import defaultdict

In [73]:
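# Per-user request timestamps, e.g. {"user123": [timestamp1, timestamp2, ...]}
rate_limits = defaultdict(list)

def check_rate_limit(user_id, limit=10, per=60):
    """Return True if the user may make a request, False if over the limit.

    Sliding window: at most `limit` requests in the last `per` seconds.
    """
    current = time.time()

    # Keep only the timestamps that are still inside the window
    rate_limits[user_id] = [t for t in rate_limits[user_id] if current - t < per]

    if len(rate_limits[user_id]) >= limit:
        return False  # limit reached; the rejected request is not recorded
    rate_limits[user_id].append(current)
    return True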

In [30]:
# Content filtering → Blocks harmful prompts

In [33]:
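# Phrases that cause a prompt to be rejected (simple keyword list)
BLOCKED = ['harmful', 'dangerous', 'illegal', 'hate speech']

def check_safety(text):
    """Return True if `text` contains none of the BLOCKED phrases."""
    lower = text.lower()  # case-insensitive comparison
    # Substring match: also flags words that merely contain a blocked phrase
    return not any(bad in lower for bad in BLOCKED)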
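
Because `check_safety` matches substrings, a word such as "unharmful" is rejected as well. If that is not wanted, one possible variant matches whole words only. This is a sketch, not a real content filter: `check_safety_whole_words` and `BLOCKED_TERMS` are hypothetical names, and a keyword list of this kind is easy to bypass.

```python
import re

BLOCKED_TERMS = ["harmful", "dangerous", "illegal", "hate speech"]

# One pattern for all terms; \b makes each term match only as a whole word/phrase
_BLOCKED_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(term) for term in BLOCKED_TERMS) + r")\b",
    re.IGNORECASE,
)


def check_safety_whole_words(text: str) -> bool:
    """Return True if no blocked term appears as a whole word in `text`."""
    return _BLOCKED_RE.search(text) is None
```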

In [43]:
# Authentication → Users need login tokens

In [64]:
valid_token = {}   # token -> {"user_id", "created", "expires"}
token_counter = 0  # incremented to make each token unique

In [52]:
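def create_token(ids):
    """Create a demo access token for user `ids`, valid for 24 hours."""
    global token_counter

    token_counter += 1
    # Token like "token_1_user123"; predictable, so suitable for demos only
    token = f"token_{token_counter}_{ids}"

    now = time.time()
    valid_token[token] = {
        "user_id": ids,
        "created": now,
        "expires": now + 86400,  # 24 hours = 60 * 60 * 24 seconds
    }
    return token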
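
A token built from a counter and the user ID can be guessed by anyone who knows a user ID. For anything beyond a demo, one option is a random token from the standard library's `secrets` module. The sketch below assumes the same in-memory layout as above; `create_random_token` and `token_store` are hypothetical names, and it is not a JWT implementation.

```python
import secrets
import time

token_store = {}  # hypothetical in-memory store: token -> metadata


def create_random_token(user_id: str, ttl_seconds: int = 86400) -> str:
    """Create an unguessable token for `user_id` that expires after `ttl_seconds`."""
    token = secrets.token_urlsafe(32)  # 32 random bytes, URL-safe text
    now = time.time()
    token_store[token] = {
        "user_id": user_id,
        "created": now,
        "expires": now + ttl_seconds,
    }
    return token
```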

In [55]:
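def verify_token(token):
    """Return the token's data if it exists and has not expired, else None."""
    if token not in valid_token:
        return None

    data = valid_token[token]
    current_time = time.time()

    if current_time > data['expires']:
        del valid_token[token]  # drop the expired token from the store
        return None

    return data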

In [84]:
def secure_request(token, user_input):
    """Run all checks in order: token, then rate limit, then content safety."""
    user_data = verify_token(token)
    if not user_data:
        return {"error": "Invalid or expired token"}

    user_id = user_data['user_id']
    
    if not check_rate_limit(user_id, limit=10):
        return {"error": "Rate limit exceeded (10 requests/minute)"}

    if not check_safety(user_input):
        return {"error": "Content safety violation"}

    return {"success": True, "user": user_id}
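
When `secure_request` sits behind an HTTP endpoint, each outcome usually needs a status code. The sketch below shows one possible mapping for the result dictionaries returned above. `status_for` is a hypothetical helper, and the codes chosen (401, 429, 400) are a common convention rather than something this notebook prescribes.

```python
from http import HTTPStatus

# Assumed mapping from secure_request's fixed error messages to status codes
ERROR_STATUS = {
    "Invalid or expired token": HTTPStatus.UNAUTHORIZED,  # 401
    "Content safety violation": HTTPStatus.BAD_REQUEST,   # 400
}


def status_for(result: dict) -> int:
    """Return an HTTP status code for a secure_request-style result dict."""
    error = result.get("error")
    if error is None:
        return int(HTTPStatus.OK)  # 200
    # The rate-limit message includes the limit, so match on its prefix
    if error.startswith("Rate limit exceeded"):
        return int(HTTPStatus.TOO_MANY_REQUESTS)  # 429
    # Unknown errors fall back to 500
    return int(ERROR_STATUS.get(error, HTTPStatus.INTERNAL_SERVER_ERROR))
```

In a FastAPI handler, a non-200 result could then be raised as `HTTPException(status_code=status_for(result), detail=result["error"])`.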

In [60]:
# run

In [90]:
token = create_token('evans1234')

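In [98]:
# Note: this cell was last run (In [98]) after the rate-limit loop below (In [96]),
# so the limit was already used up. On a fresh kernel this first request succeeds.
result = secure_request(token, 'How are you?')
print(result)

{'error': 'Rate limit exceeded (10 requests/minute)'}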


In [87]:
# Debug: use up the rate limit, then confirm the next request is rejected

In [96]:
for i in range(11):
    check_rate_limit('evans1234')

In [97]:
print(secure_request(token, 'Whats up'))

{'error': 'Rate limit exceeded (10 requests/minute)'}


In [17]:
# # security_standalone.py
# import time  # To get current time for rate limiting
# from collections import defaultdict  # Dictionary that automatically creates empty lists

# # Store how many times each user has made requests and when
# # Example: {"user123": [timestamp1, timestamp2, ...]}
# rate_limits = defaultdict(list)

# def check_rate_limit(user_id, limit=100, window=60):
#     # user_id → Which user is making the request (e.g., "john_doe")
#     # limit=100 → Max 100 requests allowed
#     # window=60 → Within 60 seconds (1 minute)

#     """Check if user has made too many requests recently"""
#     current = time.time()  # Get current time in seconds
#     key = user_id  # Use user ID as key (e.g., "user123")
    
#     # Remove timestamps older than 60 seconds (keep only recent requests)
#     # Example: If current=100, keep timestamps > 40 (100-60=40)
#     rate_limits[key] = [t for t in rate_limits[key] if current - t < window]
    
#     # Check if user has reached limit (e.g., 100 or more recent requests)
#     if len(rate_limits[key]) >= limit:
#         return False  # Too many requests
    
#     # Add current timestamp to user's request history
#     rate_limits[key].append(current)
#     return True  # Request allowed

# # Words we don't want users to send
# BLOCKED = ["harmful", "dangerous", "illegal", "hate speech"]

# def check_safety(text):
#     """Check if text contains bad words"""
#     lower = text.lower()  # Convert to lowercase so "Harmful" = "harmful"
    
#     # Check if ANY blocked word is in the text
#     # Example: "This is harmful" → "harmful" in text → returns False (not safe)
#     return not any(bad in lower for bad in BLOCKED)

# # Store valid user tokens
# # Example: {"token_1_user123": {"user_id": "user123", "created": 12345, "expires": 12345+86400}}
# valid_tokens = {}
# token_counter = 0  # Counter to make unique token IDs

# def create_token(user_id):
#     """Create a new access token for a user"""
#     global token_counter  # Use the global counter variable
    
#     token_counter += 1  # Increase counter by 1
#     token = f"token_{token_counter}_{user_id}"  # Create token like "token_1_user123"
    
#     # Store token info with expiration time (24 hours from now)
#     valid_tokens[token] = {
#         "user_id": user_id,  # Who owns this token
#         "created": time.time(),  # When created (current time)
#         "expires": time.time() + 86400  # Expire in 24 hours (60*60*24=86400 seconds)
#     }
#     return token

# def verify_token(token):
#     """Check if token is valid and not expired"""
#     # Check if token exists in our storage
#     if token not in valid_tokens:
#         return None  # Token doesn't exist
    
#     data = valid_tokens[token]  # Get token data
#     current_time = time.time()
    
#     # Check if token has expired
#     if current_time > data["expires"]:
#         del valid_tokens[token]  # Remove expired token
#         return None  # Token expired
    
#     return data  # Token is valid, return user data

# def secure_request(token, user_input):
#     """Check everything: token, rate limit, and content safety"""
#     # Step 1: Check if token is valid
#     user_data = verify_token(token)
#     if not user_data:
#         return {"error": "Invalid or expired token"}
    
#     user_id = user_data["user_id"]  # Get user ID from token
    
#     # Step 2: Check rate limit (max 100 requests per minute)
#     if not check_rate_limit(user_id, limit=100):
#         return {"error": "Rate limit exceeded (100 requests/minute)"}
    
#     # Step 3: Check if input contains bad words
#     if not check_safety(user_input):
#         return {"error": "Content safety violation"}
    
#     # If all checks pass, return success
#     return {"success": True, "user": user_id}

In [3]:
# # Example: How to use this code
# if __name__ == "__main__":
#     # Create a token for user "user123"
#     token = create_token("user123")
#     print(f"Token created: {token}")
    
#     # Test 1: Good request (should work)
#     result = secure_request(token, "Hello, how are you?")
#     print(f"Good request result: {result}")
    
#     # Test 2: Bad content (should fail)
#     result = secure_request(token, "This is harmful content")
#     print(f"Bad content result: {result}")
    
#     # Test 3: Simulate making 101 requests quickly (should fail rate limit)
#     print("\nTesting rate limit by making 101 fake requests...")
#     for i in range(101):
#         check_rate_limit("user123")  # Add 101 timestamps
    
#     result = secure_request(token, "Test message")
#     print(f"After 101 requests: {result}")