# SQL Injection Detection Model

This notebook builds a machine learning model to detect SQL injection attacks using LightGBM.

## 1. Import Dependencies

In [56]:
import pandas
import numpy
import re
import math
import collections
import lightgbm
import sklearn.model_selection
import sklearn.metrics
import sklearn.preprocessing
import joblib
import optuna

## 2. Configuration and Constants

In [57]:
RANDOM_STATE = 42
MAX_QUERY_LENGTH = 2000

MALICIOUS_PATTERNS = [
    (r'\bor\b.*?=.*?\bor\b', 3),
    (r'\bunion\b.*?\bselect\b', 5),
    (r'\bdrop\b.*?\btable\b', 5),
    (r'--', 2),
    (r'/\*.*?\*/', 2),
    (r'\bexec\b.*?\bxp_', 4),
    (r'\bsleep\b\s*\(', 3),
    (r'\bwaitfor\b.*?\bdelay\b', 3),
    (r'\bconcat\b\s*\(', 2),
    (r'\bchar\b\s*\(', 2),
    (r'@@version', 3),
    (r'\bsubstring\b\s*\(', 2)
]

In [None]:
def get_device_params():
    try:
        lightgbm.train(
            {'device': 'gpu', 'objective': 'regression', 'verbose': -1},
            lightgbm.Dataset([[1]], label=[0]),
            num_boost_round=1
        )
        return {'device': 'gpu'}
    except Exception:
        return {'device': 'cpu', 'num_threads': 0}

DEVICE_PARAMS = get_device_params()

## 3. Feature Extraction Functions

In [59]:
def calculate_entropy(text):
    if not text:
        return 0
    character_frequency = collections.Counter(text)
    text_length = len(text)
    entropy = 0
    for count in character_frequency.values():
        probability = count / text_length
        if probability > 0:
            entropy -= probability * math.log2(probability)
    return entropy

In [None]:
def extract_features(query):
    if not query or not isinstance(query, str):
        return {}

    query = query[:MAX_QUERY_LENGTH]
    query_lower = query.lower()
    query_length = len(query)

    special_char_count = len(re.findall(r'[^a-zA-Z0-9\s]', query))

    sql_keywords = ['select', 'from', 'where', 'union', 'drop', 'insert', 'update', 'delete']
    sql_keyword_count = sum(1 for keyword in sql_keywords if keyword in query_lower)

    entropy = calculate_entropy(query)
    single_quote_count = query.count("'")
    double_quote_count = query.count('"')
    has_comment = 1 if '--' in query or '/*' in query or '*/' in query else 0
    has_union = 1 if 'union' in query_lower else 0
    parentheses_count = query.count('(') + query.count(')')
    semicolon_count = query.count(';')

    whitespace_count = sum(1 for character in query if character.isspace())
    whitespace_ratio = whitespace_count / max(query_length, 1)
    numeric_count = sum(1 for character in query if character.isdigit())

    malicious_pattern_score = 0
    for pattern, weight in MALICIOUS_PATTERNS:
        if re.search(pattern, query_lower, re.IGNORECASE):
            malicious_pattern_score += weight

    return {
        'query_length': query_length,
        'special_char_count': special_char_count,
        'sql_keyword_count': sql_keyword_count,
        'entropy': entropy,
        'single_quote_count': single_quote_count,
        'double_quote_count': double_quote_count,
        'has_comment': has_comment,
        'has_union': has_union,
        'parentheses_count': parentheses_count,
        'semicolon_count': semicolon_count,
        'whitespace_ratio': whitespace_ratio,
        'numeric_count': numeric_count,
        'malicious_pattern_score': malicious_pattern_score
    }

## 4. Data Generation Functions

In [61]:
def generate_synthetic_malicious_queries(count):
    templates = [
        "' OR '1'='1",
        "' OR '1'='1' --",
        "' OR '1'='1' /*",
        "admin' --",
        "admin' #",
        "admin'/*",
        "' or 1=1--",
        "' or 1=1#",
        "' or 1=1/*",
        "') or '1'='1--",
        "') or ('1'='1--",
        "'; exec xp_cmdshell('dir'); --",
        "'; DROP TABLE users; --",
        "1' UNION SELECT NULL--",
        "' UNION SELECT username, password FROM users--",
        "1' AND 1=1--",
        "1' AND 1=2--",
        "' OR EXISTS(SELECT * FROM users WHERE username='admin'",
        "' OR 1=1 LIMIT 1--",
        "' OR 'a'='a",
        "') OR ('a'='a",
        '"; DROP TABLE users; --',
        "' OR SLEEP(5)--",
        "' OR 1=1; WAITFOR DELAY '00:00:05'--",
        "' UNION ALL SELECT @@version--",
        "' AND ASCII(SUBSTRING((SELECT password FROM users LIMIT 1),1,1)) > 64--"
    ]
    queries = []
    for i in range(count):
        template = templates[i % len(templates)]
        variation = template.replace('1', str(i % 10))
        queries.append(variation)
    return queries

