## Construction d'un pipeline MLlib

Les colonnes selectionnées pour ce pipeline sont:
- Type
- ShippingMode
- LateDeliveryRisk
- CategoryName
- CustomerSegment
- OrderItemTotal
- OrderRegion
- ShippingMonthName

In [1]:
from pyspark.sql import functions as F
from modules.spark import spark

In [2]:
df = spark.read.csv("../data/processed/data-initial-cleaning.csv", header=True, inferSchema=True)


In [3]:
df.columns

['Type',
 'Days for shipment (scheduled)',
 'Benefit per order',
 'Sales per customer',
 'Late_delivery_risk',
 'Category Name',
 'Customer Segment',
 'Department Name',
 'Latitude',
 'Longitude',
 'Market',
 'Order Item Discount',
 'Order Item Discount Rate',
 'Order Item Product Price',
 'Order Item Profit Ratio',
 'Order Item Quantity',
 'Sales',
 'Order Item Total',
 'Order Profit Per Order',
 'Order Region',
 'Product Price',
 'Shipping Mode',
 'Order_Month_Name',
 'Shipping_Month_Name']

- Equilibrage

In [4]:
df.groupBy('Late_delivery_risk').count().show()

+------------------+-----+
|Late_delivery_risk|count|
+------------------+-----+
|                 1|98977|
|                 0|73788|
+------------------+-----+



In [5]:
class_0 = df.filter(F.col("Late_delivery_risk") == 0)
class_1 = df.filter(F.col("Late_delivery_risk") == 1)

ratio = class_0.count() / class_1.count()

new_class_1 = class_1.sample(withReplacement=False, fraction=ratio)

df_balanced = class_0.union(new_class_1)

new_class_0 = df_balanced.filter(F.col("Late_delivery_risk") == 0).count()
new_class_1 = df_balanced.filter(F.col("Late_delivery_risk") == 1).count()

print(new_class_0)
print(new_class_1)

73788
73579


In [None]:
cols = [
    # Target
    'Late_delivery_risk',

    # 'Shipping Mode',
    # 'Type',
    # 'Category Name',
    # 'Customer Segment',
    # 'Order Item Quantity', 
    # 'Shipping_Month_Name',
    # 'Order_Month_Name',
    # 'Order Item Total',

    # 'Order_Month_Name',
    # 'Shipping_Month_Name',
    # 'Order Item Quantity',
    # 'Type',
    # 'Product Price',
    # 'Customer Segment',
    # 'Shipping Mode',
    # 'Order Item Total'

    'Shipping_Month_Name', 'Category Name', 'Customer Segment', 'Shipping Mode', 'Type', 'Order_Month_Name'
    # 'Order_Month_Name', 'Shipping_Month_Name', 'Order Item Quantity', 'Type', 'Product Price', 'Customer Segment', 'Shipping Mode', 'Benefit per order', 'Order Item Total'
    # 'Order_Month_Name',
    # 'Shipping_Month_Name',
    # 'Order Item Quantity',
    # 'Type',
    # 'Product Price',
    # 'Customer Segment',
    # 'Shipping Mode',
    # 'Order Item Total'
]

df_col_select = df_balanced.select(cols)

In [7]:
df_col_select.columns

['Late_delivery_risk', 'Shipping Mode']

- Separer les noms de colonnes

In [8]:
string_cols = []
int_cols = []

for c, t in df_col_select.dtypes[1:]:
    if (t == 'string'):
        string_cols.append(c)
    else:
        int_cols.append(c)

### Assembler le pipeline MLlib.

In [9]:
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

indexers = [
    StringIndexer(inputCol=c, outputCol=c+"_index") for c in string_cols
]
encoder = OneHotEncoder(
    inputCols=[c+'_index' for c in string_cols],
    outputCols=[c+'_vec' for c in string_cols]
)

assembler = VectorAssembler(
    inputCols=[c for c in int_cols] + [c+'_vec' for c in string_cols],
    outputCol='features'
)

scaler = StandardScaler(
    inputCol="features",
    outputCol='scaled_features',
    withMean=True,
    withStd=True,
)



- Division de données

In [10]:
train_df, test_df = df_col_select.randomSplit([0.8, 0.2], seed=42)

### Entrainement des modéles

- Random Forest

In [11]:

rf = RandomForestClassifier(labelCol="Late_delivery_risk", featuresCol="scaled_features")

rf_pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, rf])

rf_model = rf_pipeline.fit(train_df)
rf_predictions = rf_model.transform(test_df)

- Logistic Regression

