In [None]:
## test de recuperation de donnee 

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col

# --- Spark ---
spark = SparkSession.builder.appName("count_test").getOrCreate()

# --- Colonnes ---
feature_numeric_cols = [
    "Days for shipment (scheduled)", "Benefit per order", "Sales per customer",
    "Order Item Discount", "Order Item Discount Rate", "Order Item Product Price",
    "Order Item Profit Ratio", "Order Item Quantity", "Sales", "Order Profit Per Order"
]

feature_categorical_cols = [
    "Type", "Shipping Mode", "Market", "Customer Segment",
    "Order Region", "Category Name"
]

# --- Charger données ---
df = (
    spark.read.option("header", True).option("inferSchema", True)
    .csv("../data/DataCoSupplyChainDataset.csv")
    .filter(col("Delivery Status") != "Shipping canceled")
    .select(*(feature_numeric_cols + feature_categorical_cols + ["Late_delivery_risk"]))
).dropna()


for c in feature_categorical_cols:
    print(c, df.select(c).distinct().count())


In [20]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col, expr

# ---------- Winsorizer (cap outliers) ----------
class Winsorizer(Transformer):
    def __init__(self, inputCols=None, lower=0.01, upper=0.99):
        super(Winsorizer, self).__init__()
        self.inputCols = inputCols
        self.lower = lower
        self.upper = upper
    
    def _transform(self, df):
        for c in self.inputCols:
            q = df.approxQuantile(c, [self.lower, self.upper], 0.01)
            low, up = q

            df = df.withColumn(
                c,
                expr(
                    f"CASE "
                    f"WHEN `{c}` < {low} THEN {low} "
                    f"WHEN `{c}` > {up} THEN {up} "
                    f"ELSE `{c}` END"
                )
            )
        return df


# ---------- Spark ----------
spark = SparkSession.builder.appName("ML_Models_Test").getOrCreate()

# ---------- Colonnes ----------
feature_numeric_cols = [
    "Days for shipment (scheduled)", "Benefit per order", "Sales per customer",
    "Order Item Discount", "Order Item Discount Rate", "Order Item Product Price",
    "Order Item Profit Ratio", "Order Item Quantity", "Sales", "Order Profit Per Order"
]

feature_categorical_cols = [
    "Type", "Shipping Mode", "Market", "Customer Segment",
    "Order Region", "Category Name"
]

# ---------- Data ----------
df = (
    spark.read.option("header", True).option("inferSchema", True)
    .csv("../data/DataCoSupplyChainDataset.csv")
    .filter(col("Delivery Status") != "Shipping canceled")
    .select(*(feature_numeric_cols + feature_categorical_cols + ["Late_delivery_risk"]))
).dropna()

# ---------- Prétraitement ----------
indexers = [
    StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep")
    for c in feature_categorical_cols
]

winsor = Winsorizer(inputCols=feature_numeric_cols, lower=0.01, upper=0.99)

assembler_num = VectorAssembler(inputCols=feature_numeric_cols, outputCol="num_features")

scaler = StandardScaler(inputCol="num_features", outputCol="num_features_scaled",
                        withMean=True, withStd=True)

assembler_final = VectorAssembler(
    inputCols=["num_features_scaled"] + [c+"_idx" for c in feature_categorical_cols],
    outputCol="features"
)

# ---------- Modèles ----------
models = {
    "GBTClassifier": GBTClassifier(featuresCol="features", labelCol="Late_delivery_risk",
                                   maxIter=30, maxDepth=5, maxBins=2000),

    "RandomForest": RandomForestClassifier(featuresCol="features", labelCol="Late_delivery_risk",
                                           numTrees=50, maxDepth=5, maxBins=2000),

    "LogisticRegression": LogisticRegression(featuresCol="features", labelCol="Late_delivery_risk",
                                             maxIter=50)
}

# ---------- Train/Test ----------
train, test = df.randomSplit([0.8, 0.2], seed=42)

# ---------- Fonction évaluation ----------
def evaluate_model(model_name, pipeline_model):
    pred = pipeline_model.transform(test)

    evaluator_acc = MulticlassClassificationEvaluator(
        labelCol="Late_delivery_risk", predictionCol="prediction", metricName="accuracy"
    )
    evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol="Late_delivery_risk", predictionCol="prediction", metricName="f1"
    )
    evaluator_prec = MulticlassClassificationEvaluator(
        labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedPrecision"
    )
    evaluator_recall = MulticlassClassificationEvaluator(
        labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedRecall"
    )
    evaluator_auc = BinaryClassificationEvaluator(
        labelCol="Late_delivery_risk", rawPredictionCol="rawPrediction"
    )

    print(f"\n===== {model_name} =====")
    print("Accuracy =", evaluator_acc.evaluate(pred))
    print("F1-score =", evaluator_f1.evaluate(pred))
    print("Precision =", evaluator_prec.evaluate(pred))
    print("Recall =", evaluator_recall.evaluate(pred))
    print("AUC =", evaluator_auc.evaluate(pred))

# ---------- Entraînement + évaluation ----------
# for name, classifier in models.items():
#     pipeline = Pipeline(stages=indexers + [winsor, assembler_num, scaler, assembler_final, classifier])
#     model = pipeline.fit(train)
#     evaluate_model(name, model)

for name, classifier in models.items():
    pipeline = Pipeline(stages=indexers + [winsor, assembler_num, scaler, assembler_final, classifier])
    model = pipeline.fit(train)
    evaluate_model(name, model)

    if name == "LogisticRegression":
        best_model = model  



===== GBTClassifier =====
Accuracy = 0.697117903930131
F1-score = 0.6936591440259341
Precision = 0.7463293681199681
Recall = 0.697117903930131
AUC = 0.7437718164261659

===== RandomForest =====
Accuracy = 0.6940902474526929
F1-score = 0.6923440477551527
Precision = 0.7323106600794665
Recall = 0.6940902474526929
AUC = 0.7284670397636691

===== LogisticRegression =====
Accuracy = 0.6939737991266376
F1-score = 0.6922342061251481
Precision = 0.7321398439841479
Recall = 0.6939737991266376
AUC = 0.7177239679216725


In [21]:


# ---------- Sauvegarde ----------
model_path = "../models/logistic_regression_pipeline"
print(f"Sauvegarde du modèle dans {model_path}...")

best_model.write().overwrite().save(model_path)

print("✓ Pipeline sauvegardé avec succès!")

# ---------- Sauvegarde des métadonnées ----------
import json

metadata = {
    "model_type": "LogisticRegression",
    "feature_numeric_cols": feature_numeric_cols,
    "feature_categorical_cols": feature_categorical_cols,
    "accuracy": 0.6939737991266376,
    "f1_score": 0.6922342061251481,
    "precision": 0.7321398439841479,
    "recall": 0.6939737991266376,
    "auc": 0.7177266968937516
}

with open("../models/model_metadata.json", "w") as f:
    json.dump(metadata, f, indent=4)

print("✓ Métadonnées sauvegardées!")

spark.stop()


Sauvegarde du modèle dans ../models/logistic_regression_pipeline...


ValueError: ('Pipeline write will fail on this pipeline because stage %s of type %s is not MLWritable', 'Winsorizer_2b4181ca1c24', <class '__main__.Winsorizer'>)