In [1]:
import sqlite3

# Open the database file (or create it if it doesn't exist)
conn = sqlite3.connect('database.db')

# Create a cursor object to execute SQL commands
cursor = conn.cursor()

# Example: Query all tables in the database
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print("Tables in the database:", tables)

# Close the connection when done
conn.close()

Tables in the database: [('users',), ('sqlite_sequence',)]


In [2]:
import sqlite3

# Open the database file (or create it if it doesn't exist)
conn = sqlite3.connect('shop.db')

# Create a cursor object to execute SQL commands
cursor = conn.cursor()

# Example: Query all tables in the database
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print("Tables in the database:", tables)

# Close the connection when done
conn.close()

Tables in the database: [('users',), ('sqlite_sequence',), ('products',)]


In [3]:
import numpy as np
import pandas as pd
import pickle
from tensorflow.keras.models import load_model

# Load your model and preprocessing tools
autoencoder = load_model('autoencoder_model.h5')
with open('vectorizer.pkl', 'rb') as f:
    vectorizer = pickle.load(f)
with open('scaler.pkl', 'rb') as f:
    scaler = pickle.load(f)

threshold = 0.000430  # Set based on your F1 tuning

def extract_features(df):
    def sqli_keywords(text):
        keywords = ['select', 'union', 'insert', 'drop', '--', ';', "'", '"', 'or 1=1', 'sleep', 'benchmark']
        text = str(text).lower()
        return int(any(kw in text for kw in keywords))

    def feature_row(row):
        url = str(row['URL'])
        content = str(row.get('content', ''))
        cookie = str(row.get('cookie', ''))
        agent = str(row.get('User-Agent', ''))

        combined = f"{url} {content} {cookie}"

        return {
            'method_is_post': int(row['Method'] == 'POST'),
            'url_len': len(url),
            'url_param_count': url.count('='),
            'content_len': len(content),
            'cookie_len': len(cookie),
            'num_quotes': combined.count("'"),
            'num_double_quotes': combined.count('"'),
            'num_equals': combined.count('='),
            'num_semicolons': combined.count(';'),
            'num_comments': combined.count('--'),
            'has_sql_keywords': sqli_keywords(combined),
            'has_union': int('union' in combined.lower()),
            'has_select': int('select' in combined.lower()),
            'ua_is_curl': int('curl' in agent.lower()),
            'ua_is_sqlmap': int('sqlmap' in agent.lower()),
            'ua_len': len(agent),
            'cookie_has_session': int('jsessionid' in cookie.lower())
        }

    return pd.DataFrame([feature_row(row) for _, row in df.iterrows()])


def detect_threat(log_json):
    # Normalize input keys
    data = {
        'Method': log_json.get('method', ''),
        'URL': log_json.get('url', ''),
        'content': log_json.get('content', ''),
        'cookie': str(log_json.get('cookie', '')),
        'User-Agent': log_json.get('user_agent', '')
    }

    df = pd.DataFrame([data])

    # Text-based field for TF-IDF
    df['combined'] = (
        df['Method'].fillna('') + ' ' +
        df['User-Agent'].fillna('') + ' ' +
        df['URL'].fillna('') + ' ' +
        df['content'].fillna('') + ' ' +
        df['cookie'].fillna('')
    )

    X_text = vectorizer.transform(df['combined']).toarray()
    X_custom = extract_features(df)
    X_final = np.hstack([X_text, X_custom.values])
    X_scaled = scaler.transform(X_final)

    # Predict
    X_pred = autoencoder.predict(X_scaled)
    mse = np.mean(np.power(X_scaled - X_pred, 2), axis=1)[0]
    label = 'sql_injection' if mse > threshold else 'benign'

    return {'label': label, 'score': float(mse)}


# Example use:
sample_input = {
    "content": "username=admin&password=admin123",
    "cookie": {
        "session": "eyJ1c2VyIjoiYWRtaW4ifQ.Z_-4YQ.GKgirDa6gT53v42343Hm9f1L3fY"
    },
    "message": "Login successful! 🎉",
    "method": "POST",
    "url": "http://127.0.0.1:5000/login",
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36"
}

result = detect_threat(sample_input)
print(result)


FileNotFoundError: [Errno 2] Unable to synchronously open file (unable to open file: name = 'autoencoder_model.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)

In [1]:
import random
import csv

def generate_sql_dataset(filename="sql_dataset.csv", num_samples=1000, attack_ratio=0.5):
    benign_templates = [
        "SELECT * FROM users WHERE id='{}';",
        "SELECT name, email FROM customers WHERE email='{}';",
        "UPDATE products SET price={} WHERE id='{}';",
        "INSERT INTO orders (user_id, product_id, quantity) VALUES ('{}', '{}', {});",
        "DELETE FROM sessions WHERE session_id='{}';"
    ]
    
    sqli_payloads = [
        "' OR '1'='1",
        "'; DROP TABLE users; --",
        "' UNION SELECT NULL, version(), NULL --",
        "' OR 1=1--",
        "' AND 1=(SELECT COUNT(*) FROM tab); --",
        "'; EXEC xp_cmdshell('dir'); --"
    ]
    
    benign_data = []
    attack_data = []

    for _ in range(num_samples):
        if random.random() < attack_ratio:
            # Attack sample
            template = random.choice(benign_templates)
            injection = random.choice(sqli_payloads)
            if '{}' in template:
                query = template.replace("{}", injection, 1)
            else:
                query = template + " " + injection
            attack_data.append([query, "attack"])
        else:
            # Benign sample
            template = random.choice(benign_templates)
            args = [random.randint(1, 1000) if '{}' in template else "" for _ in range(template.count("{}"))]
            args = [str(arg) for arg in args]
            query = template.format(*args)
            benign_data.append([query, "benign"])

    all_data = benign_data + attack_data
    random.shuffle(all_data)

    # Save to CSV
    with open(filename, "w", newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(["query", "label"])
        writer.writerows(all_data)
    
    print(f"✅ Dataset saved to {filename} with {len(all_data)} samples.")



In [4]:
generate_sql_dataset("sql_dataset.csv", num_samples=10000, attack_ratio=0.5)

✅ Dataset saved to sql_dataset.csv with 10000 samples.


In [5]:
import pandas as pd
df= pd.read_csv("sql_dataset.csv")
df

Unnamed: 0,query,label
0,UPDATE products SET price='; DROP TABLE users;...,attack
1,DELETE FROM sessions WHERE session_id='617';,benign
2,"INSERT INTO orders (user_id, product_id, quant...",attack
3,SELECT * FROM users WHERE id='' OR 1=1--';,attack
4,DELETE FROM sessions WHERE session_id='715';,benign
...,...,...
9995,SELECT * FROM users WHERE id='362';,benign
9996,SELECT * FROM users WHERE id='' OR 1=1--';,attack
9997,"SELECT name, email FROM customers WHERE email=...",benign
9998,"SELECT name, email FROM customers WHERE email=...",benign


In [7]:
df[df['label']=='benign']

Unnamed: 0,query,label
1,DELETE FROM sessions WHERE session_id='617';,benign
4,DELETE FROM sessions WHERE session_id='715';,benign
6,"SELECT name, email FROM customers WHERE email=...",benign
7,"SELECT name, email FROM customers WHERE email=...",benign
9,"SELECT name, email FROM customers WHERE email=...",benign
...,...,...
9992,SELECT * FROM users WHERE id='529';,benign
9993,"INSERT INTO orders (user_id, product_id, quant...",benign
9995,SELECT * FROM users WHERE id='362';,benign
9997,"SELECT name, email FROM customers WHERE email=...",benign