## 5. Load External Dataset (SQLiV3)

In [None]:
import os

if not os.path.exists('sqliv5-dataset'):
    os.system('git clone https://github.com/nidnogg/sqliv5-dataset.git')

dataset_path = 'sqliv5-dataset/SQLiV3.csv'
data_raw = pandas.read_csv(dataset_path, header=None, encoding='utf-8', on_bad_lines='skip')
data_raw.columns = ['Query', 'Label', 'Col3', 'Col4']

data = data_raw[['Query', 'Label']].copy()
data['Label'] = pandas.to_numeric(data['Label'], errors='coerce')
data = data[data['Label'].isin([0, 1])]
data = data[data['Query'].notna()]
data = data[data['Query'].str.len() > 0]
data['Label'] = data['Label'].astype(int)

print(f"Dataset size: {len(data)}")
print(f"Malicious: {sum(data['Label'] == 1)}, Benign: {sum(data['Label'] == 0)}")
print(f"Balance ratio: {sum(data['Label'] == 1) / len(data):.2%}")

## 6. Feature Extraction and Data Splitting (External Dataset)

In [None]:
features_list = []
for query in data['Query']:
    features_list.append(extract_features(query))

X = pandas.DataFrame(features_list)
y = data['Label'].reset_index(drop=True)
X = X.fillna(0)

scaler = sklearn.preprocessing.StandardScaler()
X_normalized = scaler.fit_transform(X)
X_normalized = pandas.DataFrame(X_normalized, columns=X.columns)

# Use 80/20 split only for hyperparameter tuning
X_train, X_val, y_train, y_val = sklearn.model_selection.train_test_split(
    X_normalized, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y)

print(f"Dataset shape: {X.shape}")
print(f"Train: {X_train.shape[0]}, Val: {X_val.shape[0]}")

## 7. Hyperparameter Optimization

In [None]:
def optimize_hyperparameters(X_train, y_train, X_val, y_val, n_trials=50):
    def objective(trial):
        params = {
            'objective': 'binary',
            'metric': 'binary_logloss',
            'boosting_type': 'gbdt',
            'num_leaves': trial.suggest_int('num_leaves', 20, 60),
            'learning_rate': trial.suggest_float('learning_rate', 0.05, 0.2, log=True),
            'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 0.95),
            'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 0.95),
            'bagging_frequency': trial.suggest_int('bagging_frequency', 1, 7),
            'min_child_samples': trial.suggest_int('min_child_samples', 10, 40),
            'max_depth': trial.suggest_int('max_depth', 4, 12),
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-6, 1.0, log=True),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-6, 1.0, log=True),
            'verbose': -1,
            'random_state': RANDOM_STATE
        }

        train_data = lightgbm.Dataset(X_train, label=y_train)
        valid_data = lightgbm.Dataset(X_val, label=y_val, reference=train_data)

        model = lightgbm.train(
            params,
            train_data,
            valid_sets=[valid_data],
            num_boost_round=1000,
            callbacks=[
                lightgbm.early_stopping(50),
                lightgbm.log_evaluation(0)
            ]
        )

        y_pred_proba = model.predict(X_val, num_iteration=model.best_iteration)
        auc_score = sklearn.metrics.roc_auc_score(y_val, y_pred_proba)

        trial.report(auc_score, model.best_iteration)
        if trial.should_prune():
            raise optuna.TrialPruned()

        return auc_score

    study = optuna.create_study(direction='maximize', pruner=optuna.pruners.MedianPruner())
    study.optimize(objective, n_trials=n_trials, n_jobs=4, show_progress_bar=True)

    print(f"Best AUC-ROC: {study.best_value:.4f}")
    return study.best_params

In [None]:
best_params = optimize_hyperparameters(X_train, y_train, X_val, y_val, n_trials=20)

## 8. Train Final Model

In [None]:
lgb_params = {
    'objective': 'binary',
    'metric': 'binary_logloss',
    'boosting_type': 'gbdt',
    'verbose': -1,
    'random_state': RANDOM_STATE,
    **best_params
}

train_data = lightgbm.Dataset(X_train, label=y_train)
valid_data = lightgbm.Dataset(X_val, label=y_val, reference=train_data)

