# TP Final : Analyse de Logs - CORRIGÉ

Solutions complètes du TP d'analyse de logs.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Create (or reuse) the notebook's Spark session: local mode on all
# available cores ("local[*]"), with adaptive query execution enabled.
spark = (
    SparkSession.builder
    .appName("TP_LogAnalysis_Corrige")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

## Partie 2 : Parsing - SOLUTION

In [None]:
# Regex matching one line of an Apache "combined" access log.
# Capture groups, as consumed by parse_logs():
#   1  -> client IP (first whitespace-free token)
#   4  -> timestamp text found between the square brackets
#   5  -> HTTP method
#   6  -> request path
#   8  -> status code (exactly three digits)
#   9  -> response size token (parse_logs() maps "-" to 0)
#   10 -> referer (text between the first pair of double quotes after the size)
#   11 -> user agent (text between the last pair of double quotes)
# Groups 2, 3 and 7 are captured but not used by parse_logs(); in the usual
# combined format they would be the identity, user and protocol fields
# (NOTE(review): conventional meaning, not verified against sample data).
APACHE_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"'

def parse_logs(df_raw):
    """Turn raw Apache log lines into a typed, filtered DataFrame.

    Args:
        df_raw: DataFrame whose string column ``value`` holds one log line
            per row.

    Returns:
        DataFrame with the columns ip, method, path, status (int),
        size (long), referer, user_agent, timestamp, date and hour.
        Rows with a null or empty ip, or with a null timestamp or status,
        are removed.
    """
    # (capture-group index in APACHE_PATTERN, name of the output column)
    captures = [
        (1, 'ip'),
        (4, 'timestamp_str'),
        (5, 'method'),
        (6, 'path'),
        (8, 'status'),
        (9, 'size'),
        (10, 'referer'),
        (11, 'user_agent'),
    ]
    extracted = df_raw.select(*[
        F.regexp_extract('value', APACHE_PATTERN, group_idx).alias(col_name)
        for group_idx, col_name in captures
    ])

    # Typed expressions, named up front so the column chain stays readable.
    parsed_ts = F.to_timestamp("timestamp_str", "dd/MMM/yyyy:HH:mm:ss Z")
    status_as_int = F.col("status").cast("int")
    # A size of "-" is stored as 0; anything else is cast to long.
    size_as_long = (
        F.when(F.col("size") == "-", 0)
        .otherwise(F.col("size").cast("long"))
    )

    typed = (
        extracted
        .withColumn("timestamp", parsed_ts)
        .withColumn("status", status_as_int)
        .withColumn("size", size_as_long)
        .withColumn("date", F.to_date("timestamp"))
        .withColumn("hour", F.hour("timestamp"))
        .drop("timestamp_str")
    )

    # regexp_extract yields "" for non-matching lines, hence the empty-string
    # test on ip in addition to the null checks.
    has_ip = F.col("ip").isNotNull() & (F.col("ip") != "")
    has_timestamp = F.col("timestamp").isNotNull()
    has_status = F.col("status").isNotNull()

    return typed.where(has_ip & has_timestamp & has_status)

# Application
# logs_clean = parse_logs(logs_raw)
# logs_clean.cache()
# print(f"Logs valides: {logs_clean.count()}")

## Partie 3 : Métriques - SOLUTION

In [None]:
def compute_metrics(df):
    """Build the main traffic metrics from the parsed logs.

    Args:
        df: parsed log DataFrame; the columns timestamp, ip, size, status
            and path are read.

    Returns:
        dict of DataFrames with the keys:
            "hourly"      - requests, distinct IPs and summed size per hour,
                            ordered by hour;
            "status_dist" - row count and percentage per status code,
                            most frequent first;
            "top_pages"   - the 10 paths with the most hits;
            "top_ips"     - the 10 IPs with the most requests.

    Note:
        Calls ``df.count()`` immediately to obtain the total used for the
        percentages, so a Spark job runs when this function is called.
    """
    by_count_desc = F.desc("count")

    # Requests per hour
    hour_bucket = F.date_trunc("hour", "timestamp").alias("hour")
    hourly = (
        df.groupBy(hour_bucket)
        .agg(
            F.count("*").alias("requests"),
            F.countDistinct("ip").alias("unique_ips"),
            F.sum("size").alias("bytes_sent"),
        )
        .orderBy("hour")
    )

    # Status code distribution (share of the total number of rows)
    total = df.count()
    share = F.round(F.col("count") / total * 100, 2)
    status_dist = (
        df.groupBy("status")
        .agg(F.count("*").alias("count"))
        .withColumn("percentage", share)
        .orderBy(by_count_desc)
    )

    # Most requested pages
    page_stats = df.groupBy("path").agg(
        F.count("*").alias("hits"),
        F.countDistinct("ip").alias("unique_visitors"),
    )
    top_pages = page_stats.orderBy(F.desc("hits")).limit(10)

    # Most active IPs
    ip_stats = df.groupBy("ip").agg(
        F.count("*").alias("requests"),
        F.countDistinct("path").alias("unique_paths"),
    )
    top_ips = ip_stats.orderBy(F.desc("requests")).limit(10)

    return dict(
        hourly=hourly,
        status_dist=status_dist,
        top_pages=top_pages,
        top_ips=top_ips,
    )

## Partie 4 : Détection d'anomalies - SOLUTION

In [None]:
def detect_anomalies(df):
    """Flag suspicious activity in the parsed logs.

    Args:
        df: parsed log DataFrame; the columns ip, status, path and
            timestamp are read.

    Returns:
        dict of DataFrames with the keys:
            "suspicious_ips"     - IPs with more than 500 requests or an
                                   error rate (status >= 400) above 50 %;
            "malicious_attempts" - hits per (ip, path) on a fixed list of
                                   sensitive paths (exact path match);
            "traffic_anomalies"  - hours whose request count is more than
                                   two moving standard deviations away from
                                   the moving average;
            "server_errors"      - the 10 paths with the most 5xx responses.
    """
    # --- Suspicious IPs -------------------------------------------------
    is_error = F.when(F.col("status") >= 400, 1).otherwise(0)
    per_ip = df.groupBy("ip").agg(
        F.count("*").alias("total_requests"),
        F.sum(is_error).alias("errors"),
    )
    error_rate = F.round(F.col("errors") / F.col("total_requests") * 100, 2)
    too_many_requests = F.col("total_requests") > 500
    mostly_errors = F.col("error_rate") > 50
    suspicious_ips = (
        per_ip
        .withColumn("error_rate", error_rate)
        .filter(too_many_requests | mostly_errors)
        .orderBy(F.desc("total_requests"))
    )

    # --- Probes on sensitive paths --------------------------------------
    malicious_paths = [
        "/admin", "/.env", "/wp-admin", "/phpmyadmin",
        "/config", "/.git", "/backup",
    ]
    probes = df.filter(F.col("path").isin(malicious_paths))
    malicious_attempts = (
        probes.groupBy("ip", "path")
        .agg(
            F.count("*").alias("attempts"),
            F.collect_set("status").alias("statuses"),
        )
        .orderBy(F.desc("attempts"))
    )

    # --- Traffic spikes (window functions) ------------------------------
    hour_bucket = F.date_trunc("hour", "timestamp").alias("hour")
    hourly_traffic = df.groupBy(hour_bucket).count()

    # Trailing window: the current hourly row and the six rows before it.
    window_spec = Window.orderBy("hour").rowsBetween(-6, 0)
    deviation = F.abs(F.col("count") - F.col("moving_avg"))
    threshold = 2 * F.col("moving_std")

    with_stats = (
        hourly_traffic
        .withColumn("moving_avg", F.avg("count").over(window_spec))
        .withColumn("moving_std", F.stddev("count").over(window_spec))
        .withColumn("is_anomaly", deviation > threshold)
    )
    traffic_anomalies = with_stats.filter(F.col("is_anomaly"))

    # --- 5xx errors per endpoint ----------------------------------------
    only_5xx = df.filter(F.col("status").between(500, 599))
    server_errors = (
        only_5xx.groupBy("path")
        .agg(F.count("*").alias("error_count"))
        .orderBy(F.desc("error_count"))
        .limit(10)
    )

    return dict(
        suspicious_ips=suspicious_ips,
        malicious_attempts=malicious_attempts,
        traffic_anomalies=traffic_anomalies,
        server_errors=server_errors,
    )

## Partie 5 : Export - SOLUTION

In [None]:
def export_results(df, output_path):
    """Write the logs as Parquet, partitioned by year / month / day.

    The partition columns are derived from ``timestamp``; that is the only
    column this function requires. The data is shuffled on those same
    partition columns before writing, so all rows of one day are handled by
    a single task.

    Args:
        df: DataFrame with a ``timestamp`` column.
        output_path: base directory or URI; the data is written to
            ``<output_path>/logs_processed``.

    Note:
        The write uses ``mode("overwrite")``: existing data at the target
        location is replaced.
    """
    partition_cols = ["year", "month", "day"]
    target = f"{output_path}/logs_processed"

    # Add the partitioning columns
    df_export = df \
        .withColumn("year", F.year("timestamp")) \
        .withColumn("month", F.month("timestamp")) \
        .withColumn("day", F.dayofmonth("timestamp"))

    # Repartition on the columns that are also used by partitionBy below.
    # The previous version repartitioned on "date", a column this function
    # never creates, so it failed on any DataFrame without that column.
    df_repartitioned = df_export.repartition(*partition_cols)

    # Write as partitioned Parquet
    df_repartitioned.write \
        .partitionBy(*partition_cols) \
        .mode("overwrite") \
        .parquet(target)

    print(f"Données exportées vers {target}")

In [None]:
def create_summary_report(metrics, anomalies):
    """Print a short console report from the computed results.

    Args:
        metrics: mapping providing the "top_pages" and "status_dist" entries.
        anomalies: mapping providing the "suspicious_ips",
            "malicious_attempts" and "server_errors" entries.

    Each entry must expose a ``show`` method; it is called with an explicit
    row limit where one is listed below and without arguments otherwise.
    """
    # (heading, source mapping, key, row limit or None for show()'s default)
    sections = (
        ("\nTop 5 Pages:", metrics, "top_pages", 5),
        ("\nDistribution des Codes HTTP:", metrics, "status_dist", None),
        ("\nIPs Suspectes:", anomalies, "suspicious_ips", None),
        ("\nTentatives Malveillantes:", anomalies, "malicious_attempts", 10),
        ("\nEndpoints avec Erreurs 5xx:", anomalies, "server_errors", None),
    )

    for heading, source, key, row_limit in sections:
        print(heading)
        # Look the entry up only after printing its heading, one section
        # at a time.
        table = source[key]
        if row_limit is None:
            table.show()
        else:
            table.show(row_limit)

In [None]:
# End of the notebook: stop the Spark session created in the first cell.
spark.stop()