# Projet Incendies — Notebook Nettoyage, EDA & Modélisation
**Dernière génération :** 2025-09-12 10:57 UTC

Ce notebook fournit un pipeline **complet et documenté** pour :
1) Explorer une base **SQLite** (structure + contenu),
2) **Nettoyer** les données avec des décisions guidées par des métriques (manquants, doublons, outliers, catégories rares),
3) Réaliser une **EDA** (analyses descriptives & graphiques),
4) Construire un **modèle de classification de base** (Random Forest) pour prédire le risque d’incendie **majeur**.

> ⚠️ Adapte au besoin le chemin de la base (`db_path`) et le nom de table (`table_name`).

## 0) Setup & Imports

In [None]:
# === Imports ===
import sqlite3
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Ne pas utiliser seaborn par défaut pour limiter les dépendances et respecter des consignes stricte
# (Tu peux l'activer si tu le souhaites)
# import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, RocCurveDisplay

# === Paramètres généraux ===
pd.set_option("display.max_columns", 120)
pd.set_option("display.width", 140)
plt.rcParams['figure.figsize'] = (9,5)

# === Chemins / Tables ===
db_path = "data/incendies.db"      # ← adapte si besoin
table_name = "incendies"           # ← adapte si besoin

Path(db_path).exists(), db_path


## 1) Inspecter la base SQLite (structure)

In [None]:
con = sqlite3.connect(db_path)

# Liste des tables
tables = pd.read_sql("""
SELECT name AS table_name
FROM sqlite_master
WHERE type='table' AND name NOT LIKE 'sqlite_%'
ORDER BY name;
""", con)
tables


In [None]:
# Nombre de lignes par table + nb de colonnes (approx via PRAGMA)
def table_shape_sqlite(connection):
    out = []
    for t in tables['table_name'].tolist():
        n = pd.read_sql(f"SELECT COUNT(*) AS n FROM {t};", connection).iloc[0,0]
        cols = pd.read_sql(f"PRAGMA table_info({t});", connection)
        out.append({"table": t, "rows": n, "cols": len(cols)})
    return pd.DataFrame(out).sort_values("rows", ascending=False)

table_shapes = table_shape_sqlite(con)
table_shapes


## 2) Charger la table principale

In [None]:
# Charger un échantillon rapide (si table très grosse) puis plein si OK
sample_rows = 200_000  # ajuste si besoin
try:
    df = pd.read_sql(f"SELECT * FROM {table_name} LIMIT {sample_rows};", con)
except Exception as e:
    print("Erreur de lecture : ", e)
    raise

print("Shape (sample):", df.shape)
df.head(3)


## 3) Normaliser les noms de colonnes

In [None]:
def normalize_cols(columns):
    repl = ((" ", "_"), ("é","e"), ("è","e"), ("ê","e"), ("à","a"), ("ô","o"), ("û","u"), ("(",""), (")",""))
    out = []
    for c in columns:
        cc = c.strip().lower()
        for a,b in repl:
            cc = cc.replace(a,b)
        out.append(cc)
    return out

original_cols = df.columns.tolist()
df.columns = normalize_cols(df.columns)
df.columns[:10], len(df.columns)


> **Note :** on garde une trace du mapping `original → normalisé` pour documenter le schéma si besoin :

In [None]:
col_map = {o:n for o,n in zip(original_cols, df.columns)}
pd.DataFrame(col_map.items(), columns=["original","normalise"]).head(20)


## 4) Parsing des dates

In [None]:
# Essaie de détecter une colonne date plausible
date_candidates = [c for c in df.columns if 'date' in c or 'alerte' in c]
date_candidates


In [None]:
date_col = None
for c in date_candidates:
    try:
        tmp = pd.to_datetime(df[c], errors="raise")
        date_col = c
        break
    except Exception:
        continue

if date_col is None and len(date_candidates):
    # fallback permissif
    date_col = date_candidates[0]
    df[date_col] = pd.to_datetime(df[date_col], errors="coerce")
elif date_col:
    df[date_col] = pd.to_datetime(df[date_col], errors="coerce")

date_col


In [None]:
# Extraire calendrier si date présente
if date_col:
    df['annee'] = df[date_col].dt.year
    df['mois']  = df[date_col].dt.month
    df['jour']  = df[date_col].dt.day
    df['jour_annee'] = df[date_col].dt.dayofyear
df[[date_col, 'annee','mois','jour']].head(3) if date_col else df.head(1)


## 5) Identifier numériques / catégorielles / surfaces

In [None]:
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
cat_cols = [c for c in df.columns if c not in num_cols]

