In [1]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import random
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier

In [2]:
RND = 42
np.random.seed(RND)
random.seed(RND)

# -------------------------
# 1) Fonctions utilitaires
# -------------------------
def generate_ip_pool(n_public=2000, n_private=200):
    # Crée une liste d'IP publiques et privées (simples)
    pool = []
    # IP privées (10., 192.168., 172.16-31)
    for _ in range(n_private):
        pool.append(f"10.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}")
    # IP publiques (simples variations)
    for _ in range(n_public):
        pool.append(f"{random.randint(1,223)}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}")
    return pool

def random_timestamp(start_days=90):
    # timestamp aléatoire dans les start_days derniers jours
    base = datetime.now()
    delta = timedelta(days=random.randint(0, start_days), hours=random.randint(0,23), minutes=random.randint(0,59))
    return base - delta

# -------------------------
# 2) Génération du dataset
# -------------------------
def generate_realistic_dataset(n_records=20000, ip_pool=None, attacker_ip_frac=0.06):
    if ip_pool is None:
        ip_pool = generate_ip_pool()
    n_ips = len(ip_pool)
    # Choisir quelques IP qui seront "attaquantes" récurrentes (mais pas toutes les sessions d'une IP seront attaques)
    n_attacker_ips = max(1, int(n_ips * attacker_ip_frac))
    attacker_ips = set(np.random.choice(ip_pool, size=n_attacker_ips, replace=False))
    
    rows = []
    for i in range(n_records):
        # Choisir IP (avec plus de chance de réutiliser IP attaquante si c'est une IP d'attaque)
        if random.random() < 0.02 and len(attacker_ips)>0:  # 2% de prob de créer un "burst" explicite
            ip = random.choice(list(attacker_ips))
        else:
            ip = random.choice(ip_pool)
        
        # Baseline attempt counts (légitimes ont souvent 1-5 tentatives, attaques 3-150 mais avec chevauchement)
        is_attacker_ip_prior = 1 if ip in attacker_ips else 0
        attempts_from_ip = int(np.clip(np.random.poisson(1 + 3*is_attacker_ip_prior) + np.random.choice([0,1,2], p=[0.7,0.2,0.1]), 1, 200))
        
        # failed_attempts : légitime peut échouer, attaque aussi ; on ajoute chevauchement
        # échantillonnage selon Beta-Binomial style (approx)
        fail_ratio_base = np.random.beta(1 + 2*is_attacker_ip_prior, 2 + 1*(1-is_attacker_ip_prior))
        failed_attempts = int(np.clip(np.round(fail_ratio_base * attempts_from_ip + np.random.randint(0,2)), 0, attempts_from_ip))
        
        login_success = attempts_from_ip - failed_attempts
        # login duration: attaques très rapides mais avec overlap
        if random.random() < 0.25*is_attacker_ip_prior:
            login_duration_seconds = np.random.uniform(0.05, 3.0) * max(1, attempts_from_ip/10)
        else:
            # sessions utilisateurs : durée plus variable
            login_duration_seconds = np.random.exponential(scale=3.0) + np.random.uniform(0.2, 6.0)
            # petite corrélation : plus d'essais -> durée légèrement plus grande
            login_duration_seconds += attempts_from_ip * np.random.uniform(0.01, 0.2)
        login_duration_seconds = float(np.clip(login_duration_seconds, 0.05, 600))
        
        # request_size_bytes : overlap but attacks might be a bit larger on average
        request_size_bytes = int(np.clip(np.random.normal(800 + 400*is_attacker_ip_prior, 300), 100, 10000))
        
        # requests_per_minute: poisson with overlap
        lam = 2 + 8*is_attacker_ip_prior + np.random.uniform(-1,2)
        requests_per_minute = int(np.clip(np.random.poisson(max(1, lam)), 0, 5000))
        
        # Timestamp
        ts = random_timestamp(start_days=120)
        
        rows.append({
            'timestamp': ts,
            'ip_address': ip,
            'login_success': login_success,
            'attempts_from_ip': attempts_from_ip,
            'failed_attempts': failed_attempts,
            'login_duration_seconds': login_duration_seconds,
            'request_size_bytes': request_size_bytes,
            'requests_per_minute': requests_per_minute
        })
    df = pd.DataFrame(rows)
    # On mélange pour éviter tout ordre biaisé
    df = df.sample(frac=1, random_state=RND).reset_index(drop=True)
    return df

# Générer dataset
ip_pool = generate_ip_pool(n_public=1800, n_private=200)
df = generate_realistic_dataset(n_records=20000, ip_pool=ip_pool, attacker_ip_frac=0.06)

