In [None]:
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import LabelEncoder
import shap

# Load the data. The file is read with the same delimiter/encoding as the other
# loading cell of this notebook; with the pandas defaults (comma separator,
# UTF-8) the feature columns selected below would not be found.
df = pd.read_csv("new_data.csv", delimiter=";", encoding="latin1")

# Feature selection
features = [
    # Value
    "assessed_value_usd",
    "value_per_kg",
    "AVG_FOB_x", "AVG_TAX_x",
    "AVG_FOB_y", "AVG_TAX_y",
    "AVG_FOB_3MTH", "AVG_TAX_3MTH",

    # Quantity
    "item_count",
    "net_weight_kg",
    "gross_weight_kg",

    # Origin
    "origin_country",

    # Frequencies
    "DECL_FREQ_x",
    "DECL_FREQ_y"
]

X = df[features].copy()

# Encode categorical (object-dtype) columns as integer codes.
# Values are cast to str first, so missing values become the category "nan".
for col in X.select_dtypes(include=["object"]).columns:
    X[col] = LabelEncoder().fit_transform(X[col].astype(str))

# Isolation Forest
iso = IsolationForest(
    n_estimators=300,
    contamination=0.05,
    random_state=42
)
# NOTE: despite its name this column holds the fit_predict labels
# (-1 = anomaly, 1 = normal), not a continuous score.
df["anomaly_score"] = iso.fit_predict(X)

# SHAP for interpretation
explainer = shap.TreeExplainer(iso)
shap_values = explainer.shap_values(X)

# Global feature summary
shap.summary_plot(shap_values, X, plot_type="bar")

# Explanation of one suspicious declaration: the first detected anomaly.
# Work with row *positions*: both X.iloc and shap_values are positional, so an
# index label would only be correct while df keeps a default RangeIndex.
anomaly_positions = (df["anomaly_score"] == -1).to_numpy().nonzero()[0]
if len(anomaly_positions) == 0:
    raise ValueError("No anomaly was detected; there is no declaration to explain.")
suspect_idx = int(anomaly_positions[0])
suspect = X.iloc[[suspect_idx]]

# Load the SHAP JavaScript so the interactive force plot renders in the notebook.
shap.initjs()
shap.force_plot(explainer.expected_value,
                shap_values[suspect_idx],
                suspect)


In [1]:
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, davies_bouldin_score
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras import regularizers
from sklearn.pipeline import Pipeline
import shap
import joblib

# Load the dataset (semicolon-separated, Latin-1 encoded) and list its columns
df = pd.read_csv("new_data.csv", sep=";", encoding="latin1")
df.columns

Index(['instanceid', 'nif_imp', 'dec_cod', 'item_count', 'hs_code_diversity',
       'declaration_date', 'is_physically_inspected', 'name_x',
       'FIRST_OP_DAT_x', 'LAST_OP_DAT_x', 'AVG_FOB_x', 'AVG_TAX_x',
       'AVG_TAX_x.1', 'DECL_FREQ_x', 'nif', 'name_y', 'FIRST_OP_DAT_y',
       'LAST_OP_DAT_y', 'AVG_FOB_y', 'AVG_TAX_y', 'AVG_FOB_3MTH',
       'AVG_TAX_3MTH', 'DECL_FREQ_y', 'key_itm_nbr', 'hs_cod', 'net_weight_kg',
       'gross_weight_kg', 'origin_country', 'assessed_value_usd',
       'value_per_kg'],
      dtype='object')

In [2]:
# Relevant columns. Their order is kept as-is because it defines the feature
# order of the matrix built from df_valeur.
cols_valeur = [
    # Declared value and weights
    "assessed_value_usd", "value_per_kg",
    "net_weight_kg", "gross_weight_kg",
    # Average FOB / tax columns
    "AVG_FOB_x", "AVG_FOB_y",
    "AVG_TAX_x", "AVG_TAX_y",
    "AVG_FOB_3MTH", "AVG_TAX_3MTH",
    # Declaration frequencies
    "DECL_FREQ_x", "DECL_FREQ_y",
]
# Independent copy, so later edits to df_valeur never touch df
df_valeur = df.loc[:, cols_valeur].copy()

In [None]:
# Feature engineering
# Small constant added to every denominator to avoid dividing by exactly zero.
# NOTE(review): with a zero denominator the ratio becomes value * 1e6 (a huge
# but finite number), so the inf clean-up below does not catch it and such rows
# can dominate the standardisation -- confirm this is intended.
EPS = 1e-6

df_valeur = df_valeur.assign(
    # unit price per kg
    val_per_net_weight=df["assessed_value_usd"] / (df["net_weight_kg"] + EPS),
    # comparison with the average reference prices
    val_vs_avg_fob_x=df["assessed_value_usd"] / (df["AVG_FOB_x"] + EPS),
    val_vs_avg_fob_y=df["assessed_value_usd"] / (df["AVG_FOB_y"] + EPS),
    # gross / net weight consistency
    gross_net_ratio=(df["gross_weight_kg"] + EPS) / (df["net_weight_kg"] + EPS),
)

# Cleaning: infinities are turned into missing values, then every missing value
# is replaced by 0 (rebinding instead of inplace=True keeps the cell chainable).
df_valeur = df_valeur.replace([np.inf, -np.inf], np.nan).fillna(0)

# Standardisation
scaler = StandardScaler().fit(df_valeur)

# Persist the fitted scaler: the models saved later in this notebook are trained
# on scaled features, so scoring new data with them requires this same scaler.
joblib.dump(scaler, "standard_scaler.pkl")

X_scaled = scaler.transform(df_valeur)