model = lightgbm.train(
    lgb_params,
    train_data,
    valid_sets=[valid_data],
    num_boost_round=1000,
    callbacks=[
        lightgbm.early_stopping(50),
        lightgbm.log_evaluation(100)
    ]
)

print(f"Best iteration: {model.best_iteration}")

## 9. Model Evaluation

In [None]:
def evaluate_with_cross_validation(params, X, y, n_folds=5):
    """Evaluate model using cross-validation only"""
    kfold = sklearn.model_selection.StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=RANDOM_STATE)
    cv_scores = []
    
    for fold, (train_idx, val_idx) in enumerate(kfold.split(X, y)):
        X_train_cv = X.iloc[train_idx]
        y_train_cv = y.iloc[train_idx]
        X_val_cv = X.iloc[val_idx]
        y_val_cv = y.iloc[val_idx]
        
        train_data = lightgbm.Dataset(X_train_cv, label=y_train_cv)
        valid_data = lightgbm.Dataset(X_val_cv, label=y_val_cv, reference=train_data)
        
        model_cv = lightgbm.train(
            params,
            train_data,
            valid_sets=[valid_data],
            num_boost_round=1000,
            callbacks=[
                lightgbm.early_stopping(50),
                lightgbm.log_evaluation(0)
            ]
        )
        
        y_pred_proba = model_cv.predict(X_val_cv, num_iteration=model_cv.best_iteration)
        auc_score = sklearn.metrics.roc_auc_score(y_val_cv, y_pred_proba)
        cv_scores.append(auc_score)
        print(f"Fold {fold + 1}: AUC-ROC = {auc_score:.4f}")
    
    print(f"\nOverall: {numpy.mean(cv_scores):.4f} Â± {numpy.std(cv_scores):.4f}")
    return cv_scores

In [None]:
# For deployment, use the final model trained on validation data
# For evaluation, use cross-validation
print("\nCross-Validation Evaluation:")
cv_scores = evaluate_with_cross_validation(lgb_params, X_normalized, y)

In [None]:
# Find optimal threshold using the trained model
X_val_pred = model.predict(X_val, num_iteration=model.best_iteration)
thresholds = numpy.linspace(0, 1, 100)
f1_scores = []

for threshold in thresholds:
    y_pred = (X_val_pred > threshold).astype(int)
    if sum(y_pred) > 0:
        f1 = sklearn.metrics.f1_score(y_val, y_pred)
        f1_scores.append(f1)
    else:
        f1_scores.append(0)

optimal_threshold = thresholds[numpy.argmax(f1_scores)]
print(f"Optimal threshold: {optimal_threshold:.3f}")

## 10. Model Persistence

In [ ]:
def save_model(model, scaler, feature_names, filepath):
    model_data = {
        'model': model,
        'scaler': scaler,
        'feature_names': feature_names,
        'optimal_threshold': optimal_threshold,
        'model_params': lgb_params
    }
    joblib.dump(model_data, filepath)
    print(f"Model saved to {filepath}")

def predict_query(query, model_data):
    features = extract_features(query)
    feature_df = pandas.DataFrame([features])

    missing_features = set(model_data['feature_names']) - set(feature_df.columns)
    for feature in missing_features:
        feature_df[feature] = 0

    feature_df = feature_df[model_data['feature_names']]
    features_normalized = model_data['scaler'].transform(feature_df)
    probability = model_data['model'].predict(features_normalized, num_iteration=model_data['model'].best_iteration)[0]
    prediction = "MALICIOUS" if probability > model_data['optimal_threshold'] else "BENIGN"

    return {
        'query': query,
        'prediction': prediction,
        'probability': probability,
        'threshold': model_data['optimal_threshold']
    }

# Save the trained model
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(X)
save_model(model, scaler, list(X.columns), '/tmp/sql_injection_model.pkl')

# Feature importance
feature_importance = pandas.DataFrame({
    'feature': X.columns,
    'importance': model.feature_importance(importance_type='gain')
}).sort_values('importance', ascending=False)

print("\\nTop 5 Most Important Features:")
print(feature_importance.head(5).to_string(index=False))

## 11. Testing with Sample Queries

In [None]:
test_queries = [
    "SELECT * FROM users WHERE id = 123",
    "' OR 1=1 --",
    "admin'; DROP TABLE users; --",
    "1' UNION SELECT username, password FROM admin_users--",
    "'; exec xp_cmdshell 'net user hacker password123 /add' --",
    "' OR SLEEP(5)--",
]

for query in test_queries:
    result = predict_query(query, {'model': model, 'scaler': scaler,
                                   'feature_names': list(X.columns),
                                   'optimal_threshold': optimal_threshold})
    print(f"{query[:50]:50} -> {result['prediction']:10} ({result['probability']:.3f})")