# -------------------------
# 3) Création d'un label réaliste (probabiliste)
# -------------------------
# On calcule un score d'attaque latent à partir des features (sans "fuite explicite")
def compute_attack_score(row):
    # Normaliser approximativement via heuristiques (on ne veut pas utiliser stats du dataset complet pour éviter fuite)
    a = row['attempts_from_ip']
    f = row['failed_attempts']
    rpm = row['requests_per_minute']
    rs = row['request_size_bytes']
    avg_time = row['login_duration_seconds'] / max(1, a)
    
    # composantes : échecs relatifs, fréquence de requêtes, essais totaux, taille des requêtes, temps moyen
    comp_fail = (f / max(1, a))        # 0..1
    comp_attempts = np.log1p(a) / np.log1p(200)  # 0..1 approx
    comp_rpm = np.log1p(rpm) / np.log1p(500)     # 0..1 approx
    comp_size = (rs - 100) / (10000 - 100)       # 0..1 approx
    comp_time = 1.0 - np.tanh(avg_time / 10.0)   # attaques tendent à avoir avg_time faible -> comp_time proche 1
    
    # pondérations arbitraires mais réalistes + bruit
    score = 0.35*comp_fail + 0.25*comp_rpm + 0.2*comp_attempts + 0.1*comp_size + 0.1*comp_time
    score += np.random.normal(0, 0.07)  # bruit
    return np.clip(score, 0.0, 1.0)

df['attack_score'] = df.apply(compute_attack_score, axis=1)

# transformer score en label probabiliste (seuil dynamique pour obtenir ~6-10% d'attaques)
threshold = 0.62
df['is_credential_stuffing'] = (df['attack_score'] > threshold).astype(int)

# Ajuster proportion si besoin (pour simuler dataset réaliste)
# Si proportion trop faible/élevée, on ajuste threshold pour atteindre cible (facultatif)
target_frac = 0.08
current_frac = df['is_credential_stuffing'].mean()
if abs(current_frac - target_frac) > 0.01:
    # trouver threshold qui donne target_frac approximatif
    thr = np.percentile(df['attack_score'], 100*(1-target_frac))
    df['is_credential_stuffing'] = (df['attack_score'] >= thr).astype(int)

print("Proportion d'attaques :", df['is_credential_stuffing'].mean())

Proportion d'attaques : 0.08


In [3]:
n_records = 20000
ip_pool = generate_ip_pool(n_public=1800, n_private=200)

# Appel de la nouvelle fonction
df = generate_realistic_dataset(n_records=n_records, ip_pool=ip_pool, attacker_ip_frac=0.06)

# Création des labels
df['attack_score'] = df.apply(compute_attack_score, axis=1)
threshold = 0.62
df['is_credential_stuffing'] = (df['attack_score'] > threshold).astype(int)

# Ajustement facultatif pour atteindre proportion cible
target_frac = 0.08
current_frac = df['is_credential_stuffing'].mean()
if abs(current_frac - target_frac) > 0.01:
    thr = np.percentile(df['attack_score'], 100*(1-target_frac))
    df['is_credential_stuffing'] = (df['attack_score'] >= thr).astype(int)

# Format du timestamp
df['timestamp'] = df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Infos
print(f"Dataset shape: {df.shape}")
print(f"Credential stuffing cases: {df['is_credential_stuffing'].sum()}")
print("=== 10 premières lignes ===")
display(df.head(10))

Dataset shape: (20000, 10)
Credential stuffing cases: 1600
=== 10 premières lignes ===


Unnamed: 0,timestamp,ip_address,login_success,attempts_from_ip,failed_attempts,login_duration_seconds,request_size_bytes,requests_per_minute,attack_score,is_credential_stuffing
0,2025-08-13 12:45:27,47.38.148.235,1,2,1,4.21363,991,1,0.364206,0
1,2025-08-04 17:43:23,158.36.203.7,6,6,0,5.326295,957,1,0.152904,0
2,2025-08-17 19:15:26,10.54.223.19,0,1,1,8.403439,286,3,0.572005,1
3,2025-08-03 05:17:22,142.111.240.238,0,1,1,5.093761,1259,4,0.520577,0
4,2025-09-21 05:54:28,113.88.251.211,1,1,0,6.978925,609,5,0.219976,0
5,2025-08-29 20:49:27,125.37.144.166,0,2,2,5.939954,486,4,0.50361,0
6,2025-08-31 01:09:27,55.44.21.104,1,2,1,6.869387,988,3,0.44672,0
7,2025-09-23 16:46:23,151.39.111.216,1,2,1,12.341943,842,1,0.292187,0
8,2025-08-05 02:29:30,115.200.186.176,1,1,0,4.378385,812,6,0.196255,0
9,2025-09-15 13:25:25,28.143.92.99,2,2,0,4.186468,975,4,0.097881,0


In [4]:
df.to_csv("credential_stuffing_detection.csv", index=False)
print("Dataset saved to 'credential_stuffing_detection.csv'")

Dataset saved to 'credential_stuffing_detection.csv'


