# Security Testing Tutorial

## Learning Objectives

By the end of this tutorial, you will:
1. Understand common web application vulnerabilities
2. Learn XSS (Cross-Site Scripting) prevention techniques
3. Test for SQL injection vulnerabilities
4. Implement authentication security tests
5. Apply rate limiting and brute force protection
6. Use security headers effectively

## Prerequisites

- Python 3.8+
- RAG Engine Mini running locally
- Basic understanding of HTTP and APIs

⚠️ **Warning**: This tutorial is for educational purposes. Only test systems you own or have explicit permission to test.

## Part 1: Understanding Security Vulnerabilities

### OWASP Top 10

The OWASP Top 10 (2021 edition) lists the most critical security risks to web applications:

1. **Broken Access Control** - Users accessing resources they shouldn't
2. **Cryptographic Failures** - Weak encryption, exposed sensitive data
3. **Injection** - SQL, NoSQL, OS command injection (XSS falls under this category in the 2021 list)
4. **Insecure Design** - Fundamental design flaws
5. **Security Misconfiguration** - Default configs, verbose errors
6. **Vulnerable and Outdated Components** - Outdated dependencies
7. **Identification and Authentication Failures** - Weak auth, session management
8. **Software and Data Integrity Failures** - Insecure deserialization, unverified updates
9. **Security Logging and Monitoring Failures** - Insufficient monitoring
10. **Server-Side Request Forgery (SSRF)** - Unauthorized server-initiated requests

Let's look at a few of these with examples:

In [None]:
# Display five common vulnerability classes with illustrative severity levels
vulnerabilities = {
    "Injection": {"severity": "High", "example": "SQL: ' OR 1=1--"},
    "XSS": {"severity": "High", "example": "<script>alert(1)</script>"},
    "Authentication": {"severity": "Critical", "example": "No rate limiting"},
    "Access Control": {"severity": "Critical", "example": "IDOR: /api/user/123"},
    "Sensitive Data": {"severity": "High", "example": "Plaintext passwords"},
}

print("Five Common Vulnerability Classes:")
print("="*60)
for vuln, details in vulnerabilities.items():
    print(f"{vuln:20} | {details['severity']:10} | {details['example']}")

## Part 2: Cross-Site Scripting (XSS)

### What is XSS?

XSS allows attackers to inject malicious scripts into web pages that other users view.

**Types of XSS:**

1. **Stored XSS**: Malicious script saved to database
   - Persists across sessions
   - Affects all users viewing the data

2. **Reflected XSS**: Script in URL/request reflected in response
   - Requires user to click malicious link
   - Not stored; the payload must be delivered again with each request

3. **DOM-based XSS**: Manipulates client-side JavaScript
   - No server involvement
   - Difficult to detect

### Common XSS Payloads

In [None]:
# Common XSS attack vectors
xss_payloads = [
    {
        "name": "Basic Script",
        "payload": "<script>alert('XSS')</script>",
        "description": "Classic script tag injection"
    },
    {
        "name": "Image onerror",
        "payload": "<img src=x onerror=alert('XSS')>",
        "description": "Event handler injection"
    },
    {
        "name": "SVG onload",
        "payload": "<svg onload=alert('XSS')>",
        "description": "SVG-based XSS"
    },
    {
        "name": "JavaScript Protocol",
        "payload": "javascript:alert('XSS')",
        "description": "Protocol-based execution"
    },
    {
        "name": "Nested Script",
        "payload": "<scr<script>ipt>alert('XSS')</scr</script>ipt>",
        "description": "Filter evasion with nested tags (defeats single-pass tag stripping)"
    },
    {
        "name": "Template Injection",
        "payload": "{{constructor.constructor('alert(1)')()}}",
        "description": "Angular/Vue template injection"
    },
]

print("Common XSS Attack Vectors:")
print("="*80)
for i, attack in enumerate(xss_payloads, 1):
    print(f"\n{i}. {attack['name']}")
    print(f"   Payload: {attack['payload']}")
    print(f"   Description: {attack['description']}")

### XSS Prevention

The golden rule: **Never trust user input**

**Defense Layers:**

1. **Input Validation**
   - Whitelist allowed characters
   - Validate data types and lengths
   - Reject suspicious patterns

2. **Output Encoding**
   - Encode special characters: < > & " '
   - Context-aware encoding (HTML, JS, CSS, URL)

3. **Content Security Policy (CSP)**
   - Define allowed sources for scripts, styles, images
   - Block inline scripts

4. **HttpOnly Cookies**
   - Prevent JavaScript from accessing cookies
   - Protects session tokens
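
The input validation layer above can be sketched as an allowlist check. This is a minimal sketch under stated assumptions: the pattern, the 200-character limit, and the function names `validate_question` and `is_valid_question` are hypothetical and should be adapted to what each field legitimately needs. Validation complements output encoding; it does not replace it.

```python
import re

# Assumed policy: letters, digits, spaces, and basic punctuation, 1-200 characters.
QUESTION_PATTERN = re.compile(r"[A-Za-z0-9 .,?!'\-]{1,200}")


def validate_question(text: str) -> str:
    """Return the trimmed text if it matches the allowlist, else raise ValueError."""
    candidate = text.strip()
    if not QUESTION_PATTERN.fullmatch(candidate):
        raise ValueError("question contains disallowed characters or has an invalid length")
    return candidate


def is_valid_question(text: str) -> bool:
    """Boolean wrapper around validate_question for use in conditionals."""
    try:
        validate_question(text)
        return True
    except ValueError:
        return False


print(is_valid_question("What is retrieval-augmented generation?"))
print(is_valid_question("<script>alert('XSS')</script>"))
```

An allowlist states what is permitted, so new attack syntax is rejected by default instead of having to be anticipated.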

In [None]:
# Demonstrate output encoding
from html import escape

# Dangerous input
user_input = "<script>alert('XSS')</script>"

# Without encoding (DANGEROUS)
dangerous_output = user_input

# With encoding (SAFE)
safe_output = escape(user_input)

print("XSS Prevention with Output Encoding:")
print("="*60)
print(f"\nOriginal Input: {user_input}")
print(f"\nWithout Encoding (DANGEROUS):")
print(f"  HTML: {dangerous_output}")
print(f"  Result: Script will execute!")
print(f"\nWith Encoding (SAFE):")
print(f"  HTML: {safe_output}")
print(f"  Result: Displayed as text, not executed")

# Show the difference
print(f"\nBrowser rendering:")
print(f"  Dangerous: Displays alert popup (if not prevented by CSP)")
print(f"  Safe: Displays literal text: <script>alert('XSS')</script>")
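
Encoding has to match the context where the value ends up. The sketch below shows one encoder per context using only the standard library; the helper names are illustrative, and a template engine with auto-escaping would normally do this work in a real application.

```python
import json
from html import escape
from urllib.parse import quote


def encode_for_html(value: str) -> str:
    """Encode for HTML element content and quoted attribute values."""
    return escape(value, quote=True)


def encode_for_url_param(value: str) -> str:
    """Percent-encode a value for use as a single URL query parameter."""
    return quote(value, safe="")


def encode_for_js_string(value: str) -> str:
    """Produce a quoted JavaScript string literal for an inline script block."""
    literal = json.dumps(value)
    # Also escape <, >, and & so the value cannot close the surrounding <script> element.
    return (
        literal.replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )


sample = "<script>alert('XSS')</script>"
print(encode_for_html(sample))
print(encode_for_url_param(sample))
print(encode_for_js_string(sample))
```

Using the HTML encoder inside a script block, or the URL encoder inside HTML, leaves characters unescaped that matter in that context, which is why a single "sanitize" function is rarely enough.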

### Testing for XSS

Let's write tests to verify XSS prevention:

In [None]:
# Example XSS test (illustrative: `client` and `auth_headers` are assumed pytest fixtures)

xss_test_code = r'''
import pytest

# List of XSS payloads to test
XSS_PAYLOADS = [
    "<script>alert('XSS')</script>",
    "<img src=x onerror=alert('XSS')>",
    "<svg onload=alert('XSS')>",
    "javascript:alert('XSS')",
]

@pytest.mark.parametrize("payload", XSS_PAYLOADS)
def test_ask_endpoint_xss_prevention(client, auth_headers, payload):
    #
    # Test that the /ask endpoint sanitizes XSS payloads.
    #
    # Expected Behavior:
    # - Request should succeed (200 OK)
    # - Response should NOT contain unescaped script tags
    # - Response should be properly sanitized
    #
    
    # Send request with XSS payload
    response = client.post(
        "/api/v1/ask",
        headers=auth_headers,
        json={"question": payload, "k": 5}
    )
    
    # Should not crash (200 OK)
    assert response.status_code == 200
    
    # Raw (unescaped) tag openers must not appear in the response
    response_text = response.text.lower()
    for raw_tag in ("<script", "<img", "<svg"):
        assert raw_tag not in response_text, (
            f"Unescaped {raw_tag} found for payload: {payload}"
        )
'''

print(xss_test_code)

## Part 3: SQL Injection (SQLi)

### What is SQL Injection?

SQL injection occurs when untrusted user input is concatenated into a SQL query string instead of being passed as a bound parameter.

**Impact:**
- Access unauthorized data
- Modify or delete database content
- Execute administrative operations
- In some cases, execute OS commands

### Classic SQLi Example

In [None]:
# Vulnerable code example (DON'T DO THIS)

vulnerable_code = '''
# ❌ VULNERABLE CODE
def search_documents(user_input):
    query = f"SELECT * FROM documents WHERE title LIKE '%{user_input}%'"
    return db.execute(query)

# User input: ' OR '1'='1
# Resulting query: SELECT * FROM documents WHERE title LIKE '%' OR '1'='1%'
# This returns ALL documents!
'''

secure_code = '''
# ✅ SECURE CODE
def search_documents(user_input):
    query = "SELECT * FROM documents WHERE title LIKE :search"
    return db.execute(query, {"search": f"%{user_input}%"})

# User input: ' OR '1'='1
# Query sent: SELECT * FROM documents WHERE title LIKE :search
# Bound value for :search: %' OR '1'='1%
# The query text never changes; the entire input is treated as a literal string, not SQL code
'''

print("SQL Injection Vulnerability:")
print("="*60)
print(vulnerable_code)
print("\n\nSecure Alternative:")
print("="*60)
print(secure_code)
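
To see the difference actually execute, here is a small self-contained demonstration against an in-memory SQLite database. The table contents and function names are made up for illustration; the vulnerable function exists only to show the failure mode.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id INTEGER PRIMARY KEY, title TEXT)")
conn.executemany(
    "INSERT INTO documents (title) VALUES (?)",
    [("Quarterly report",), ("Design notes",), ("Meeting minutes",)],
)

malicious_input = "' OR '1'='1"


def search_vulnerable(term):
    # DON'T DO THIS: the input becomes part of the SQL text.
    query = f"SELECT title FROM documents WHERE title LIKE '%{term}%'"
    return [row[0] for row in conn.execute(query)]


def search_parameterized(term):
    # The input is bound as a value and never parsed as SQL.
    query = "SELECT title FROM documents WHERE title LIKE ?"
    return [row[0] for row in conn.execute(query, (f"%{term}%",))]


vulnerable_rows = search_vulnerable(malicious_input)
safe_rows = search_parameterized(malicious_input)

print(f"Vulnerable query returned {len(vulnerable_rows)} rows")
print(f"Parameterized query returned {len(safe_rows)} rows")
```

The vulnerable version returns every row because the injected quote turns the filter into `LIKE '%'`, while the parameterized version searches for the literal text and finds nothing.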

### SQLi Attack Patterns

In [None]:
# Common SQL Injection payloads
sqli_payloads = [
    {
        "payload": "' OR '1'='1",
        "effect": "Bypasses WHERE clause, returns all rows",
        "target": "Authentication, search"
    },
    {
        "payload": "' UNION SELECT * FROM users--",
        "effect": "Extracts data from other tables",
        "target": "Data exfiltration"
    },
    {
        "payload": "1; DROP TABLE documents--",
        "effect": "Destructive command execution",
        "target": "Database destruction"
    },
    {
        "payload": "' OR 1=1; INSERT INTO admins VALUES ('hacker')--",
        "effect": "Data modification",
        "target": "Privilege escalation"
    },
]

print("Common SQL Injection Patterns:")
print("="*80)
for i, attack in enumerate(sqli_payloads, 1):
    print(f"\n{i}. Payload: {attack['payload']}")
    print(f"   Effect: {attack['effect']}")
    print(f"   Target: {attack['target']}")

### SQLi Prevention

**Primary Defense: Parameterized Queries (Prepared Statements)**

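This is the most reliable defense for values. Parameters cannot bind identifiers such as table or column names, so any dynamic identifier must be checked against a fixed allowlist.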

**Best Practices:**

1. **Never concatenate user input into SQL queries**
2. **Use ORM or parameterized queries**
3. **Validate and sanitize input** (secondary defense)
4. **Use least privilege database user**
5. **Enable query logging** for detection
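
One case the practices above do not cover by themselves is a dynamic identifier, such as a user-selected sort column, because placeholders bind values only. A common approach is to accept only names from a fixed set. The sketch below assumes a hypothetical `documents` table with `title` and `created_at` columns.

```python
import sqlite3

# Hypothetical set of columns a caller may sort by.
ALLOWED_SORT_COLUMNS = {"title", "created_at"}
ALLOWED_DIRECTIONS = {"asc", "desc"}


def build_search_query(sort_by: str = "title", direction: str = "asc") -> str:
    """Build the query text; identifiers come only from the allowlists above."""
    if sort_by not in ALLOWED_SORT_COLUMNS:
        raise ValueError(f"unsupported sort column: {sort_by!r}")
    if direction.lower() not in ALLOWED_DIRECTIONS:
        raise ValueError(f"unsupported sort direction: {direction!r}")
    # The search term itself is still a bound parameter (the ? placeholder).
    return (
        "SELECT title FROM documents WHERE title LIKE ? "
        f"ORDER BY {sort_by} {direction.upper()}"
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (title TEXT, created_at TEXT)")
conn.executemany(
    "INSERT INTO documents VALUES (?, ?)",
    [("Beta notes", "2024-02-01"), ("Alpha notes", "2024-03-01")],
)

query = build_search_query(sort_by="title", direction="asc")
titles = [row[0] for row in conn.execute(query, ("%notes%",))]
print(titles)
```

Interpolating `sort_by` is acceptable here only because it has already been matched against a closed set of known-safe names.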

In [None]:
# SQLi Prevention Examples

prevention_examples = '''
## Python with SQLAlchemy (Recommended)

# ✅ Safe - SQLAlchemy automatically parameterizes
Document.query.filter(Document.title.like(f"%{user_input}%")).all()

# ✅ Safe - Explicit parameterization
from sqlalchemy import text

query = text("SELECT * FROM documents WHERE title LIKE :search")
result = db.execute(query, {"search": f"%{user_input}%"})

## Python with Raw SQL

import sqlite3

# ✅ Safe - Parameterized query
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute(
    "SELECT * FROM documents WHERE title LIKE ?",
    (f"%{user_input}%",)
)

# ❌ DANGEROUS - String formatting
cursor.execute(f"SELECT * FROM documents WHERE title LIKE '%{user_input}%'")
'''

print(prevention_examples)

### Testing for SQL Injection

Automated tests to verify SQLi prevention:

In [None]:
# SQL Injection test example
sqli_test = r'''
import pytest

SQLI_PAYLOADS = [
    "' OR '1'='1",
    "' OR 1=1--",
    "1; DROP TABLE users--",
    "' UNION SELECT * FROM users--",
    "1' AND 1=1--",
    "1' AND 1=2--",
]

@pytest.mark.parametrize("payload", SQLI_PAYLOADS)
def test_sql_injection_prevention(client, auth_headers, payload):
    """Test SQL injection prevention in search endpoint."""
    
    response = client.get(
        "/api/v1/documents/search",
        headers=auth_headers,
        params={"q": payload, "limit": 10}
    )
    
    # Expect success or a validation error, never a 500
    assert response.status_code in [200, 422]
    
    # Response should not contain SQL keywords or error messages
    response_text = response.text.lower()
    assert "sql" not in response_text, \
        "SQL error message exposed - potential injection"
    assert "database" not in response_text, \
        "Database error message exposed"
    assert "syntax" not in response_text, \
        "Syntax error message exposed"
'''

print(sqli_test)

## Part 4: Authentication Security

### Brute Force Attacks

Attackers repeatedly guess passwords against known usernames, often working from lists of common or previously leaked credentials.

**Protection Mechanisms:**

1. **Rate Limiting**
   - Limit login attempts per IP/user
   - Progressive delays

2. **Account Lockout**
   - Lock account after N failed attempts
   - Unlock after time period or manual intervention

3. **CAPTCHA**
   - Require CAPTCHA after N failures
   - Blocks automated attacks

4. **Multi-Factor Authentication (MFA)**
   - Requires second factor (SMS, app, hardware key)
   - A guessed password alone is no longer enough to log in

In [None]:
# Brute force protection implementation

brute_force_protection = '''
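from flask import Flask
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)

# Setup rate limiter (Flask-Limiter 3.x signature: key_func first, app as keyword)
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"]
)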

# Apply strict limits to login endpoint
@app.route("/api/v1/auth/login", methods=["POST"])
@limiter.limit("5 per minute")  # Max 5 attempts per minute
def login():
    # Authentication logic
    pass

# Lockout and CAPTCHA thresholds (no delay or expiry logic in this sketch)
failed_attempts = {}  # In production, use Redis with a TTL so locks expire

def check_login_attempts(email):
    attempts = failed_attempts.get(email, 0)
    
    if attempts >= 5:
        # Lock the account; the 30-minute unlock must be enforced by the store's TTL
        return False, "Account locked. Try again in 30 minutes."
    
    if attempts >= 3:
        # Require CAPTCHA
        return False, "Please complete CAPTCHA"
    
    return True, None

def record_failed_attempt(email):
    failed_attempts[email] = failed_attempts.get(email, 0) + 1
'''

print("Brute Force Protection Implementation:")
print("="*60)
print(brute_force_protection)
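
Progressive delays and lockouts both need timestamps, since a counter alone can never unlock an account. The following is a minimal in-memory sketch of that idea. The class name `LoginThrottle`, the thresholds, and the doubling schedule are assumptions chosen for illustration; a multi-process deployment would keep this state in a shared store such as Redis.

```python
import time
from typing import Callable, Dict, Tuple

MAX_ATTEMPTS = 5            # failures before the account is locked
LOCKOUT_SECONDS = 30 * 60   # lockout duration
BASE_DELAY_SECONDS = 1.0    # delay after the first failure, doubled each time


class LoginThrottle:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # email -> (failure count, time of the most recent failure)
        self._failures: Dict[str, Tuple[int, float]] = {}

    def seconds_until_allowed(self, email: str) -> float:
        """Return 0.0 if an attempt may proceed now, else the remaining wait."""
        count, last_failure = self._failures.get(email, (0, 0.0))
        if count == 0:
            return 0.0
        if count >= MAX_ATTEMPTS:
            wait = LOCKOUT_SECONDS
        else:
            wait = BASE_DELAY_SECONDS * 2 ** (count - 1)
        remaining = last_failure + wait - self._clock()
        if remaining <= 0 and count >= MAX_ATTEMPTS:
            # The lockout has expired, so start counting from zero again.
            del self._failures[email]
        return max(0.0, remaining)

    def record_failure(self, email: str) -> None:
        count, _ = self._failures.get(email, (0, 0.0))
        self._failures[email] = (count + 1, self._clock())

    def record_success(self, email: str) -> None:
        self._failures.pop(email, None)
```

A login handler would call `seconds_until_allowed` before checking the password, then `record_failure` or `record_success` afterwards. Passing the clock in as a parameter keeps the behaviour testable without sleeping.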

### JWT Token Security

JSON Web Tokens (JWT) are commonly used for authentication.

**Security Requirements:**

1. **Use Strong Algorithms**
   - ✅ HS256 with a strong secret (at least 32 random bytes)
   - ✅ RS256 (asymmetric, preferred)
   - ❌ None algorithm (must reject)

2. **Short Expiration**
   - Access tokens: 15-60 minutes
   - Refresh tokens: 7-30 days

3. **Secure Storage**
   - httpOnly cookies (XSS protection)
   - Secure flag (HTTPS only)
   - SameSite=Strict (CSRF protection)

4. **Token Revocation**
   - Maintain blacklist of revoked tokens
   - Short expiration minimizes window

In [None]:
# JWT Security testing

jwt_security_test = '''
import time

import pytest
from jose import jwt

def test_token_signature_validation(client, valid_token):
    """Test that tampered tokens are rejected."""
    
    # Corrupt the signature; `valid_token` is an assumed fixture that
    # returns a freshly issued token
    tampered_token = valid_token[:-5] + "XXXXX"
    
    headers = {"Authorization": f"Bearer {tampered_token}"}
    response = client.get("/api/v1/documents", headers=headers)
    
    # Must reject tampered token
    assert response.status_code == 401

def test_none_algorithm_rejected(client):
    """Test that 'none' algorithm is rejected."""
    
    import base64
    import json
    
    # Token with 'none' algorithm
    header = base64.urlsafe_b64encode(
        json.dumps({"alg": "none", "typ": "JWT"}).encode()
    ).decode().rstrip("=")
    
    payload = base64.urlsafe_b64encode(
        json.dumps({"sub": "admin"}).encode()
    ).decode().rstrip("=")
    
    none_token = f"{header}.{payload}."
    
    headers = {"Authorization": f"Bearer {none_token}"}
    response = client.get("/api/v1/documents", headers=headers)
    
    # Must reject 'none' algorithm
    assert response.status_code == 401

def test_expired_token_rejected(client):
    """Test that expired tokens are rejected."""
    
    # Create expired token
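    expired_token = jwt.encode(
        {
            "sub": "test@example.com",
            "exp": int(time.time()) - 3600,  # Expired 1 hour ago
            "iat": int(time.time()) - 7200,
        },
        # Must match the app's signing key in the test environment; otherwise
        # the token is rejected for its signature, not its expiry
        key="secret",
        algorithm="HS256"
    )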
    
    headers = {"Authorization": f"Bearer {expired_token}"}
    response = client.get("/api/v1/documents", headers=headers)
    
    # Must reject expired token
    assert response.status_code == 401
'''

print(jwt_security_test)

## Part 5: Rate Limiting Implementation

Rate limiting is essential for:
- Preventing brute force attacks
- Mitigating application-layer denial-of-service attempts
- Ensuring fair resource usage
- Managing costs (API quotas)
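
To make the mechanism concrete, here is a minimal sliding-window limiter kept in process memory. The class name and parameters are illustrative assumptions; real deployments usually rely on a library or gateway backed by a shared store so that limits hold across workers.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most max_requests per key within any window_seconds span."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max = max_requests
        self._window = window_seconds
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Discard timestamps that have fallen out of the window.
        while hits and hits[0] <= now - self._window:
            hits.popleft()
        if len(hits) >= self._max:
            return False  # the caller would respond with HTTP 429
        hits.append(now)
        return True


limiter = SlidingWindowLimiter(max_requests=5, window_seconds=60)
results = [limiter.allow("203.0.113.7") for _ in range(7)]
print(results)
```

The key can be an IP address, a user ID, or an API token, depending on which kind of abuse the limit is meant to contain.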

In [None]:
# Rate limiting test example

rate_limit_test = r'''
def test_rate_limit_enforced(client, auth_headers):
    """Test that rate limits are enforced."""
    
    endpoint = "/api/v1/ask"
    num_requests = 150  # Assumed to exceed the configured limit
    
    responses = []
    for _ in range(num_requests):
        response = client.post(
            endpoint,
            headers=auth_headers,
            json={"question": "test"}
        )
        responses.append(response.status_code)
    
    # Count rate limited responses
    rate_limited = responses.count(429)
    successful = responses.count(200)
    
    print(f"\nRate Limit Test:")
    print(f"  Total requests: {num_requests}")
    print(f"  Successful: {successful}")
    print(f"  Rate limited (429): {rate_limited}")
    
    # Should have triggered rate limiting
    assert rate_limited > 0, "Rate limiting not triggered!"

def test_rate_limit_headers(client, auth_headers):
    """Test that rate limit headers are present."""
    
    response = client.get("/api/v1/documents", headers=auth_headers)
    
    # Conventional (not formally standardized) rate limit headers
    limit = response.headers.get("X-RateLimit-Limit")
    remaining = response.headers.get("X-RateLimit-Remaining")
    reset = response.headers.get("X-RateLimit-Reset")
    
    print(f"\nRate Limit Headers:")
    print(f"  X-RateLimit-Limit: {limit}")
    print(f"  X-RateLimit-Remaining: {remaining}")
    print(f"  X-RateLimit-Reset: {reset}")
    
    assert limit is not None and remaining is not None, (
        "Rate limit headers missing from response"
    )
    print("  ✓ Rate limit headers present")
'''

print(rate_limit_test)

## Part 6: Security Headers

HTTP security headers add another layer of protection:

| Header | Purpose | Recommended Value |
|--------|---------|-------------------|
| **Content-Security-Policy** | Control resource loading | `default-src 'self'` |
| **X-Frame-Options** | Prevent clickjacking | `DENY` |
| **X-Content-Type-Options** | Prevent MIME sniffing | `nosniff` |
| **X-XSS-Protection** | Legacy browser XSS filter (removed from current browsers) | `0` (disable it and rely on CSP) |
| **Strict-Transport-Security** | Force HTTPS | `max-age=31536000; includeSubDomains` |
| **Referrer-Policy** | Control referrer info | `strict-origin-when-cross-origin` |
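
A table like this can be turned into an automated check. The sketch below audits a mapping of response headers and reports what is missing or unexpected. The function name and the choice of which headers require an exact value are assumptions for illustration.

```python
from typing import List, Mapping, Optional

# Header name -> required value, or None when any non-empty value is accepted.
# X-XSS-Protection is omitted: current browsers no longer implement the filter it controls.
REQUIRED_HEADERS: Mapping[str, Optional[str]] = {
    "Content-Security-Policy": None,
    "X-Frame-Options": "DENY",
    "X-Content-Type-Options": "nosniff",
    "Strict-Transport-Security": None,
    "Referrer-Policy": None,
}


def audit_security_headers(headers: Mapping[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means all checks passed."""
    # HTTP header names are case-insensitive, so compare in lowercase.
    normalized = {name.lower(): value.strip() for name, value in headers.items()}
    problems = []
    for name, expected in REQUIRED_HEADERS.items():
        actual = normalized.get(name.lower())
        if not actual:
            problems.append(f"missing {name}")
        elif expected is not None and actual.lower() != expected.lower():
            problems.append(f"{name} is {actual!r}, expected {expected!r}")
    return problems


example = {"x-frame-options": "SAMEORIGIN", "X-Content-Type-Options": "nosniff"}
for problem in audit_security_headers(example):
    print(problem)
```

In a test, the mapping would come from `response.headers`, and the assertion would be that the returned list is empty.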

In [None]:
# Security headers implementation

security_headers_code = '''
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)
        
        # Prevent clickjacking
        response.headers["X-Frame-Options"] = "DENY"
        
        # Prevent MIME sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"
        
        # Legacy XSS filter: disable it explicitly and rely on CSP instead
        response.headers["X-XSS-Protection"] = "0"
        
        # Referrer policy
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        
        # Content Security Policy
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'none'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:;"
        )
        
        # HTTPS only
        response.headers["Strict-Transport-Security"] = (
            "max-age=31536000; includeSubDomains"
        )
        
        return response

app = FastAPI()
app.add_middleware(SecurityHeadersMiddleware)
'''

print("Security Headers Implementation:")
print("="*60)
print(security_headers_code)

## Part 7: Practical Security Testing Exercise

Let's create a comprehensive security test suite:

In [None]:
# Complete security test suite example

complete_suite = r'''

# tests/security/test_security_comprehensive.py

import pytest
from typing import List

class TestXSSPrevention:
    """Test XSS prevention across all endpoints."""
    
    XSS_PAYLOADS: List[str] = [
        "<script>alert('XSS')</script>",
        "<img src=x onerror=alert('XSS')>",
        "<svg onload=alert('XSS')>",
        "javascript:alert('XSS')",
    ]
    
    @pytest.mark.parametrize("payload", XSS_PAYLOADS)
    def test_ask_endpoint_xss(self, client, auth_headers, payload):
        response = client.post(
            "/api/v1/ask",
            headers=auth_headers,
            json={"question": payload, "k": 5}
        )
        
        # Check for unescaped script tags
        assert "<script>" not in response.text \
            or "&lt;script&gt;" in response.text

class TestSQLInjectionPrevention:
    """Test SQL injection prevention."""
    
    SQLI_PAYLOADS: List[str] = [
        "' OR '1'='1",
        "' UNION SELECT * FROM users--",
        "1; DROP TABLE documents--",
    ]
    
    @pytest.mark.parametrize("payload", SQLI_PAYLOADS)
    def test_search_sqli(self, client, auth_headers, payload):
        response = client.get(
            "/api/v1/documents/search",
            headers=auth_headers,
            params={"q": payload}
        )
        
        # Expect success or a validation error, never a 500 with database details
        assert response.status_code in [200, 400, 422]
        assert "sql" not in response.text.lower()

class TestBruteForceProtection:
    """Test brute force attack protection."""
    
    def test_login_rate_limiting(self, client):
        email = "test@example.com"
        
        # Attempt many failed logins
        for i in range(20):
            response = client.post(
                "/api/v1/auth/login",
                json={"email": email, "password": "wrong"}
            )
        
        # Should eventually be rate limited
        assert response.status_code == 429

class TestAuthenticationBypass:
    """Test authentication bypass attempts."""
    
    def test_invalid_token_rejected(self, client):
        headers = {"Authorization": "Bearer invalid_token"}
        response = client.get("/api/v1/documents", headers=headers)
        assert response.status_code == 401
    
    def test_malformed_auth_header(self, client):
        # Wrong scheme: Basic credentials where a Bearer token is expected
        headers = {"Authorization": "Basic dXNlcjpwYXNz"}
        response = client.get("/api/v1/documents", headers=headers)
        assert response.status_code in [401, 403]

class TestSecurityHeaders:
    """Test security headers are present."""
    
    def test_security_headers_present(self, client):
        response = client.get("/health")
        
        assert "X-Frame-Options" in response.headers
        assert "X-Content-Type-Options" in response.headers
        assert "X-XSS-Protection" in response.headers

class TestInformationDisclosure:
    """Test that sensitive info is not leaked."""
    
    def test_error_messages_not_verbose(self, client):
        response = client.get("/api/v1/nonexistent")
        
        # Should not contain sensitive patterns
        response_text = response.text.lower()
        assert "sql" not in response_text
        assert "stack trace" not in response_text
        assert "traceback" not in response_text
        assert "exception" not in response_text
'''

print(complete_suite)

## Part 8: Security Testing in CI/CD

Integrate security testing into your pipeline:

```yaml
name: Security Tests

on: [push, pull_request]

jobs:
  security:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Run Bandit (SAST)
        run: |
          pip install bandit
          bandit -r src/ -f json -o bandit.json
      
      - name: Check dependencies
        run: |
          pip install safety
          safety check
      
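      # Assumes the project's dependencies, including pytest, are already installed
      - name: Run security tests
        run: |
          pytest tests/security/ -v --tb=short
      
      # The baseline scan needs the application to be reachable at the target URL;
      # start it in an earlier step or as a service container
      - name: OWASP ZAP Scan
        uses: zaproxy/action-baseline@v0.7.0
        with:
          target: 'http://localhost:8000'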
```
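
The `bandit.json` report written by the workflow above can also be post-processed, for example to fail a build only on high-severity findings. The sketch below assumes the report has a top-level `results` list whose entries carry `issue_severity`, `filename`, `line_number`, `test_id`, and `issue_text` fields. Check these against the output of your Bandit version before relying on them. The sample data is hand-written, not real scan output.

```python
from typing import Any, Dict, List


def high_severity_findings(report: Dict[str, Any]) -> List[str]:
    """Return one summary line per HIGH-severity result in a Bandit-style report."""
    findings = []
    for result in report.get("results", []):
        if str(result.get("issue_severity", "")).upper() == "HIGH":
            findings.append(
                f"{result.get('filename')}:{result.get('line_number')} "
                f"[{result.get('test_id')}] {result.get('issue_text')}"
            )
    return findings


# Hand-written sample in the assumed report shape (not real scan results).
sample_report = {
    "results": [
        {
            "filename": "src/app.py",
            "line_number": 12,
            "test_id": "B602",
            "issue_severity": "HIGH",
            "issue_text": "subprocess call with shell=True identified, security issue.",
        },
        {
            "filename": "src/util.py",
            "line_number": 40,
            "test_id": "B101",
            "issue_severity": "LOW",
            "issue_text": "Use of assert detected.",
        },
    ]
}

for line in high_severity_findings(sample_report):
    print(line)
```

In a pipeline, the report would be loaded with `json.load` and the script would exit with a non-zero status when the list is not empty. Bandit itself exits with a non-zero status when it reports issues, so the scan step would need to tolerate that for a later gate like this to run.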

## Summary

### Key Takeaways

1. **Defense in Depth** - Multiple layers of security
2. **Never Trust User Input** - Validate everything
3. **Use Parameterized Queries** - Prevent SQL injection
4. **Encode Output** - Prevent XSS
5. **Implement Rate Limiting** - Prevent brute force
6. **Set Security Headers** - Additional protection layer
7. **Test Continuously** - Security is ongoing

### Security Checklist

- [ ] Input validation on all endpoints
- [ ] Output encoding for dynamic content
- [ ] Parameterized queries for database access
- [ ] Rate limiting on authentication endpoints
- [ ] JWT with strong algorithms and short expiration
- [ ] Security headers on all responses
- [ ] Error handling without information disclosure
- [ ] Multi-tenant data isolation
- [ ] Regular dependency vulnerability scans
- [ ] Security tests in CI/CD pipeline

### Additional Resources

- [OWASP Testing Guide](https://owasp.org/www-project-web-security-testing-guide/)
- [OWASP Cheat Sheets](https://cheatsheetseries.owasp.org/)
- [CWE Top 25](https://cwe.mitre.org/top25/)
- [Security Testing Guide](../../docs/learning/testing/04-security-testing.md)

## Next Steps

1. Run the security tests against your RAG Engine
2. Set up automated security scanning in CI/CD
3. Review and fix any vulnerabilities found
4. Create a security incident response plan
5. Stay updated on new security threats