In [1]:
# Phase 2 - Rule-Based SQL Injection Detection Engine
# Day 11: Requirements & Attack Taxonomy
# Notebook 3: Rule Engine Development

import pandas as pd
import numpy as np
import json
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
from datetime import datetime
import os

print("=" * 80)
print("PHASE 2: RULE-BASED SQL INJECTION DETECTION ENGINE")
print("=" * 80)
print("\nDay 11: Requirements & Attack Taxonomy Definition")
print("Objective: Define attack categories, examples, and operational constraints")

# Create directories for Phase 2 outputs
os.makedirs('../reports/phase2', exist_ok=True)
os.makedirs('../rules', exist_ok=True)
os.makedirs('../test_sets', exist_ok=True)

print("\n" + "=" * 80)
print("SECTION 1: ATTACK TAXONOMY DEFINITION")
print("=" * 80)

print("\nDefining 6 primary SQL injection attack categories...")

# Define comprehensive attack taxonomy
attack_taxonomy = {
    "1_tautology": {
        "category_id": "TAU",
        "name": "Tautology-Based Injection",
        "description": "Exploits always-true conditions to bypass authentication or retrieve all records",
        "technical_details": "Injects conditions like 'OR 1=1', 'OR 'a'='a' that always evaluate to TRUE",
        "severity": "HIGH",
        "typical_targets": ["Login forms", "Search filters", "WHERE clauses"],
        "detection_strategy": "Pattern matching for tautology expressions",
        "examples": [
            "' OR '1'='1",
            "' OR 1=1--",
            "admin' OR '1'='1'--",
            "' OR 'x'='x",
            "1' OR '1'='1' #",
            "' OR 'a'='a'--",
            "') OR ('1'='1",
            "1' OR 1=1 LIMIT 1--"
        ],
        "keywords": ["OR", "AND", "=", "1=1", "true", "false"],
        "false_positive_risks": [
            "Legitimate queries with OR conditions",
            "Mathematical expressions in data"
        ]
    },
    "2_union": {
        "category_id": "UNI",
        "name": "UNION-Based Injection",
        "description": "Uses UNION operator to combine malicious query with legitimate one",
        "technical_details": "Appends UNION SELECT to retrieve data from other tables",
        "severity": "CRITICAL",
        "typical_targets": ["Data retrieval endpoints", "SELECT statements", "API queries"],
        "detection_strategy": "Detect UNION keyword with SELECT/FROM patterns",
        "examples": [
            "' UNION SELECT NULL, NULL--",
            "' UNION ALL SELECT username, password FROM users--",
            "1' UNION SELECT table_name FROM information_schema.tables--",
            "' UNION SELECT 1,2,3,4,5--",
            "') UNION SELECT NULL,NULL,NULL#",
            "' UNION SELECT @@version--",
            "' UNION SELECT NULL, CONCAT(username, ':', password) FROM users--"
        ],
        "keywords": ["UNION", "UNION ALL", "SELECT", "FROM", "NULL"],
        "false_positive_risks": [
            "Legitimate complex queries using UNION",
            "Stored procedures with UNION"
        ]
    },
    "3_comment": {
        "category_id": "CMT",
        "name": "Comment-Based Injection",
        "description": "Uses SQL comments to truncate queries and bypass validation",
        "technical_details": "Injects -- or /* */ or # to comment out remaining query parts",
        "severity": "HIGH",
        "typical_targets": ["Login forms", "Input validation", "Query string parameters"],
        "detection_strategy": "Detect SQL comment sequences",
        "examples": [
            "admin'--",
            "' OR 1=1--",
            "'; DROP TABLE users--",
            "admin'/*",
            "' OR '1'='1'/*",
            "admin'#",
            "' OR 1=1#",
            "'; EXEC xp_cmdshell('dir')--"
        ],
        "keywords": ["--", "/*", "*/", "#"],
        "false_positive_risks": [
            "URLs with -- in parameters",
            "Email addresses or data containing #",
            "Mathematical operations (e.g., 5--3)"
        ]
    },
    "4_stacked": {
        "category_id": "STK",
        "name": "Stacked Queries Injection",
        "description": "Executes multiple SQL statements in a single query using semicolons",
        "technical_details": "Uses ; to separate and execute additional malicious commands",
        "severity": "CRITICAL",
        "typical_targets": ["API endpoints", "Batch processing", "Administrative interfaces"],
        "detection_strategy": "Detect semicolons followed by SQL keywords",
        "examples": [
            "'; DROP TABLE users--",
            "'; UPDATE users SET password='hacked'--",
            "'; INSERT INTO logs VALUES ('breach')--",
            "'; EXEC sp_executesql N'malicious_code'--",
            "1'; DELETE FROM products WHERE 1=1--",
            "'; CREATE TABLE backdoor (id INT)--",
            "'; GRANT ALL PRIVILEGES ON *.* TO 'attacker'@'%'--"
        ],
        "keywords": [";", "DROP", "DELETE", "UPDATE", "INSERT", "EXEC", "CREATE"],
        "false_positive_risks": [
            "Stored procedures with multiple statements",
            "Legitimate batch operations"
        ]
    },
    "5_time_blind": {
        "category_id": "TMB",
        "name": "Time-Based Blind Injection",
        "description": "Infers information based on response time delays",
        "technical_details": "Uses SLEEP(), WAITFOR, BENCHMARK() to cause delays",
        "severity": "CRITICAL",
        "typical_targets": ["Boolean-based queries", "Error-suppressed applications"],
        "detection_strategy": "Detect time-delay functions",
        "examples": [
            "' AND SLEEP(5)--",
            "'; WAITFOR DELAY '00:00:05'--",
            "' AND IF(1=1, SLEEP(5), 0)--",
            "' AND BENCHMARK(5000000,MD5('A'))--",
            "' OR IF(SUBSTRING(password,1,1)='a', SLEEP(5), 0)--",
            "'; SELECT pg_sleep(5)--",
            "' AND (SELECT * FROM (SELECT(SLEEP(5)))xyz)--"
        ],
        "keywords": ["SLEEP", "WAITFOR", "DELAY", "BENCHMARK", "pg_sleep"],
        "false_positive_risks": [
            "Legitimate performance testing queries",
            "Database maintenance scripts"
        ]
    },
    "6_advanced": {
        "category_id": "ADV",
        "name": "Advanced & Evasion Techniques",
        "description": "Complex attacks using encoding, obfuscation, or stored procedures",
        "technical_details": "Hex encoding, CHAR(), CONCAT(), stored proc abuse, XML/JSON injection",
        "severity": "CRITICAL",
        "typical_targets": ["API endpoints", "Complex applications", "Enterprise systems"],
        "detection_strategy": "Detect encoding patterns, function chaining, privilege escalation",
        "examples": [
            "0x61646d696e",  # hex for 'admin'
            "CHAR(97)+CHAR(100)+CHAR(109)+CHAR(105)+CHAR(110)",  # 'admin'
            "'; EXEC xp_cmdshell('net user')--",
            "'; EXEC sp_addrolemember 'db_owner', 'attacker'--",
            "%27%20OR%201=1--",  # URL encoded
            "LOAD_FILE('/etc/passwd')",
            "INTO OUTFILE '/var/www/shell.php'",
            "EXTRACTVALUE(1, CONCAT(0x5c, (SELECT @@version)))"
        ],
        "keywords": ["0x", "CHAR", "CONCAT", "EXEC", "xp_", "sp_", "LOAD_FILE", 
                     "OUTFILE", "EXTRACTVALUE", "%27", "%20"],
        "false_positive_risks": [
            "Legitimate hex values in data",
            "URL-encoded legitimate requests",
            "System administration queries"
        ]
    }
}

