In [2]:
import pandas as pd
import seaborn as sns
import plotly.express as px
import matplotlib.pyplot as plt
import plotly.io as pio
import sklearn
import warnings
import sksurv.datasets
import numpy as np
from sksurv.metrics import concordance_index_censored

from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.exceptions import UndefinedMetricWarning
from sklearn import set_config
from sklearn.model_selection import GridSearchCV, KFold
from sklearn.pipeline import make_pipeline
from sksurv.datasets import load_breast_cancer
from sksurv.linear_model import CoxnetSurvivalAnalysis, CoxPHSurvivalAnalysis
from sksurv.preprocessing import OneHotEncoder
from sksurv.util import Surv

warnings.filterwarnings("ignore", category=UndefinedMetricWarning)
set_config(display="text")  # displays text representation of estimators

In [3]:
df_corse=pd.read_csv('https://projet-incendie.s3.eu-west-3.amazonaws.com/dataset_modele_decompte.csv', sep=';')




#### Modèle MVP

In [4]:
# 1. Supprimer les colonnes avec trop de NaN (> 50% par exemple)
threshold = 0.5
cols_to_keep = df_corse.columns[df_corse.isnull().mean() < threshold]
df_clean = df_corse[cols_to_keep].copy()
df_clean['Feu prévu'] = df_clean['Feu prévu'].astype(bool)  # Assurez-vous que 'Feux prévu' est de type float

# 2. Sélectionner uniquement les colonnes numériques nécessaires pour le modèle (ici à adapter)
features = [
    "RR", "TN", "TX", "TM", "UM", "ETPGRILLE", "ETPMON",
    "moyenne precipitations année", "moyenne precipitations mois",
    "moyenne evapotranspiration année", "moyenne evapotranspiration mois",
    "moyenne vitesse vent année", "moyenne vitesse vent mois",
    "moyenne temperature année", "moyenne temperature mois","Nombre de feu par an","Nombre de feu par mois",'jours_sans_pluie','jours_TX_sup_30','ETPGRILLE_7j'
]
# Ne garder que les features existantes dans df_clean (évite erreur si colonne supprimée)
features = [f for f in features if f in df_clean.columns]

# 3. Retirer les lignes où y ("évènement" et "décompte") est manquant
df_clean = df_clean.dropna(subset=["Feu prévu", "décompte"])

# 4. Extraire X et y
X = df_clean[features]
y = Surv.from_dataframe("Feu prévu", "décompte", df_clean)

# 5. Pipeline avec SimpleImputer pour remplacer les NaN restants par la médiane
pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),
    ("coxnet", CoxnetSurvivalAnalysis(l1_ratio=0.9, alpha_min_ratio=0.05))
])

# 6. Fit du modèle
pipeline.fit(X, y)

# 7. Vérifier qu'il n'y a plus de NaN dans X imputé (optionnel)
X_imputed = pipeline.named_steps["imputer"].transform(X)
# print(f"Nombre de NaN après imputation: {np.isnan(X_imputed).sum()}")
risque = pipeline.predict(X)

df_clean["risque_feu"] = risque

### Modele predict survival fonction

In [None]:
X = SimpleImputer(strategy="median").fit_transform(X)
estimator = CoxnetSurvivalAnalysis(l1_ratio=0.99, fit_baseline_model=True)
estimator.fit(X, y)

X_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
surv_funcs = {}
for alpha in estimator.alphas_[:5]:
    # print(f"Shape of X_df.iloc[:1]: {X_df.iloc[:1].shape}")
    surv_funcs[alpha] = estimator.predict_survival_function(
        X_df.iloc[:1], alpha=alpha)

for alpha, surv_alpha in surv_funcs.items():
     for fn in surv_alpha:
         plt.step(fn.x, fn(fn.x), where="post",
                  label=f"alpha = {alpha:.3f}")


In [None]:


# Étape 1 : prédire les fonctions de survie sur tout X
surv_funcs = estimator.predict_survival_function(X)

# Étape 2 : évaluer S(t) à t = 365 jours (exemple)
durees = [7, 30, 60, 90, 180, 365]  # ou autre valeur de durée
df_plot = df_clean.copy()
for t in durees:
    risques = []
    for fn in surv_funcs:
        # Si t dépasse la durée max connue de la fonction, on prend la dernière valeur
        if t in fn.x:
            risques.append(1 - fn(t))
        else:
            risques.append(1 - fn(fn.x[-1]))
    df_plot[f"risque_{t}j"] = risques

# Étape 4 : afficher avec plotly (attention aux noms de colonnes)
fig = px.scatter_map(
    df_plot,
    lat="latitude",
    lon="longitude",
    color="risque_7j",
    # color="risque_365j",
    color_continuous_scale="YlOrRd",
    hover_name="ville",
    hover_data={"risque_7j": True},
    # hover_data={"risque_365j": True},
    zoom=7,
    height=800
)

fig.update_layout(
    mapbox_style="open-street-map",
    title="Risque estimé selon la durée (jours)",
    margin={"r": 0, "t": 40, "l": 0, "b": 0},
    updatemenus=[
        dict(
            type="dropdown",
            direction="down",
            buttons=[
                dict(label="7 jours", method="restyle", args=[{"marker.color": [df_plot["risque_7j"]]}]),
                dict(label="30 jours", method="restyle", args=[{"marker.color": [df_plot["risque_30j"]]}]),
                dict(label="60 jours", method="restyle", args=[{"marker.color": [df_plot["risque_60j"]]}]),
                dict(label="90 jours", method="restyle", args=[{"marker.color": [df_plot["risque_90j"]]}]),
                dict(label="180 jours", method="restyle", args=[{"marker.color": [df_plot["risque_180j"]]}]),
                dict(label="365 jours", method="restyle", args=[{"marker.color": [df_plot["risque_365j"]]}])
            ],
            pad={"r": 10, "t": 10},
            showactive=True,
            x=0.05,
            xanchor="left",
            y=1.1,
            yanchor="top"
        )
    ]
)

fig.show()