In [12]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="Late_delivery_risk", featuresCol="scaled_features")

lr_pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, lr])

lr_model = lr_pipeline.fit(train_df)
lr_predictions = lr_model.transform(test_df)


- GBT

In [13]:
from pyspark.ml.classification import GBTClassifier

gb = GBTClassifier(labelCol="Late_delivery_risk", featuresCol="scaled_features")

gb_pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, gb])

gb_model = gb_pipeline.fit(train_df)
gb_predictions = gb_model.transform(test_df)

### Evaluation de performance

- Random Forest

In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# évaluer les prédictions
evaluator = BinaryClassificationEvaluator(
    labelCol="Late_delivery_risk",     # vraie étiquette
    rawPredictionCol="rawPrediction",  # par défaut
    metricName="areaUnderROC"  # ou "areaUnderPR"
)

rf_auc = evaluator.evaluate(rf_predictions)


In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# accuracy
acc_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="accuracy"
)
rf_accuracy = acc_eval.evaluate(rf_predictions)

# F1-score
f1_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="f1"
)
rf_f1 = f1_eval.evaluate(rf_predictions)

precision_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedPrecision"
)
rf_precision = precision_eval.evaluate(rf_predictions)

recall_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedRecall"
)
rf_recall = recall_eval.evaluate(rf_predictions)



- Logistic Regression

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# évaluer les prédictions
evaluator = BinaryClassificationEvaluator(
    labelCol="Late_delivery_risk",     # vraie étiquette
    rawPredictionCol="rawPrediction",  # par défaut
    metricName="areaUnderROC"  # ou "areaUnderPR"
)

lr_auc = evaluator.evaluate(lr_predictions)


In [17]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# accuracy
acc_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="accuracy"
)
lr_accuracy = acc_eval.evaluate(lr_predictions)

# F1-score
f1_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="f1"
)
lr_f1 = f1_eval.evaluate(lr_predictions)

# Precision
precision_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedPrecision"
)
lr_precision = precision_eval.evaluate(lr_predictions)

# F1-score
recall_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedRecall"
)
lr_recall = recall_eval.evaluate(lr_predictions)

- GBT

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# évaluer les prédictions
evaluator = BinaryClassificationEvaluator(
    labelCol="Late_delivery_risk",     # vraie étiquette
    rawPredictionCol="rawPrediction",  # par défaut
    metricName="areaUnderROC"  # ou "areaUnderPR"
)

gb_auc = evaluator.evaluate(gb_predictions)


In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# accuracy
acc_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="accuracy"
)
gb_accuracy = acc_eval.evaluate(gb_predictions)

# F1-score
f1_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="f1"
)
gb_f1 = f1_eval.evaluate(gb_predictions)

# F1-score
precision_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedPrecision"
)
gb_precision = precision_eval.evaluate(gb_predictions)

# F1-score
recall_eval = MulticlassClassificationEvaluator(
    labelCol="Late_delivery_risk", predictionCol="prediction", metricName="weightedRecall"
)
gb_recall = recall_eval.evaluate(gb_predictions)


- Sauvegardez le modèle (pipeline)

In [20]:
# gb_model.write().overwrite().save('../models/model_2')

In [21]:
print("------------- LR")
print(f"AUC = {lr_auc:.3f}")
print(f"Accuracy = {lr_accuracy:.3f}")
print(f"F1-score = {lr_f1:.3f}")
print(f"Precision = {lr_precision:.3f}")
print(f"Recall = {lr_recall:.3f}")

print("\n------------- RF")
print(f"AUC = {rf_auc:.3f}")
print(f"Accuracy = {rf_accuracy:.3f}")
print(f"F1-score = {rf_f1:.3f}")
print(f"Precision = {rf_precision:.3f}")
print(f"Recall = {rf_recall:.3f}")

print("\n------------- GBT")
print(f"AUC = {gb_auc:.3f}")
print(f"Accuracy = {gb_accuracy:.3f}")
print(f"F1-score = {gb_f1:.3f}")
print(f"Precision = {gb_precision:.3f}")
print(f"Recall = {gb_recall:.3f}")

------------- LR
AUC = 0.744
Accuracy = 0.725
F1-score = 0.716
Precision = 0.761
Recall = 0.725

------------- RF
AUC = 0.744
Accuracy = 0.725
F1-score = 0.716
Precision = 0.761
Recall = 0.725

------------- GBT
AUC = 0.744
Accuracy = 0.725
F1-score = 0.716
Precision = 0.761
Recall = 0.725


#### Cross Validation