# Print taxonomy summary
print("\nAttack Categories Defined: 6")
print("-" * 70)

for key, category in attack_taxonomy.items():
    print(f"\n{category['category_id']}: {category['name']}")
    print(f"   Severity: {category['severity']}")
    print(f"   Description: {category['description']}")
    print(f"   Example patterns: {len(category['examples'])}")
    print(f"   Keywords tracked: {len(category['keywords'])}")

print("\n" + "=" * 80)
print("SECTION 2: DETAILED ATTACK EXAMPLES WITH BEHAVIOR")
print("=" * 80)

# Create detailed examples table
examples_data = []
for cat_key, category in attack_taxonomy.items():
    for i, example in enumerate(category['examples'][:3], 1):  # Top 3 examples
        examples_data.append({
            'Category': category['category_id'],
            'Attack Type': category['name'],
            'Example': example[:60] + "..." if len(example) > 60 else example,
            'Severity': category['severity'],
            'Target': category['typical_targets'][0] if category['typical_targets'] else "N/A"
        })

examples_df = pd.DataFrame(examples_data)

print("\nSample Attack Examples by Category:")
print("=" * 100)
print(examples_df.to_string(index=False))

print("\n" + "=" * 80)
print("SECTION 3: OPERATIONAL CONSTRAINTS & REQUIREMENTS")
print("=" * 80)

