# üõ°Ô∏è Workshop SOC Analyst : Threat Hunting & Engineering
**Dur√©e estim√©e :** 8 Heures | **Niveau :** Interm√©diaire

---

## üéØ Objectifs de la mission

Vous √™tes analyste SOC dans une entreprise qui subit actuellement plusieurs vagues d'attaques. Votre SIEM (Loki) collecte les logs, mais aucune r√®gle de d√©tection n'est en place. 

Votre mission se d√©roule en 5 √©tapes :
1.  **Threat Intel :** Identifier les signatures des attaques (SQLi, Bruteforce, etc.).
2.  **Threat Hunting :** Traquer les attaquants dans les logs avec le langage LogQL.
3.  **SIEM Engineering :** D√©velopper un script Python pour d√©tecter les anomalies statistiques (volum√©trie).
4.  **Network Forensics :** Analyser les captures r√©seaux (PCAP) pour confirmer les attaques bas niveau.
5.  **Active Response :** D√©velopper un script de riposte automatique pour bloquer les IPs malveillantes.

---

## üõ†Ô∏è Module 0 : Initialisation & Architecture
Avant de commencer, nous devons charger les outils et v√©rifier la connexion au SIEM.

In [1]:
# Installation des d√©pendances (si n√©cessaire)
!pip install scapy pandas matplotlib docker requests


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.3[0m[39;49m -> [0m[32;49m26.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import requests
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import time
import docker
from scapy.all import *

# --- CONFIGURATION ---
# Si vous √™tes DANS le r√©seau Docker (Jupyter), utilisez le nom du service : "loki"
# Si vous √™tes en local, utilisez "localhost"
LOKI_HOST = "loki"
LOKI_PORT = "3100"
LOKI_API = f"http://{LOKI_HOST}:{LOKI_PORT}/loki/api/v1/query_range"

def test_connection():
    try:
        r = requests.get(f"http://{LOKI_HOST}:{LOKI_PORT}/ready", timeout=2)
        return r.status_code == 200
    except: return False

print("üì° Test de connexion au SIEM...")
assert test_connection(), "‚ùå Connexion √† Loki impossible. V√©rifiez vos conteneurs."
print("‚úÖ Connexion r√©ussie.")

üì° Test de connexion au SIEM...
‚úÖ Connexion r√©ussie.


### üß™ Test de connectivit√© (Sanity Check)
V√©rifions que Loki est bien accessible avant d'aller plus loin.

In [4]:
def test_connection():
    try:
        # On demande l'√©tat 'ready' de Loki
        response = requests.get(f"http://{LOKI_HOST}:{LOKI_PORT}/ready", timeout=2)
        if response.status_code == 200:
            print("‚úÖ Connexion au SIEM r√©ussie ! Loki est pr√™t.")
            return True
        else:
            print(f"‚ö†Ô∏è Loki r√©pond mais n'est pas pr√™t (Code: {response.status_code})")
            return False
    except Exception as e:
        print(f"‚ùå Impossible de joindre Loki : {e}")
        print("üëâ V√©rifiez que vos conteneurs Docker sont lanc√©s (docker-compose up -d)")
        return False

assert test_connection() == True, "La connexion au SIEM a √©chou√©. Arr√™tez-vous ici et fixez l'environnement."

‚úÖ Connexion au SIEM r√©ussie ! Loki est pr√™t.


## üß† Module 1 : Threat Intel & Recherche

Un bon analyste ne cherche pas au hasard. Il doit comprendre la **signature** de l'attaque.

**Th√®me 1 : Injection SQL (SQLi)**
> *Question :* Dans une attaque classique, quels caract√®res un attaquant injecte-t-il pour manipuler une requ√™te SQL via une URL ?
> *Votre r√©ponse :*

**Th√®me 2 : Path Traversal (LFI)**
> *Question :* Quelle s√©quence de caract√®res permet de remonter dans l'arborescence des fichiers ? Quel fichier Linux critique est souvent cibl√© ?
> *Votre r√©ponse :*

**Th√®me 3 : D√©ni de Service (Bruteforce / Flood)**
> *Question :* Quelle est la diff√©rence entre un utilisateur normal qui se trompe de mot de passe et un Bruteforce ?
> *Votre r√©ponse :*


## üîé Module 2 : Threat Hunting (LogQL)

Loki utilise le langage **LogQL**. C'est similaire au Grep, mais pour des logs distribu√©s.
Nous avons cr√©√© une fonction utilitaire `query_loki` pour vous simplifier la vie.

In [None]:
def query_loki(query, minutes=10):
    """ Interroge Loki et retourne la liste des r√©sultats bruts """
    end_time = datetime.now()
    start_time = end_time - timedelta(minutes=minutes)
    
    params = {
        'query': query,
        'start': int(start_time.timestamp() * 1e9),
        'end': int(end_time.timestamp() * 1e9),
        'limit': 1000,
        'direction': 'BACKWARD'
    }
    try:
        r = requests.get(LOKI_API, params=params)
        r.raise_for_status()
        return r.json()['data']['result']
    except Exception as e:
        print(f"‚ùå Erreur LogQL : {e}")
        return []

### Exercice 2.1 : D√©tection Bruteforce SSH
**Objectif :** Trouvez les logs indiquant un √©chec d'authentification dans le conteneur `victim_server`.
**Indice :** Le message syst√®me standard est "Failed password".

In [None]:
# --- ZONE √âTUDIANT : Compl√©tez la requ√™te LogQL ---
ssh_query = 'YOUR_CODE_HERE'

# --- VALIDATION --- 
ssh_logs = query_loki(ssh_query, minutes=30)

if len(ssh_logs) > 0:
    print(f"‚úÖ SUCC√àS : {len(ssh_logs)} logs de tentatives trouv√©s.")
    print(f"   Exemple : {ssh_logs[0]['values'][0][1]}")
else:
    print("‚ùå √âCHEC : Aucun log trouv√©. Avez-vous lanc√© l'attaque (hydra) ?")
    
assert len(ssh_logs) > 0, "La requ√™te SSH n'a rien retourn√©."

### Exercice 2.2 : D√©tection Injection SQL (Web)
**Objectif :** Trouvez les tentatives d'injection SQL dans le conteneur `web_server`.
**Indice :** Les logs sont au format JSON. Cherchez dans le champ `request_uri` les motifs comme `%27` (apostrophe), `OR`, `SELECT`.
**Syntaxe LogQL :** `{...} | json | field =~ "Regex"`

In [None]:
# --- ZONE √âTUDIANT : Compl√©tez la requ√™te (Utilisez | json et =~ pour la regex) ---
sqli_query = 'YOUR_CODE_HERE'


# --- VALIDATION ---
sqli_logs = query_loki(sqli_query, minutes=30)

count_sqli = sum(len(x['values']) for x in sqli_logs)
if count_sqli > 0:
    print(f"‚úÖ SUCC√àS : {count_sqli} tentatives d'injection SQL d√©tect√©es.")
else:
    print("‚ùå √âCHEC : Rien trouv√©. V√©rifiez votre Regex.")
    
assert count_sqli > 0, "Aucune injection SQL d√©tect√©e."

## üìà Module 3 : SIEM Engineering (Python & Pandas)

Le LogQL, c'est bien, mais pour des analyses complexes (d√©tection d'anomalies temporelles), Python est sup√©rieur.
Nous allons transformer les logs bruts en **Time Series** pour d√©tecter les pics de trafic (Bruteforce).

In [None]:
def logs_to_dataframe(raw_logs):
    """ Convertit le JSON Loki en DataFrame Pandas exploitable """
    records = []
    for stream in raw_logs:
        for entry in stream['values']:
            records.append({
                'timestamp': pd.to_datetime(int(entry[0]), unit='ns'),
                'message': entry[1]
            })
    if not records: return pd.DataFrame()
    
    df = pd.DataFrame(records)
    df.set_index('timestamp', inplace=True)
    df.sort_index(inplace=True)
    return df

print("‚úÖ Fonction de conversion charg√©e.")

### Exercice 3.1 : Calcul de la "Rolling Window"
**Mission :** Une attaque Bruteforce se caract√©rise par un grand nombre de requ√™tes en peu de temps.
Utilisez Pandas pour compter le nombre de tentatives **par fen√™tre glissante de 10 secondes**.

Si `count > 5` en 10 secondes -> **ALERTE**.

In [None]:
# R√©cup√©ration des donn√©es fra√Æches
raw_data = query_loki('{container="victim_server"} |= "Failed password"', minutes=10)
df_ssh = logs_to_dataframe(raw_data)

# --- ZONE √âTUDIANT : Utilisez .rolling() sur le DataFrame ---
df_ssh['rate'] = YOUR_CODE_HERE

# --- VALIDATION & VISUALISATION ---
if not df_ssh.empty:
    max_rate = df_ssh['attack_rate'].max()
    print(f"üìä Pic d'attaque d√©tect√© : {max_rate} tentatives / 10 sec")
    
    # Petit graphique pour voir l'attaque
    df_ssh['attack_rate'].plot(figsize=(10, 5), color='red', title="Intensit√© du Bruteforce SSH")
    plt.axhline(y=5, color='green', linestyle='--', label="Seuil d'alerte")
    plt.legend()
    plt.show()
    
    assert max_rate > 5, "Le script ne d√©tecte pas assez de volume pour une alerte."
else:
    print("‚ö†Ô∏è DataFrame vide. Impossible d'analyser.")

## üì¶ Module 4 : Network Forensics (PCAP)

Les logs ne disent pas tout. Parfois, l'attaquant bypass les logs applicatifs (ex: SYN Flood).
Nous devons analyser une capture r√©seau brute (`.pcap`) r√©alis√©e sur la machine victime.

In [None]:
from scapy.all import rdpcap, IP, TCP

PCAP_FILE = "/home/jovyan/work/evidence/capture.pcap"

try:
    packets = rdpcap(PCAP_FILE)
    print(f"üì¶ Capture charg√©e : {len(packets)} paquets.")
except FileNotFoundError:
    print("‚ö†Ô∏è Fichier PCAP introuvable. Assurez-vous d'avoir lanc√© tcpdump auparavant.")
    packets = []

In [None]:
# --- ZONE √âTUDIANT ---
# 1. Filtrer pour ne garder que les paquets TCP avec flag 'S' (SYN)
syn_packets = 

# 2. Extraire les IP sources en utilisant syn_packets
src_ips = 

# 3. Compter (Pandas est pratique pour √ßa)
if src_ips:
    ...
    top_talkers = 
    
    print("üö® Top Attaquants (SYN Flood) :")
    print(top_talkers.head())
    
    # --- VALIDATION ---
    attacker_ip = top_talkers.idxmax()
    print(f"\nüéØ Cible identifi√©e √† bannir : {attacker_ip}")
    
    # Graphique
    top_talkers.head(5).plot(kind='bar', color='orange', title="Top IP Sources (SYN)")
else:
    print("‚ö†Ô∏è Aucun paquet SYN trouv√©.")

## üõ°Ô∏è Module 5 : Active Response (Automatisation)

L'analyse est termin√©e. Il faut agir.
Nous allons interagir avec l'API Docker pour modifier les r√®gles **IPTables** du conteneur victime et bloquer l'IP trouv√©e.

In [None]:
# --- CONFIGURATION ---
TARGET_CONTAINER = "web_server"  # Le conteneur √† prot√©ger
BAD_IP = ""            # Mettre ici l'IP trouv√©e dans l'exercice pr√©c√©dent


def ban_ip(ip):
    client = docker.from_env()
    container = client.containers.get('web_server')
    
    # --- ZONE √âTUDIANT : √âcrivez la commande iptables pour DROP l'IP ---
    cmd = f"iptables ..."
    
    # Ex√©cution
    # container.exec_run(cmd, privileged=True)
    print(f"üõ°Ô∏è Tentative de blocage de {ip}")

ban_ip(attacker_ip)

## üìù Rapport Final
Ex√©cutez cette cellule pour g√©n√©rer le rapport d'incident.

In [None]:
print("="*40)
print("üìÑ RAPPORT D'INCIDENT DE S√âCURIT√â")
print("="*40)
print(f"üìÖ Date : {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print(f"üö® Alertes SSH : {len(df_ssh) if 'df_ssh' in locals() else 0}")
print(f"üö® Alertes SQLi : {count_sqli if 'count_sqli' in locals() else 0}")
print(f"üõë Action prise : Ban IP {BAD_IP}")
print("="*40)
print("‚úÖ FIN DU WORKSHOP")