# Colonnes surfaces
surface_cols = [c for c in df.columns if 'surface' in c]
num_cols[:10], cat_cols[:10], surface_cols[:5]


## 6) Diagnostic qualité — métriques pour décider (ne rien supprimer à l’aveugle)

In [None]:
def missing_report(dataframe):
    miss = dataframe.isna().sum().sort_values(ascending=False)
    ratio = (dataframe.isna().mean()*100).sort_values(ascending=False)
    return pd.DataFrame({'manquants': miss, 'pourcentage': ratio}).reset_index(names='col')

def duplicate_report(dataframe):
    n_dup = dataframe.duplicated().sum()
    return pd.DataFrame({'doublons':[n_dup], 'pourcentage':[100*n_dup/len(dataframe) if len(dataframe) else 0]})

def outlier_report_iqr(series):
    s = pd.to_numeric(series, errors='coerce').dropna()
    if s.empty:
        return {'q1':np.nan,'q3':np.nan,'iqr':np.nan,'borne_sup':np.nan,'outliers':0,'pct':0.0}
    q1, q3 = s.quantile([0.25, 0.75])
    iqr = q3 - q1
    borne_sup = q3 + 1.5*iqr
    out = (s > borne_sup).sum()
    return {'q1':q1,'q3':q3,'iqr':iqr,'borne_sup':borne_sup,'outliers':out,'pct':100*out/len(s)}

def category_report(series, top_n=20):
    vc = series.astype('string').fillna('<NA>').value_counts(dropna=False, normalize=True)*100
    return vc.head(top_n).rename('pct').to_frame()

# Rapports
miss_df = missing_report(df)
dup_df  = duplicate_report(df)
miss_df.head(15), dup_df


In [None]:
# Rapport d'outliers (IQR) pour les colonnes numériques principales (dont surfaces si numériques)
iqr_rows = []
for c in (surface_cols if surface_cols else num_cols):
    stats = outlier_report_iqr(df[c])
    stats['col'] = c
    iqr_rows.append(stats)
iqr_df = pd.DataFrame(iqr_rows).sort_values('pct', ascending=False)
iqr_df.head(15)


> **Interprétation (règles pratiques)**  
- **Valeurs manquantes** :  
  - `< 5%` → imputation simple (0, médiane, mode) acceptable.  
  - `5–30%` → à discuter, imputation prudente.  
  - `> 30%` → envisager de supprimer la colonne **non critique**.  
- **Doublons** :  
  - `< 1%` → suppression OK.  
  - `> 10%` → probable problème amont (ingestion / jointures).  
- **Outliers (IQR)** :  
  - `< 1%` → capping/suppression possible.  
  - `> 5%` → peut refléter des cas extrêmes **réels** (à garder).

## 7) Paramètres & Actions de Nettoyage

In [None]:
# === Paramètres de décision ===
MISSING_DROP_THRESHOLD = 30.0   # % manquants au-delà duquel on drop (si non critique)
OUTLIER_CAP = True              # True = on tronque au seuil IQR
FILL_SURFACES_WITH_ZERO = True  # surfaces NaN → 0
FILL_CATEG_MODE = True          # catégorielles NaN → mode

# === 7.1) Traiter les colonnes de surfaces ===
for c in surface_cols:
    df[c] = pd.to_numeric(df[c], errors='coerce')
if FILL_SURFACES_WITH_ZERO and surface_cols:
    df[surface_cols] = df[surface_cols].fillna(0)
    for c in surface_cols:
        df.loc[df[c] < 0, c] = 0  # pas de valeurs négatives logiques

# === 7.2) Imputer certaines catégorielles par le mode ===
if FILL_CATEG_MODE and cat_cols:
    for c in cat_cols:
        mode_val = df[c].mode(dropna=True)
        if len(mode_val):
            df[c] = df[c].fillna(mode_val[0]).astype(str).str.strip().str.lower()

# === 7.3) Supprimer colonnes trop manquantes (non critiques) ===
cols_to_drop = miss_df.loc[miss_df['pourcentage']>MISSING_DROP_THRESHOLD, 'col'].tolist()
# Protéger des colonnes critiques si tu en as (ex: identifiants, dates, codes INSEE)
critical_cols = [date_col] if date_col else []
cols_to_drop = [c for c in cols_to_drop if c not in critical_cols]

df.drop(columns=list(set(cols_to_drop)), inplace=True, errors='ignore')
cols_to_drop


In [None]:
# === 7.4) Capping des outliers (IQR) ===
if OUTLIER_CAP:
    for c in (surface_cols if surface_cols else num_cols):
        stats = outlier_report_iqr(df[c])
        if not np.isnan(stats['borne_sup']):
            df.loc[df[c] > stats['borne_sup'], c] = stats['borne_sup']