operational_requirements = {
    "performance": {
        "max_latency_per_query_ms": 10,
        "target_throughput_queries_per_sec": 1000,
        "memory_limit_mb": 512,
        "cpu_cores_allocated": 4,
        "rationale": "Real-time web application requirement - sub-10ms response time"
    },
    "accuracy": {
        "target_precision_overall": 0.95,
        "target_recall_overall": 0.92,
        "target_f1_overall": 0.93,
        "target_precision_high_confidence_rules": 0.98,
        "max_false_positive_rate": 0.02,
        "max_false_negative_rate": 0.08,
        "rationale": "Balance between security (low FNR) and usability (low FPR)"
    },
    "operational": {
        "rule_update_frequency": "Weekly",
        "monitoring_metrics": [
            "False positive rate per category",
            "False negative rate per category",
            "Query processing latency (p50, p95, p99)",
            "Throughput (queries/sec)",
            "Rule hit rate distribution"
        ],
        "alerting_thresholds": {
            "latency_p99_ms": 50,
            "false_positive_rate": 0.05,
            "throughput_drop_percent": 20
        },
        "deployment_strategy": "Blue-green with 5% canary",
        "rollback_criteria": "FPR > 5% or latency > 50ms"
    },
    "rule_engine_specific": {
        "max_rules_active": 100,
        "rule_evaluation_order": "Priority-based (severity-weighted)",
        "pattern_matching_engine": "Regex with compiled patterns",
        "caching_strategy": "LRU cache for repeated queries",
        "logging_level": "INFO for production, DEBUG for suspicious"
    }
}

print("\nPerformance Requirements:")
print("-" * 70)
for key, value in operational_requirements["performance"].items():
    if key != "rationale":
        print(f"  {key}: {value}")
print(f"\nRationale: {operational_requirements['performance']['rationale']}")

print("\nAccuracy Requirements:")
print("-" * 70)
for key, value in operational_requirements["accuracy"].items():
    if key != "rationale" and isinstance(value, (int, float)):
        print(f"  {key}: {value:.2%}" if value < 1 else f"  {key}: {value}")
print(f"\nRationale: {operational_requirements['accuracy']['rationale']}")

print("\nRule Engine Configuration:")
print("-" * 70)
for key, value in operational_requirements["rule_engine_specific"].items():
    print(f"  {key}: {value}")

print("\n" + "=" * 80)
print("SECTION 4: DESIRED DETECTION BEHAVIOR")
print("=" * 80)