In [11]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#### 

In [None]:
# from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, StandardScaler
# from pyspark.ml import Pipeline
# from pyspark.ml.classification import RandomForestClassifier
# from pyspark.ml.classification import LogisticRegression
# from pyspark.ml.classification import GBTClassifier
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# important_cols = [
#     # Target
#     'Late_delivery_risk',

#     # Important Columns
# ]

# extra_cols = [
#     'Shipping Mode',
#     'Type',
#     'Category Name',
#     'Customer Segment',
#     'Order Item Total',
#     'Benefit per order',
#     'Order_Month_Name',
#     'Shipping_Month_Name',
#     'Order Region',
#     'Product Price',
#     'Order Item Quantity',
# ]


# result = {
#     "acc": 0,
#     "col": []
# }

# history = []

# for a in range(len(extra_cols)):
#     for b in range(a+1, len(extra_cols)):
#         for c in range(b+1, len(extra_cols)):
#             for d in range(c+1, len(extra_cols)):
#                 for e in range(d+1, len(extra_cols)):
#                     for f in range(e, len(extra_cols)):
#                         for g in range(f, len(extra_cols)):
#                             for h in range(g, len(extra_cols)):
#                                 for i in range(h, len(extra_cols)):

#                                     indexes = set([a, b, c, d, e, f, g, h, i])
#                                     if (indexes in history):
#                                         print(a, b, c, d, e, f, g, h, i, ' -- ', 'Skipped')
#                                         continue

#                                     history.append(indexes)
                                    
#                                     selected_extra_cols = list(set([extra_cols[a]] + [extra_cols[b]] + [extra_cols[c]] + [extra_cols[d]] + [extra_cols[e]] + [extra_cols[f]] + [extra_cols[g]] + [extra_cols[h]] + [extra_cols[i]]))
#                                     select_cols = important_cols + selected_extra_cols
                                    
#                                     df_col_select = df.select(select_cols)

#                                     string_cols = []
#                                     int_cols = []

#                                     for col, dtype in df_col_select.dtypes[1:]:
#                                         if (dtype == 'string'):
#                                             string_cols.append(col)
#                                         else:
#                                             int_cols.append(col)

#                                     indexers = [
#                                         StringIndexer(inputCol=col, outputCol=col+"_index") for col in string_cols
#                                     ]
#                                     encoder = OneHotEncoder(
#                                         inputCols=[col+'_index' for col in string_cols],
#                                         outputCols=[col+'_vec' for col in string_cols]
#                                     )

#                                     assembler = VectorAssembler(
#                                         inputCols=[col for col in int_cols] + [col+'_vec' for col in string_cols],
#                                         outputCol='features'
#                                     )

#                                     train_df, test_df = df_col_select.randomSplit([0.8, 0.2], seed=42)

#                                     # GBT

#                                     gb = GBTClassifier(labelCol="Late_delivery_risk", featuresCol="features")

#                                     gb_pipeline = Pipeline(stages=indexers + [encoder, assembler, gb])

#                                     gb_model = gb_pipeline.fit(train_df)
#                                     gb_predictions = gb_model.transform(test_df)

#                                     acc_eval = MulticlassClassificationEvaluator(
#                                         labelCol="Late_delivery_risk", predictionCol="prediction", metricName="accuracy"
#                                     )
#                                     gb_accuracy = acc_eval.evaluate(gb_predictions)

#                                     print(a, b, c, d, e, f, g, h, i, ' - ', gb_accuracy, ' --- ', result['acc'], ' | ', result['col'])

#                                     if result["acc"] < gb_accuracy:
#                                         result["acc"] = gb_accuracy
#                                         result["col"] = selected_extra_cols
    
# print("The best acc", result["acc"])
# print("The best col", result["col"])




- 0.7000640018618723
- ['Order Item Quantity', 'Shipping_Month_Name', 'Order_Month_Name', 'Order Item Total']

----
- 0.70250770931518
- ['Shipping_Month_Name', 'Category Name', 'Customer Segment', 'Shipping Mode', 'Type', 'Order_Month_Name']
----
- 0.7040786641065921
- ['Order_Month_Name', 'Shipping_Month_Name', 'Order Item Quantity', 'Type', 'Product Price', 'Customer Segment', 'Shipping Mode', 'Benefit per order', 'Order Item Total']
----
- 0.7043404899051608
- ['Order_Month_Name', 'Shipping_Month_Name', 'Order Item Quantity', 'Type', 'Product Price', 'Customer Segment', 'Shipping Mode', 'Order Item Total']
----