# TP Final : Analyse de Logs Web avec Spark

## Contexte
Vous êtes Data Engineer dans une entreprise e-commerce. On vous demande d'analyser les logs du serveur web pour:
- Comprendre le trafic
- Identifier les problèmes
- Détecter les anomalies

## Durée : 3 heures

## Livrables attendus
1. Pipeline de parsing robuste
2. Rapport de métriques
3. Détection d'anomalies
4. Données nettoyées en Parquet

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import re

spark = SparkSession.builder \
    .appName("TP_LogAnalysis") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")

Spark Version: 3.5.7


---
## Partie 1 : Génération des données de test (Fourni)

Ce code génère des logs réalistes pour le TP.

In [2]:
import random
from datetime import datetime, timedelta

def generate_logs(n_logs=10000):
    """Génère des logs Apache réalistes"""
    
    ips = [f"192.168.1.{i}" for i in range(1, 50)] + \
          [f"10.0.0.{i}" for i in range(1, 30)] + \
          ["203.0.113.42", "198.51.100.23"]  # IPs suspectes
    
    paths = [
        "/", "/index.html", "/about", "/contact",
        "/products", "/products/1", "/products/2", "/products/3",
        "/api/users", "/api/products", "/api/orders",
        "/login", "/logout", "/register",
        "/static/css/style.css", "/static/js/app.js",
        "/admin", "/admin/users",  # Tentatives admin
        "/.env", "/wp-admin", "/phpmyadmin",  # Tentatives malveillantes
    ]
    
    methods = ["GET"] * 80 + ["POST"] * 15 + ["PUT"] * 3 + ["DELETE"] * 2
    
    # Status codes avec poids réalistes
    statuses = [200] * 70 + [201] * 5 + [301] * 5 + [302] * 5 + \
               [400] * 3 + [401] * 3 + [403] * 2 + [404] * 5 + [500] * 2
    
    user_agents = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)",
        "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36",
        "curl/7.68.0",
        "python-requests/2.25.1",
        "-",  # Bot sans user-agent
    ]
    
    logs = []
    base_time = datetime(2024, 1, 15, 0, 0, 0)
    
    for i in range(n_logs):
        # Progression temporelle avec variations
        time_offset = timedelta(seconds=random.randint(0, 86400 * 3))  # 3 jours
        log_time = base_time + time_offset
        
        ip = random.choice(ips)
        method = random.choice(methods)
        path = random.choice(paths)
        status = random.choice(statuses)
        size = random.randint(100, 50000) if status == 200 else random.randint(0, 500)
        user_agent = random.choice(user_agents)
        referer = random.choice(["-", "https://www.google.com", "https://example.com"])
        
        # Simuler des attaques de certaines IPs
        if ip in ["203.0.113.42", "198.51.100.23"]:
            path = random.choice(["/admin", "/.env", "/wp-admin", "/login"])
            status = random.choice([401, 403, 404])
        
        # Format Apache Combined Log
        timestamp = log_time.strftime("%d/%b/%Y:%H:%M:%S +0000")
        log_line = f'{ip} - - [{timestamp}] "{method} {path} HTTP/1.1" {status} {size} "{referer}" "{user_agent}"'
        logs.append(log_line)
    
    # Ajouter quelques lignes malformées (5%)
    n_bad = int(n_logs * 0.05)
    bad_logs = [
        "invalid log line",
        "192.168.1.1 - - malformed",
        "",
        "# comment line",
    ]
    for _ in range(n_bad):
        idx = random.randint(0, len(logs) - 1)
        logs.insert(idx, random.choice(bad_logs))
    
    return logs

# Générer les logs
log_lines = generate_logs(50000)
print(f"Logs générés: {len(log_lines)}")
print("\nExemples:")
for line in log_lines[:5]:
    print(line)

Logs générés: 52500

Exemples:
10.0.0.4 - - [15/Jan/2024:17:57:36 +0000] "POST /products/1 HTTP/1.1" 200 45462 "-" "-"
192.168.1.32 - - [15/Jan/2024:05:39:01 +0000] "GET /api/products HTTP/1.1" 200 23333 "https://example.com" "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)"
192.168.1.27 - - [16/Jan/2024:03:45:31 +0000] "POST / HTTP/1.1" 200 11266 "https://example.com" "-"
192.168.1.8 - - [16/Jan/2024:02:29:29 +0000] "GET /.env HTTP/1.1" 200 19334 "https://example.com" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
192.168.1.1 - - [16/Jan/2024:02:42:48 +0000] "GET /.env HTTP/1.1" 200 49086 "https://example.com" "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36"


In [3]:
# === Créer un RDD
rdd_logs = spark.sparkContext.parallelize(log_lines)
print("- rdd_logs:")
for line in rdd_logs.take(5):
    print(line)
# === DataFrame des logs
schema = StructType([StructField("log_message", StringType(), True)])
df_logs = spark.createDataFrame([(log,) for log in log_lines], schema)
print("- df_logs:")
df_logs.show()
print(f"Nombre de lignes: {df_logs.count()}")