detection_behavior = {
    "1_tautology": {
        "should_detect": [
            "' OR '1'='1",
            "admin' OR 1=1--",
            "' OR 'a'='a"
        ],
        "should_not_detect": [
            "SELECT * FROM products WHERE category='electronics' OR category='books'",
            "UPDATE settings SET value=1 WHERE id=1",
            "Legitimate text: 'The year 1=1999 was significant'"
        ],
        "edge_cases": [
            "String comparison in data: 'password reset token: a=a'",
            "Mathematical expression: quantity=1+1",
            "Boolean flag: is_active OR is_pending"
        ]
    },
    "2_union": {
        "should_detect": [
            "' UNION SELECT NULL--",
            "1' UNION ALL SELECT username, password FROM users--"
        ],
        "should_not_detect": [
            "Documentation: 'SQL UNION operator combines results'",
            "Column name: union_date",
            "Company name: 'Union Bank'"
        ],
        "edge_cases": [
            "Text containing 'union': 'The union of sets A and B'",
            "Legitimate multi-query with UNION in stored procedure"
        ]
    },
    "3_comment": {
        "should_detect": [
            "admin'--",
            "'; DROP TABLE users--"
        ],
        "should_not_detect": [
            "URL: http://example.com/page--old",
            "Email: user--test@example.com",
            "Markdown: Lists use -- for bullets"
        ],
        "edge_cases": [
            "Double dash in data: 'Model X--2024'",
            "CSS comments in web content",
            "Mathematical: 5--3 (subtraction)"
        ]
    },
    "4_stacked": {
        "should_detect": [
            "'; DROP TABLE users--",
            "1'; DELETE FROM products--"
        ],
        "should_not_detect": [
            "Semicolon in text: 'End of sentence; start of next'",
            "JSON data: {\"key\": \"value\"; \"key2\": \"value2\"}",
            "CSS: body { margin: 0; padding: 0; }"
        ],
        "edge_cases": [
            "Multiple semicolons in legitimate data",
            "Programming code samples in comments"
        ]
    },
    "5_time_blind": {
        "should_detect": [
            "' AND SLEEP(5)--",
            "'; WAITFOR DELAY '00:00:05'--"
        ],
        "should_not_detect": [
            "Documentation: 'Use SLEEP function for delays'",
            "Error message: 'Connection timeout - wait for retry'",
            "Text: 'I will benchmark the performance'"
        ],
        "edge_cases": [
            "Function name in comments",
            "Variable name: sleep_duration"
        ]
    },
    "6_advanced": {
        "should_detect": [
            "0x61646d696e",
            "CHAR(97)+CHAR(100)+CHAR(109)"
        ],
        "should_not_detect": [
            "Legitimate hex color: #FF00AB or 0xFF00AB",
            "MAC address: 0x1A:2B:3C:4D",
            "Documentation: 'CHAR function converts ASCII'"
        ],
        "edge_cases": [
            "Hex values in legitimate data",
            "URL-encoded legitimate requests"
        ]
    }
}

print("\nDetection Behavior Specification:")
for cat_key, behavior in detection_behavior.items():
    category_name = attack_taxonomy[cat_key]['name']
    print(f"\n{category_name}:")
    print(f"  Should Detect: {len(behavior['should_detect'])} examples")
    print(f"  Should NOT Detect: {len(behavior['should_not_detect'])} examples")
    print(f"  Edge Cases: {len(behavior['edge_cases'])} cases")

print("\n" + "=" * 80)
print("GENERATING VISUALIZATIONS")
print("=" * 80)

# Visualization 1: Attack Category Distribution
fig = make_subplots(
    rows=2, cols=2,
    subplot_titles=(
        'Attack Categories by Severity',
        'Examples per Category',
        'Keywords per Category',
        'False Positive Risk Assessment'
    ),
    specs=[
        [{"type": "bar"}, {"type": "bar"}],
        [{"type": "bar"}, {"type": "bar"}]
    ]
)

# Chart 1: Severity distribution
severity_counts = {}
for cat in attack_taxonomy.values():
    sev = cat['severity']
    severity_counts[sev] = severity_counts.get(sev, 0) + 1

fig.add_trace(
    go.Bar(
        x=list(severity_counts.keys()),
        y=list(severity_counts.values()),
        marker=dict(color=['#e74c3c', '#f39c12']),
        text=list(severity_counts.values()),
        textposition='outside'
    ),
    row=1, col=1
)

# Chart 2: Examples per category
categories = [cat['name'].split()[0] for cat in attack_taxonomy.values()]
example_counts = [len(cat['examples']) for cat in attack_taxonomy.values()]

fig.add_trace(
    go.Bar(
        x=categories,
        y=example_counts,
        marker=dict(color='#3498db'),
        text=example_counts,
        textposition='outside'
    ),
    row=1, col=2
)

# Chart 3: Keywords per category
keyword_counts = [len(cat['keywords']) for cat in attack_taxonomy.values()]