In [None]:
"""# Isolation Forest
iso = IsolationForest(n_estimators=200, contamination=0.05, random_state=42)
df["iso_pred"] = iso.fit_predict(X_scaled)
df["iso_pred"] = df["iso_pred"].map({-1: 1, 1: 0})  # 1 = fraude potentielle
df["iso_score"] = iso.decision_function(X_scaled)"""

# Entraînement du modèle
from sklearn.ensemble import IsolationForest

iso = IsolationForest(n_estimators=200, contamination=0.05, random_state=42)
df["iso_pred"] = iso.fit_predict(X_scaled)
df["iso_pred"] = df["iso_pred"].map({-1: 1, 1: 0})
df["iso_score"] = iso.decision_function(X_scaled)

# Sauvegarde du modèle
joblib.dump(iso, "isolation_forest_model.pkl")

# Chargement du modèle
#iso_loaded = joblib.load("isolation_forest_model.pkl")

# Vérification : refaire une prédiction
#y_loaded_pred = iso_loaded.predict(X_scaled)


In [None]:
# One-Class SVM (RBF kernel) fitted on the standardised features
ocsvm = OneClassSVM(kernel="rbf", gamma="auto", nu=0.05)

# Raw labels are -1 / 1; recode them as 1 / 0 so that 1 marks a flagged row
svm_labels = pd.Series(ocsvm.fit_predict(X_scaled), index=df.index)
df["svm_pred"] = svm_labels.map({-1: 1, 1: 0})

# Continuous score from the fitted model's decision function
df["svm_score"] = ocsvm.decision_function(X_scaled)

# --- Save the model ---
joblib.dump(ocsvm, "oneclass_svm_model.pkl")

In [None]:


# --- Enregistrement du modèle ---
#joblib.dump(ocsvm, "oneclass_svm_model.pkl")

# --- Chargement du modèle ---
#ocsvm_loaded = joblib.load("oneclass_svm_model.pkl")

In [None]:
# Autoencoder
from tensorflow.keras.utils import set_random_seed

# Seed the Python, NumPy and TensorFlow random generators so that weight
# initialisation and batch shuffling -- and therefore the rows flagged below --
# are repeatable across runs (42 matches the random_state used for the
# Isolation Forest). Some GPU kernels may still be non-deterministic.
set_random_seed(42)

input_dim = X_scaled.shape[1]
encoding_dim = 6
# Rows whose reconstruction error exceeds this percentile are flagged
# (top 5 %, in line with the 0.05 contamination / nu of the other models).
ANOMALY_PERCENTILE = 95

# Autoencoder network: one sparse (L1-regularised) bottleneck, linear output
input_layer = Input(shape=(input_dim,))
encoder = Dense(encoding_dim, activation="relu", activity_regularizer=regularizers.l1(1e-5))(input_layer)
decoder = Dense(input_dim, activation="linear")(encoder)
autoencoder = Model(inputs=input_layer, outputs=decoder)

autoencoder.compile(optimizer="adam", loss="mse")
autoencoder.fit(X_scaled, X_scaled, epochs=50, batch_size=32, shuffle=True, verbose=0)

# Reconstruction error per row (mean squared error over the features);
# verbose=0 keeps the progress bar out of the cell output.
reconstructions = autoencoder.predict(X_scaled, verbose=0)
mse = np.mean(np.power(X_scaled - reconstructions, 2), axis=1)
threshold = np.percentile(mse, ANOMALY_PERCENTILE)
df["ae_pred"] = (mse > threshold).astype(int)  # 1 = anomaly
df["ae_score"] = mse

In [None]:
# Model comparison: number/share of flagged rows plus two clustering-quality
# metrics computed on the (normal vs. flagged) split produced by each model.
results = {}

for pred_col in ["iso_pred", "svm_pred", "ae_pred"]:
    labels = df[pred_col]
    anomalies = labels.sum()
    prop = anomalies / len(df)
    try:
        sil = silhouette_score(X_scaled, labels)
        db = davies_bouldin_score(X_scaled, labels)
    except ValueError as err:
        # Both metrics need at least two distinct labels (e.g. they fail when a
        # model flags nothing). Only ValueError is caught, so real bugs and a
        # manual kernel interrupt are no longer silently swallowed.
        print(f"[{pred_col}] métriques indisponibles : {err}")
        sil, db = None, None
    results[pred_col] = {
        "Anomalies détectées": anomalies,
        "Proportion": round(prop, 3),
        "Silhouette": sil,
        "Davies-Bouldin": db
    }

# One row per model, one column per metric
results_df = pd.DataFrame(results).T
print("\n=== Résultats comparatifs ===")
print(results_df)

In [None]:
# Suspect cases (top 5): rows flagged by at least one model, ranked by how many
# models agree. A plain head(5) would only show the first five flagged rows in
# file order, not the most suspicious ones.
pred_cols = ["iso_pred", "svm_pred", "ae_pred"]
n_flags = df[pred_cols].eq(1).sum(axis=1)

# Same content and row order as before: every row flagged by one model or more
suspects = df[n_flags >= 1].copy()

print("\nNombre de cas suspects détectés (au moins 1 modèle):", suspects.shape[0])

# Stable sort (mergesort) keeps the original row order among ties
top_suspects = (
    suspects
    .assign(n_flags=n_flags)
    .sort_values("n_flags", ascending=False, kind="mergesort")
)
print(top_suspects[[
    "instanceid", "nif_imp", "hs_cod",
    "assessed_value_usd", "value_per_kg",
    "iso_pred", "svm_pred", "ae_pred",
    "n_flags"
]].head(5))