In [1]:
# Imports

import base64
import numpy as np
from utils import get_rules_list, create_train_test_split, payload_to_vec, predict_vec
from modsec import init_modsec

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, roc_curve
from wafamole.models import Model
from wafamole.evasion import EvasionEngine
from sklearn.preprocessing import LabelEncoder

In [2]:
# Set up variables

# TODO: handle large files
attack_data_path = "data/attacks_20k.sql"
sane_data_path = "data/sanes_20k.sql"

rule_ids = get_rules_list()
modsec = init_modsec()

In [3]:
# Create train and test datasets OR load them from disk

train, test = create_train_test_split(
    attack_file=attack_data_path,
    sane_file=sane_data_path,
    train_size=5000,
    test_size=1000,
    modsec=modsec,
    rule_ids=rule_ids,
)

# Create a RF model

X_train = list(train["vector"])
y_train = train["label"]
X_test = list(test["vector"])
y_test = test["label"]

# create and train the Random Forest model
# number of trees is set to 160 for PLs other than PL1 as per the paper
model = RandomForestClassifier(n_estimators=160, random_state=666)
model.fit(X_train, y_train)
print("Model trained successfully!")

print(classification_report(y_test, model.predict(X_test)))

# free the memory of unneeded data
# del train, test
# del X_train, y_train, X_test, y_test

Reading and parsing data...
Full data shape: (13504, 2)
Splitting into train and test...
Creating vectors...


Processing payloads: 100%|██████████| 5000/5000 [01:28<00:00, 56.51it/s]
Processing payloads: 100%|██████████| 1000/1000 [00:37<00:00, 26.62it/s]


Done!
Train shape: (5000, 3) | Test shape: (1000, 3)
Model trained successfully!
              precision    recall  f1-score   support

      attack       0.96      0.94      0.95       516
        sane       0.94      0.96      0.95       484

    accuracy                           0.95      1000
   macro avg       0.95      0.95      0.95      1000
weighted avg       0.95      0.95      0.95      1000



In [14]:
# Here we can adjust the false positive rate (FPR) of the model

# Convert categorical labels to binary labels
# 'attack' is considered the positive class (1) and 'sane' is the negative class (0)
label_encoder = LabelEncoder()
binary_y_test = label_encoder.fit_transform(y_test)

# Use predict_proba to get probabilities
probabilities = model.predict_proba(X_test)[:, 1]  # Probabilities of the positive class

# Calculate ROC curve
fpr, tpr, thresholds = roc_curve(binary_y_test, probabilities)

# Find the threshold closest to your desired FPR (1%)
# when the model now predicts an instance as 'attack', it is correct 99% of the time, false 1% of the time
desired_fpr = 0.01
closest_idx = np.argmin(np.abs(fpr - desired_fpr))
threshold = thresholds[closest_idx]
adjusted_predictions = (probabilities >= threshold).astype(int)

print(classification_report(binary_y_test, adjusted_predictions))

              precision    recall  f1-score   support

           0       0.86      0.99      0.92       516
           1       0.99      0.83      0.90       484

    accuracy                           0.91      1000
   macro avg       0.92      0.91      0.91      1000
weighted avg       0.92      0.91      0.91      1000



In [5]:
# Create WAFamole model
class WAFamoleModel(Model):
    # TODO: rework predict payload to take vec ?
    def extract_features(self, value: str):
        payload_base64 = base64.b64encode(value.encode("utf-8")).decode("utf-8")
        return payload_to_vec(
            payload_base64=payload_base64, rule_ids=rule_ids, modsec=modsec
        )

    def classify(self, value: str):
        vec = self.extract_features(value)
        return predict_vec(
            vec=vec,
            model=model,
            rule_ids=rule_ids,
            modsec=modsec,
        )

In [6]:
# Create WAFamole evasion engine
wafamole_model = WAFamoleModel()
engine = EvasionEngine(wafamole_model)

In [7]:
# payload = "UPDATE `tab` SET `col1` = 1 WHERE `col3` >= 1110573056 LIMIT 516358144;" # sane
# payload = "SELECT `col1` FROM `tab` WHERE `col1` LIKE '%'s'%';" # attack
payload = 'SELECT SLEEP(5)#";'  # attack


# Test payload without evasion
payload_base64 = base64.b64encode(payload.encode("utf-8")).decode("utf-8")
vec = payload_to_vec(payload_base64, rule_ids, modsec)
is_attack = predict_vec(
    vec=vec,
    model=model,
    rule_ids=rule_ids,
    modsec=modsec,
)
print(f"Payload: {payload}")
print(f"Vec: {vec}")
print(f"Confidence: {round(is_attack, 5)}")

Payload: SELECT SLEEP(5)#";
Vec: [0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 1 0 0 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 1 0 0 0 0 0 1 0 0 0 1 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0]
Confidence: 0.98302


In [8]:
# Try and evade the WAF with WAFamole

min_confidence, min_payload = engine.evaluate(
    payload=payload,
    max_rounds=200,
    round_size=10,
    timeout=60,
    threshold=0.5,
)
print()
print(f"Min payload: {min_payload.encode('utf-8')}")
print(f"Min confidence: {round(min_confidence, 5)}")
print()
print(
    f"Reduced confidence from {round(is_attack, 5)} to {round(min_confidence, 5)} (reduction of {round(is_attack - min_confidence, 5)})"
)

[!] Execution timed out
Reached confidence 0.5764273185806232
with payload
SELECT SLEEP(0x5) || 0x0 OR False#";~@\|&
49N

Min payload: b'SELECT SLEEP(0x5) || 0x0 OR False#";~@\\|&\n49N'
Min confidence: 0.57643

Reduced confidence from 0.98302 to 0.57643 (reduction of 0.40659)