fig.add_trace(
    go.Bar(
        x=categories,
        y=keyword_counts,
        marker=dict(color='#27ae60'),
        text=keyword_counts,
        textposition='outside'
    ),
    row=2, col=1
)

# Chart 4: FP risk assessment
fp_risk_counts = [len(cat['false_positive_risks']) for cat in attack_taxonomy.values()]

fig.add_trace(
    go.Bar(
        x=categories,
        y=fp_risk_counts,
        marker=dict(color='#f39c12'),
        text=fp_risk_counts,
        textposition='outside'
    ),
    row=2, col=2
)

fig.update_xaxes(title_text="Severity", row=1, col=1)
fig.update_yaxes(title_text="Count", row=1, col=1)
fig.update_xaxes(title_text="Category", row=1, col=2, tickangle=45)
fig.update_yaxes(title_text="Example Count", row=1, col=2)
fig.update_xaxes(title_text="Category", row=2, col=1, tickangle=45)
fig.update_yaxes(title_text="Keyword Count", row=2, col=1)
fig.update_xaxes(title_text="Category", row=2, col=2, tickangle=45)
fig.update_yaxes(title_text="FP Risk Factors", row=2, col=2)

fig.update_layout(
    height=900,
    title_text="Phase 2 Day 11: Attack Taxonomy Overview",
    title_x=0.5,
    showlegend=False
)

config = {
    'toImageButtonOptions': {
        'format': 'png',
        'filename': 'phase2_day11_attack_taxonomy',
        'height': 900,
        'width': 1400,
        'scale': 2
    },
    'displayModeBar': True,
    'displaylogo': False
}

fig.show(config=config)

print("\nVisualizations generated successfully")

print("\n" + "=" * 80)
print("SAVING ATTACK TAXONOMY DOCUMENTATION")
print("=" * 80)

# Save complete taxonomy
taxonomy_doc = {
    "document_metadata": {
        "title": "SQL Injection Attack Taxonomy",
        "version": "1.0",
        "created_date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "phase": "Phase 2 - Day 11",
        "author": "Rule Engine Development Team"
    },
    "attack_categories": attack_taxonomy,
    "operational_requirements": operational_requirements,
    "detection_behavior": detection_behavior,
    "summary": {
        "total_categories": len(attack_taxonomy),
        "total_examples": sum(len(cat['examples']) for cat in attack_taxonomy.values()),
        "total_keywords": sum(len(cat['keywords']) for cat in attack_taxonomy.values()),
        "severity_distribution": severity_counts
    }
}

taxonomy_path = '../reports/phase2/attack_taxonomy.json'
with open(taxonomy_path, 'w') as f:
    json.dump(taxonomy_doc, f, indent=4)

print(f"\nAttack taxonomy saved: {taxonomy_path}")

# Create human-readable markdown
markdown_content = f"""# SQL Injection Attack Taxonomy
**Version:** 1.0  
**Date:** {datetime.now().strftime('%Y-%m-%d')}  
**Phase:** Phase 2 - Rule Engine Development

## Overview
This document defines the 6 primary SQL injection attack categories, their characteristics, and detection requirements.

## Attack Categories

"""

for key, category in attack_taxonomy.items():
    markdown_content += f"""### {category['category_id']}: {category['name']}

**Severity:** {category['severity']}  
**Description:** {category['description']}

**Technical Details:**  
{category['technical_details']}

**Typical Targets:**
"""
    for target in category['typical_targets']:
        markdown_content += f"- {target}\n"
    
    markdown_content += f"""
**Detection Strategy:** {category['detection_strategy']}

**Example Attacks:**
"""
    for example in category['examples'][:5]:
        markdown_content += f"``````\n"
    
    markdown_content += f"""
**Keywords:** {', '.join(category['keywords'])}

**False Positive Risks:**
"""
    for risk in category['false_positive_risks']:
        markdown_content += f"- {risk}\n"
    
    markdown_content += "\n---\n\n"