- rdd_logs:
10.0.0.4 - - [15/Jan/2024:17:57:36 +0000] "POST /products/1 HTTP/1.1" 200 45462 "-" "-"
192.168.1.32 - - [15/Jan/2024:05:39:01 +0000] "GET /api/products HTTP/1.1" 200 23333 "https://example.com" "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X)"
192.168.1.27 - - [16/Jan/2024:03:45:31 +0000] "POST / HTTP/1.1" 200 11266 "https://example.com" "-"
192.168.1.8 - - [16/Jan/2024:02:29:29 +0000] "GET /.env HTTP/1.1" 200 19334 "https://example.com" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
192.168.1.1 - - [16/Jan/2024:02:42:48 +0000] "GET /.env HTTP/1.1" 200 49086 "https://example.com" "Mozilla/5.0 (Linux; Android 10) AppleWebKit/537.36"
- df_logs:
+--------------------+
|         log_message|
+--------------------+
|10.0.0.4 - - [15/...|
|192.168.1.32 - - ...|
|192.168.1.27 - - ...|
|192.168.1.8 - - [...|
|192.168.1.1 - - [...|
|192.168.1.17 - - ...|
|192.168.1.36 - - ...|
|192.168.1.14 - - ...|
|192.168.1.5 - - [...|
|192.168.1.49 - - ...|
|10.0.0.

---
## Partie 2 : Parsing des logs (45 min)

### Tâches:
1. Définir le pattern regex pour le format Apache Combined
2. Extraire tous les champs
3. Convertir les types (timestamp, status, size)
4. Gérer les lignes invalides
5. Calculer le taux de lignes valides/invalides

In [None]:
# TODO: Définir le pattern regex
# Format: IP - - [timestamp] "METHOD PATH HTTP/1.1" STATUS SIZE "REFERER" "USER-AGENT"

APACHE_PATTERN = r'???'  # À compléter

# TODO: Parser les logs
logs_parsed = logs_raw  # À remplacer

# Afficher le résultat
logs_parsed.show(5, truncate=False)

In [None]:
# TODO: Convertir les types
# - timestamp: to_timestamp avec format "dd/MMM/yyyy:HH:mm:ss Z"
# - status: integer
# - size: long (gérer les "-")

logs_typed = logs_parsed  # À remplacer

In [None]:
# TODO: Calculer le taux de validité
total_lines = logs_raw.count()
valid_lines = 0  # À calculer
invalid_lines = 0  # À calculer

print(f"Total: {total_lines}")
print(f"Valides: {valid_lines} ({valid_lines/total_lines*100:.2f}%)")
print(f"Invalides: {invalid_lines} ({invalid_lines/total_lines*100:.2f}%)")

---
## Partie 3 : Métriques de base (45 min)

### Tâches:
1. Requêtes par heure
2. Distribution des codes HTTP
3. Top 10 des pages
4. Top 10 des IPs
5. Répartition par méthode HTTP

In [None]:
# TODO: Requêtes par heure
# Ajouter une colonne hour et grouper

In [None]:
# TODO: Distribution des codes HTTP
# Grouper par status, compter, calculer le pourcentage

In [None]:
# TODO: Top 10 des pages les plus visitées

In [None]:
# TODO: Top 10 des IPs les plus actives

In [None]:
# TODO: Répartition par méthode HTTP

---
## Partie 4 : Détection d'anomalies (45 min)

### Tâches:
1. IPs suspectes (beaucoup de requêtes, beaucoup d'erreurs)
2. Tentatives d'accès admin/malveillantes
3. Pics de trafic anormaux
4. Erreurs 5xx fréquentes

In [None]:
# TODO: Identifier les IPs suspectes
# Critères:
# - Plus de 500 requêtes
# - Ou plus de 50% d'erreurs (4xx/5xx)

In [None]:
# TODO: Tentatives d'accès malveillantes
# Paths suspects: /admin, /.env, /wp-admin, /phpmyadmin

In [None]:
# TODO: Détecter les pics de trafic
# Utiliser une window function pour calculer la moyenne mobile
# Identifier les heures avec trafic > 2 écarts-types de la moyenne

In [None]:
# TODO: Analyser les erreurs 5xx
# Grouper par path et compter les erreurs 5xx
# Identifier les endpoints problématiques

---
## Partie 5 : Export et optimisation (30 min)

### Tâches:
1. Sauvegarder les logs nettoyés en Parquet partitionné par date
2. Sauvegarder les rapports d'anomalies
3. Optimiser le partitionnement

In [None]:
# TODO: Ajouter une colonne date pour le partitionnement
# Sauvegarder en Parquet partitionné

In [None]:
# TODO: Créer un rapport de synthèse
# Combiner toutes les métriques dans un DataFrame de rapport

In [None]:
# Nettoyage
spark.stop()
print("TP terminé!")