In [5]:
# Rates and derived features
df['failed_attempt_ratio'] = df['failed_attempts'] / df['attempts_from_ip']
df['success_ratio'] = df['login_success'] / df['attempts_from_ip']
df['avg_login_time'] = df['login_duration_seconds'] / df['attempts_from_ip']

# IP privée indicateur (détection simple)
df['is_private_ip'] = df['ip_address'].str.startswith(('10.', '172.', '192.168')).astype(int)

# Timestamp features
df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
df['day_of_week'] = pd.to_datetime(df['timestamp']).dt.dayofweek

# Features finales sélectionnées
features = [
    'attempts_from_ip', 'failed_attempts', 'login_duration_seconds',
    'request_size_bytes', 'requests_per_minute',
    'failed_attempt_ratio', 'success_ratio', 'avg_login_time',
    'is_private_ip', 'hour', 'day_of_week'
]



In [6]:
# Remplacer inf / nan si présents (sécurité)
df[features] = df[features].fillna(0).replace([np.inf, -np.inf], 0)

# -------------------------
# 5) Train/Test & Modèles
# -------------------------
X = df[features]
y = df['is_credential_stuffing']

# Split stratifié
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=RND, stratify=y)

# Scale uniquement pour la régression logistique
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [7]:
# Random Forest (GridSearch léger)
rf = RandomForestClassifier(random_state=RND)
rf_params = {
    'n_estimators': [100, 200],
    'max_depth': [10, 20, None],
}
rf_grid = GridSearchCV(rf, rf_params, cv=3, scoring='roc_auc', n_jobs=-1)
rf_grid.fit(X_train, y_train)
rf_best = rf_grid.best_estimator_

In [8]:
# Logistic Regression
lr = LogisticRegression(random_state=RND, max_iter=1000)
lr_params = {'C': [0.1, 1.0, 5.0]}
lr_grid = GridSearchCV(lr, lr_params, cv=3, scoring='roc_auc', n_jobs=-1)
lr_grid.fit(X_train_scaled, y_train)
lr_best = lr_grid.best_estimator_

In [9]:
# SVM (RBF kernel) + GridSearch
# -------------------------
svm = SVC(kernel='rbf', probability=True, random_state=RND, class_weight='balanced')
svm_params = {
    'C': [0.1, 1, 5],
    'gamma': ['scale', 'auto']
}
svm_grid = GridSearchCV(svm, svm_params, cv=3, scoring='roc_auc', n_jobs=-1)
svm_grid.fit(X_train_scaled, y_train)
svm_best = svm_grid.best_estimator_

In [10]:
# KNN + GridSearch
# -------------------------
knn = KNeighborsClassifier()
knn_params = {
    'n_neighbors': [3, 5, 7],
    'weights': ['uniform', 'distance'],
    'metric': ['euclidean', 'manhattan']
}
knn_grid = GridSearchCV(knn, knn_params, cv=3, scoring='roc_auc', n_jobs=-1)
knn_grid.fit(X_train_scaled, y_train)
knn_best = knn_grid.best_estimator_

In [11]:
# 6) Évaluation
# -------------------------
def evaluate_model(model, X_eval, y_eval, name="Model"):
    y_pred = model.predict(X_eval)
    y_proba = model.predict_proba(X_eval)[:,1] if hasattr(model, "predict_proba") else None
    print(f"==== {name} ====")
    print("Confusion Matrix:")
    print(confusion_matrix(y_eval, y_pred))
    print("\nClassification Report:")
    print(classification_report(y_eval, y_pred, digits=3))
    if y_proba is not None:
        print("ROC-AUC Score:", round(roc_auc_score(y_eval, y_proba), 4))
    print("\n")

evaluate_model(rf_best, X_test, y_test, "Random Forest")
evaluate_model(lr_best, X_test_scaled, y_test, "Logistic Regression (scaled)")
evaluate_model(svm_best, X_test_scaled, y_test, "SVM (RBF)")
evaluate_model(knn_best, X_test_scaled, y_test, "KNN")

==== Random Forest ====
Confusion Matrix:
[[3644   36]
 [ 246   74]]

Classification Report:
              precision    recall  f1-score   support

           0      0.937     0.990     0.963      3680
           1      0.673     0.231     0.344       320

    accuracy                          0.929      4000
   macro avg      0.805     0.611     0.653      4000
weighted avg      0.916     0.929     0.913      4000

ROC-AUC Score: 0.8953


==== Logistic Regression (scaled) ====
Confusion Matrix:
[[3644   36]
 [ 248   72]]

Classification Report:
              precision    recall  f1-score   support

           0      0.936     0.990     0.962      3680
           1      0.667     0.225     0.336       320

    accuracy                          0.929      4000
   macro avg      0.801     0.608     0.649      4000
weighted avg      0.915     0.929     0.912      4000

ROC-AUC Score: 0.8989


==== SVM (RBF) ====
Confusion Matrix:
[[2536 1144]
 [  20  300]]

Classification Report:
        