markdown_content += f"""## Operational Requirements

### Performance
- Max Latency: {operational_requirements['performance']['max_latency_per_query_ms']}ms per query
- Target Throughput: {operational_requirements['performance']['target_throughput_queries_per_sec']} queries/sec
- Memory Limit: {operational_requirements['performance']['memory_limit_mb']}MB

### Accuracy Targets
- Overall Precision: ≥ {operational_requirements['accuracy']['target_precision_overall']}
- Overall Recall: ≥ {operational_requirements['accuracy']['target_recall_overall']}
- Overall F1-Score: ≥ {operational_requirements['accuracy']['target_f1_overall']}
- High-Confidence Rules Precision: ≥ {operational_requirements['accuracy']['target_precision_high_confidence_rules']}
- Max False Positive Rate: ≤ {operational_requirements['accuracy']['max_false_positive_rate']}
- Max False Negative Rate: ≤ {operational_requirements['accuracy']['max_false_negative_rate']}

## Next Steps
1. Day 12: Rule design and pattern engineering
2. Day 13-15: Rule implementation and validation
3. Day 16-18: Testing and performance optimization
"""

markdown_path = '../reports/phase2/attack_taxonomy.md'
with open(markdown_path, 'w') as f:
    f.write(markdown_content)

print(f"Human-readable taxonomy saved: {markdown_path}")

print("\n" + "=" * 80)
print("DAY 11 COMPLETED - ATTACK TAXONOMY DEFINED")
print("=" * 80)

print("\nDeliverables Created:")
print("  1. attack_taxonomy.json - Complete taxonomy with all details")
print("  2. attack_taxonomy.md - Human-readable documentation")
print("  3. Interactive visualizations - Attack overview charts")

print("\nKey Statistics:")
print(f"  Total attack categories: {len(attack_taxonomy)}")
print(f"  Total example payloads: {sum(len(cat['examples']) for cat in attack_taxonomy.values())}")
print(f"  Total keywords tracked: {sum(len(cat['keywords']) for cat in attack_taxonomy.values())}")
print(f"  Severity levels: {len(severity_counts)} (HIGH, CRITICAL)")

print("\nOperational Targets Set:")
print(f"  Max latency: {operational_requirements['performance']['max_latency_per_query_ms']}ms per query")
print(f"  Target F1-score: ≥ {operational_requirements['accuracy']['target_f1_overall']}")
print(f"  Max FP rate: ≤ {operational_requirements['accuracy']['max_false_positive_rate']}")

print("\nNext: Day 12 (Rule design and pattern engineering)")


PHASE 2: RULE-BASED SQL INJECTION DETECTION ENGINE

Day 11: Requirements & Attack Taxonomy Definition
Objective: Define attack categories, examples, and operational constraints

SECTION 1: ATTACK TAXONOMY DEFINITION

Defining 6 primary SQL injection attack categories...

Attack Categories Defined: 6
----------------------------------------------------------------------

TAU: Tautology-Based Injection
   Severity: HIGH
   Description: Exploits always-true conditions to bypass authentication or retrieve all records
   Example patterns: 8
   Keywords tracked: 6

UNI: UNION-Based Injection
   Severity: CRITICAL
   Description: Uses UNION operator to combine malicious query with legitimate one
   Example patterns: 7
   Keywords tracked: 5

CMT: Comment-Based Injection
   Severity: HIGH
   Description: Uses SQL comments to truncate queries and bypass validation
   Example patterns: 8
   Keywords tracked: 4

STK: Stacked Queries Injection
   Severity: CRITICAL
   Description: Executes multipl


Visualizations generated successfully

SAVING ATTACK TAXONOMY DOCUMENTATION

Attack taxonomy saved: ../reports/phase2/attack_taxonomy.json
Human-readable taxonomy saved: ../reports/phase2/attack_taxonomy.md

DAY 11 COMPLETED - ATTACK TAXONOMY DEFINED

Deliverables Created:
  1. attack_taxonomy.json - Complete taxonomy with all details
  2. attack_taxonomy.md - Human-readable documentation
  3. Interactive visualizations - Attack overview charts

Key Statistics:
  Total attack categories: 6
  Total example payloads: 45
  Total keywords tracked: 38
  Severity levels: 2 (HIGH, CRITICAL)

Operational Targets Set:
  Max latency: 10ms per query
  Target F1-score: ≥ 0.93
  Max FP rate: ≤ 0.02

Next: Day 12 (Rule design and pattern engineering)