# === 7.5) Supprimer les doublons ===
n_dup = df.duplicated().sum()
df = df.drop_duplicates()
n_dup


## 8) Variables dérivées utiles

In [None]:
# Total des surfaces brûlées
if surface_cols:
    df['surface_totale'] = df[surface_cols].sum(axis=1)
else:
    df['surface_totale'] = np.nan

# Cible binaire : incendie majeur (> 1000 m²)
df['incendie_majeur'] = (df['surface_totale'] > 1000).astype(int)

# Saisonnalité cyclique (si mois dispo)
if 'mois' in df.columns:
    df['sin_mois'] = np.sin(2*np.pi*df['mois']/12)
    df['cos_mois'] = np.cos(2*np.pi*df['mois']/12)

df[['surface_totale','incendie_majeur']].head(5)


## 9) EDA — Explorations rapides

In [None]:
# 9.1) Nombre d'incendies par année
if 'annee' in df.columns:
    fires_per_year = df.groupby('annee').size()
    ax = fires_per_year.plot(marker='o')
    ax.set_title("Nombre d'incendies par année")
    ax.set_xlabel("Année")
    ax.set_ylabel("Nombre")
    ax.grid(True)
    plt.show()

# 9.2) Distribution des surfaces totales (log)
if 'surface_totale' in df.columns:
    ax = df['surface_totale'].plot(kind='hist', bins=60, log=True)
    ax.set_title("Distribution des surfaces totales (log)")
    ax.set_xlabel("Surface (m²)")
    plt.show()


In [None]:
# 9.3) Top départements (si colonne dispo)
dept_col_candidates = [c for c in df.columns if c in ['departement','code_departement','dept','code_dept']]
if dept_col_candidates:
    dcol = dept_col_candidates[0]
    top_dept = df[dcol].astype(str).value_counts().head(15)
    ax = top_dept.plot(kind='bar')
    ax.set_title(f"Top départements par nombre d'enregistrements ({dcol})")
    ax.set_ylabel("Nombre")
    plt.xticks(rotation=45)
    plt.show()


## 10) Modélisation — Baseline Random Forest (classification)

In [None]:
# === Features ===
feature_cols = []
if surface_cols:
    feature_cols += surface_cols
for c in ['annee','mois','sin_mois','cos_mois']:
    if c in df.columns:
        feature_cols.append(c)

# Sélection et nettoyage final
X = df[feature_cols].copy().fillna(0)
y = df['incendie_majeur'].copy() if 'incendie_majeur' in df.columns else None

X.shape, y.value_counts(normalize=True) if y is not None else "Pas de cible"


In [None]:
# Split train/test (stratifié si possible)
if y is not None and y.nunique() == 2 and len(X) > 0:
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    rf = RandomForestClassifier(n_estimators=250, random_state=42, n_jobs=-1)
    rf.fit(X_train, y_train)
    y_pred = rf.predict(X_test)
    y_proba = rf.predict_proba(X_test)[:,1] if hasattr(rf, 'predict_proba') else None

    print("=== Rapport de classification ===")
    print(classification_report(y_test, y_pred))

    # Matrice de confusion
    cm = confusion_matrix(y_test, y_pred)
    print("Matrice de confusion:\n", cm)

    # ROC-AUC si proba dispo
    if y_proba is not None:
        auc = roc_auc_score(y_test, y_proba)
        print(f"ROC-AUC: {auc:.3f}")
        RocCurveDisplay.from_predictions(y_test, y_proba)
        plt.show()

    # Importances
    imp = pd.Series(rf.feature_importances_, index=X.columns).sort_values(ascending=False)
    display(imp.head(15))
    ax = imp.head(20).sort_values().plot(kind='barh')
    ax.set_title("Top importances — RandomForest")
    plt.show()
else:
    print("Cible indisponible ou données insuffisantes pour l'apprentissage.")


## 11) Pistes d'amélioration
- **Validation géographique** : entraîner sur des départements et tester sur d’autres.
- **Enrichissement** : ajouter météo (vent, température, sécheresse), végétation (Corine Land Cover), altitude.
- **Modèles** : Gradient Boosting (XGBoost/LightGBM), calibration des probabilités, courbe PR.
- **Clustering spatio-temporel** : DBSCAN/HDBSCAN pour hotspots + saisonnalité (features cycliques plus fines).
- **App Streamlit** : deux onglets *Historique* et *Prédiction* pour une démonstration end‑to‑end.
