In [None]:
import sys
import pandas as pd
import requests

# === Chemin vers les modules personnalisés ===
sys.path.append("../scripts")  # ou chemin absolu

# === Import des fonctions Spark ===
from spark_pipeline import load_data, preprocess, run_all_models

# === URLs des microservices ===
INGESTION_URL = "http://localhost:8001"
TRAIN_URL = "http://localhost:8002"
PREDICT_URL = "http://localhost:8003"
COMPARE_URL = "http://localhost:8004"

# === Vérification de l'état des microservices ===
print("\n🔍 Vérification des microservices :")
for name, url in [
    ("ingestion-service", INGESTION_URL),
    ("train-service", TRAIN_URL),
    ("predict-service", PREDICT_URL),
    ("compare-service", COMPARE_URL)
]:
    try:
        res = requests.get(f"{url}/status")
        print(f"✅ {name} OK :", res.json())
    except Exception as e:
        print(f"❌ {name} erreur :", e)

# === Exemple : récupération des features du predict-service ===
try:
    features_res = requests.get(f"{PREDICT_URL}/features")
    print("\n📊 Features du predict-service :", features_res.json())
except Exception as e:
    print("❌ Erreur récupération features :", e)

# === Pipeline Spark ===

print("\n🚀 Lancement du pipeline Spark...\n")

# 1. Chargement des données
spark, df = load_data("../data/creditcard.csv")

# 2. Analyse rapide
df.printSchema()
df.groupBy("Class").count().show()
df.describe().show()

# 3. Préparation et split
df_prepared = preprocess(df)
train, test = df_prepared.randomSplit([0.8, 0.2], seed=42)

# 4. Entraînement et évaluation
results = run_all_models(train, test)

# 5. Affichage des résultats
print("\n📈 Résultats des modèles :")
for model, metrics in results.items():
    print(f"{model}:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.4f}" if isinstance(value, float) else f"  {metric}: {value}")

# 6. Sauvegarde de toutes les métriques dans le CSV
all_metrics = []
for model, metrics in results.items():
    row = {"model": model}
    row.update(metrics)
    all_metrics.append(row)
df_results = pd.DataFrame(all_metrics)
df_results.to_csv("../models/resultats_auc.csv", index=False)  # Toutes les colonnes sont sauvegardées

    
# 8. Fermeture de la session Spark
spark.stop()


🔍 Vérification des microservices :
✅ ingestion-service OK : {'status': 'ingestion-service running'}
✅ train-service OK : {'status': 'train-service running'}
❌ predict-service erreur : ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer'))
✅ compare-service OK : {'detail': 'Not Found'}
❌ Erreur récupération features : ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer'))

🚀 Lancement du pipeline Spark...



Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/02 13:23:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nulla

                                                                                

+-----+------+
|Class| count|
+-----+------+
|    1|   492|
|    0|284315|
+-----+------+



25/06/02 13:26:20 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|summary|              Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|                 V14|                 V15|  

25/06/02 13:29:05 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                


📈 Résultats des modèles :
LogisticRegression:
  auc: 0.9560
  accuracy: 0.9992
  recall: 0.9999
  precision: 0.9993
  training_time: 235.5118
RandomForest:
  auc: 0.9698
  accuracy: 0.9993
  recall: 0.9998
  precision: 0.9995
  training_time: 233.6377

📬 Réponse du compare-service : {'status': 'ok', 'message': 'Résultats reçus'}


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import shutil
import os

# Démarre Spark (si pas déjà fait)
spark = SparkSession.builder.appName("TrainLogisticModel").getOrCreate()

# Charge les données
df = spark.read.csv("../data/creditcard.csv", header=True, inferSchema=True)

# Préparation des features
features = [
    "Time", "V1", "V2", "V3", "V4", "V5", "V6", "V7", "V8", "V9", "V10",
    "V11", "V12", "V13", "V14", "V15", "V16", "V17", "V18", "V19", "V20",
    "V21", "V22", "V23", "V24", "V25", "V26", "V27", "V28", "Amount"
]
assembler = VectorAssembler(inputCols=features, outputCol="features")
df_prepared = assembler.transform(df).select("features", "Class")
train, test = df_prepared.randomSplit([0.8, 0.2], seed=42)

# Entraînement du modèle
lr = LogisticRegression(featuresCol="features", labelCol="Class")
model = lr.fit(train)

# Sauvegarde des statistiques du dataset
desc_pd = df.describe().toPandas()
desc_pd.to_csv("../models/summary_stats.csv", index=False)

# Affiche les stats pour vérification
summary = pd.read_csv("../models/summary_stats.csv")
print(summary)

# Sauvegarde du modèle (en supprimant l'ancien dossier si besoin)
model_path = "../models/spark_logistic_model"
if os.path.exists(model_path):
    shutil.rmtree(model_path)
model.save(model_path)

# (optionnel) spark.stop()

                                                                                

  summary           Time            V1            V2            V3  \
0   count  284807.000000  2.848070e+05  2.848070e+05  2.848070e+05   
1    mean   94813.859575  2.235360e-15  6.865750e-17 -5.824711e-15   
2  stddev   47488.145955  1.958696e+00  1.651309e+00  1.516255e+00   
3     min       0.000000 -5.640751e+01 -7.271573e+01 -4.832559e+01   
4     max  172792.000000  2.454930e+00  2.205773e+01  9.382558e+00   

             V4            V5            V6            V7            V8  ...  \
0  2.848070e+05  2.848070e+05  2.848070e+05  2.848070e+05  2.848070e+05  ...   
1  2.011824e-15  3.704312e-15  1.140034e-15 -1.149614e-16 -2.953869e-16  ...   
2  1.415869e+00  1.380247e+00  1.332271e+00  1.237094e+00  1.194353e+00  ...   
3 -5.683171e+00 -1.137433e+02 -2.616051e+01 -4.355724e+01 -7.321672e+01  ...   
4  1.687534e+01  3.480167e+01  7.330163e+01  1.205895e+02  2.000721e+01  ...   

            V21           V22           V23           V24           V25  \
0  2.848070e+05  2.8480

                                                                                

In [None]:
import shutil
import os

model_path = "../models/spark_logistic_model"
if os.path.exists(model_path):
    shutil.rmtree(model_path)

model.save(model_path)