In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("ETL_Demand_Forecasting1") \
    .getOrCreate()

In [None]:
csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/drive/MyDrive/Cleaned_data.csv")

In [None]:
#Delivery Performance
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/drive/MyDrive/cleaned_data.csv")

feature_columns = ["Days for shipping (real)", "Days for shipment (scheduled)", "Order Item Discount", "Order Item Discount Rate",
                   "Order Item Product Price", "Order Item Quantity", "Order Profit Per Order", "Order Frequency", "Month", "Year"]

# Convertir les colonnes catégorielles en indices numériques
indexer = StringIndexer(inputCol="Order Region", outputCol="Order Region Index")
csvdf = indexer.fit(csvdf).transform(csvdf)

# Ajouter la colonne indexée à la liste des caractéristiques
feature_columns.append("Order Region Index")

# Assembler les caractéristiques en un seul vecteur
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
csvdf = assembler.transform(csvdf)
# Diviser les données en ensembles d'entraînement et de test
train_data, test_data = csvdf.randomSplit([0.7, 0.3], seed=42)


# Construire le modèle de régression logistique
lr = LogisticRegression(labelCol="Late_delivery_risk", featuresCol="features")

# Entraîner le modèle
lr_model = lr.fit(train_data)
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Faire des prédictions sur les données de test
test_results = lr_model.transform(test_data)

# Évaluer la performance du modèle
evaluator = BinaryClassificationEvaluator(labelCol="Late_delivery_risk")
accuracy = evaluator.evaluate(test_results)

print(f"Accuracy: {accuracy}")

Accuracy: 0.9728807895356012


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Initialiser le modèle d'arbre de décision
dt = DecisionTreeClassifier(labelCol="Late_delivery_risk", featuresCol="features")

# Entraîner le modèle sur les données d'entraînement
dt_model = dt.fit(train_data)

# Faire des prédictions sur les données de test
test_results_dt = dt_model.transform(test_data)

# Évaluer la performance du modèle
accuracy_dt = evaluator.evaluate(test_results_dt)

print(f"Accuracy (Decision Tree): {accuracy_dt}")

Accuracy (Decision Tree): 0.972842541402912


In [None]:
from pyspark.sql.functions import when, col
test_results1 = test_results.withColumn("prediction", when(col("prediction") == 1.0, "Late").otherwise("Not Late"))
test_results1.select("Late_delivery_risk", "prediction").show()

+------------------+----------+
|Late_delivery_risk|prediction|
+------------------+----------+
|                 0|  Not Late|
|                 0|  Not Late|
|                 0|  Not Late|
|                 0|  Not Late|
|                 0|  Not Late|
|                 0|  Not Late|
|                 1|      Late|
|                 1|      Late|
|                 1|      Late|
|                 1|      Late|
|                 1|      Late|
|                 1|      Late|
|                 1|      Late|
|                 1|      Late|
|                 1|      Late|
|                 0|  Not Late|
|                 0|  Not Late|
|                 0|  Not Late|
|                 1|      Late|
|                 0|  Not Late|
+------------------+----------+
only showing top 20 rows



In [None]:
# Saving the predictions
predictions_pandas = test_results.select("Order Id", "prediction").withColumnRenamed("prediction", "Predicte Late Delivery Risk")
csv_file_path = "/content/drive/MyDrive/predictions/orders_predictions.csv"
predictions_pandas.toPandas().to_csv(csv_file_path, index=False)

In [None]:
#Order Profitability
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col,expr
from pyspark.ml.evaluation import RegressionEvaluator

csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/drive/MyDrive/cleaned_data.csv")

# Ingénierie des caractéristiques
csvdf = csvdf.withColumn("Total Profit", expr("(`Order Item Product Price` * `Order Item Quantity`) - `Order Item Discount`"))


feature_columns = ["Days for shipping (real)", "Days for shipment (scheduled)", "Order Item Discount", "Order Item Discount Rate",
                   "Order Item Product Price", "Order Item Quantity", "Order Frequency", "Total Profit"]


# Assembler les caractéristiques en un seul vecteur
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
csvdf = assembler.transform(csvdf)

# Convertir la variable cible 'Order Profit Per Order' en type double
csvdf = csvdf.withColumn("Order Profit Per Order", col("Order Profit Per Order").cast("double"))
# Diviser les données en ensembles d'entraînement et de test
train_data, test_data = csvdf.randomSplit([0.7, 0.3], seed=42)


# Create the Random Forest regression model
rf = RandomForestRegressor(labelCol="Order Profit Per Order", featuresCol="features")

# Train the model
rf_model = rf.fit(train_data)

# Make predictions on the test data
rf_predictions = rf_model.transform(test_data)

# Evaluate the performance of the Random Forest model
evaluator = RegressionEvaluator(labelCol="Order Profit Per Order", predictionCol="prediction", metricName="rmse")
rf_rmse = evaluator.evaluate(rf_predictions)

print(f"Random Forest RMSE: {rf_rmse}")


Random Forest RMSE: 104.62760863397759


In [None]:
import pandas as pd
# Step 1: Read the existing CSV file into a Pandas DataFrame
csv_path = "/content/drive/MyDrive/predictions/orders_predictions.csv"
existing_df = pd.read_csv(csv_path)

# Step 2: Convert the Pandas DataFrame to a Spark DataFrame
existing_spark_df = spark.createDataFrame(existing_df)

# Step 3: Select and rename the columns in the rf_predictions DataFrame
predictions_df = rf_predictions.select("Order Id", "prediction").withColumnRenamed("prediction", "Order_Profit_Per_Order_Prediction")

# Convert the predictions DataFrame to Pandas DataFrame
predictions_pandas = predictions_df.toPandas()

# Merge the existing Pandas DataFrame with the new predictions on "Order Id"
combined_df = existing_df.merge(predictions_pandas, on="Order Id", how="left")

# Step 4: Save the combined DataFrame back to the CSV file
combined_df.to_csv(csv_path, index=False)

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/drive/MyDrive/cleaned_data.csv")
# Ingénierie des caractéristiques
csvdf = csvdf.withColumn("Total Profit", expr("(`Order Item Product Price` * `Order Item Quantity`) - `Order Item Discount`"))

feature_columns = ["Days for shipping (real)", "Days for shipment (scheduled)", "Order Item Discount", "Order Item Discount Rate", "Order Item Product Price", "Order Item Quantity", "Order Frequency", "Total Profit"]

# Assembler les caractéristiques en un seul vecteur
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
csvdf = assembler.transform(csvdf)

# Convertir la variable cible 'Order Profit Per Order' en type double
csvdf = csvdf.withColumn("Order Profit Per Order", col("Order Profit Per Order").cast("double"))

# Diviser les données en ensembles d'entraînement et de test
train_data, test_data = csvdf.randomSplit([0.7, 0.3], seed=42)

# Créer le modèle XGBoost
xgb = GBTRegressor(labelCol="Order Profit Per Order", featuresCol="features", maxDepth=5, maxBins=32, maxIter=20, seed=42)

# Entraîner le modèle
xgb_model = xgb.fit(train_data)

# Faire des prédictions sur les données de test
xgb_predictions = xgb_model.transform(test_data)

# Évaluer la performance du modèle
evaluator = RegressionEvaluator(labelCol="Order Profit Per Order", predictionCol="prediction", metricName="rmse")
xgb_rmse = evaluator.evaluate(xgb_predictions)

print(f"XGBoost RMSE: {xgb_rmse}")

XGBoost RMSE: 105.2589233758695


In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

# Load data
csvdf = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/content/drive/MyDrive/cleaned_data.csv")

# Feature engineering
csvdf = csvdf.withColumn("Total Profit", expr("(`Order Item Product Price` * `Order Item Quantity`) - `Order Item Discount`"))

# Define feature columns
feature_columns = ["Days for shipping (real)", "Days for shipment (scheduled)",
                   "Order Item Discount", "Order Item Discount Rate",
                   "Order Item Product Price", "Order Item Quantity",
                   "Order Frequency", "Total Profit"]

# Assemble features into a feature vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
csvdf = assembler.transform(csvdf)

# Convert the target variable to double
csvdf = csvdf.withColumn("Order Profit Per Order", col("Order Profit Per Order").cast("double"))

# Split data into training and test sets
train_data, test_data = csvdf.randomSplit([0.7, 0.3], seed=42)

# Create Decision Tree Regressor model
dt = DecisionTreeRegressor(labelCol="Order Profit Per Order", featuresCol="features", maxDepth=10, maxBins=50, seed=42)

# Train the model on the training data
dt_model = dt.fit(train_data)

# Make predictions on the test data
dt_predictions = dt_model.transform(test_data)

# Evaluate model performance
evaluator = RegressionEvaluator(labelCol="Order Profit Per Order", predictionCol="prediction", metricName="rmse")
dt_rmse = evaluator.evaluate(dt_predictions)

print(f"Decision Tree RMSE: {dt_rmse}")


Decision Tree RMSE: 106.14559686186314


In [None]:
combined_df.head(5)

Unnamed: 0,Order Id,Predicte Late Delivery Risk,Order_Profit_Per_Order_Prediction
0,57963,0.0,19.584126
1,61453,0.0,21.521338
2,61453,0.0,30.285518
3,61453,0.0,30.285518
4,61453,0.0,12.948679


In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession



# Charger les données depuis le fichier CSV
csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/drive/MyDrive/cleaned_data.csv")


selected_features = ["Days for shipping (real)", "Order Region", "Days for shipment (scheduled)"]

# Drop rows with missing values in selected features
csvdf = csvdf.dropna(subset=selected_features)

# Convert categorical columns to numerical values
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep") for column in selected_features]

# Convert the target column "Shipping Mode" to numeric values
label_indexer = StringIndexer(inputCol="Shipping Mode", outputCol="Shipping Mode_index").fit(csvdf)

# Vectorize features
assembler = VectorAssembler(inputCols=[column+"_index" for column in selected_features], outputCol="features")

# Define the Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="Shipping Mode_index", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Pipeline stages
pipeline_stages = indexers + [label_indexer] + [assembler, lr]

# Create a Pipeline
pipeline = Pipeline(stages=pipeline_stages)

# Fit the pipeline to the data
pipeline_model = pipeline.fit(csvdf)

# Make predictions
predictions = pipeline_model.transform(csvdf)

# Show the predicted shipping mode for each order
predictions.select("Shipping Mode", "prediction").show()

from pyspark.sql.functions import col, expr

# Define shipping mode labels
shipping_mode_labels = {
    0.0: "Standard Class",
    1.0: "First Class",
    2.0: "Second Class",
    3.0: "Same Day"
}
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Créer un évaluateur pour évaluer les performances du modèle
evaluator = MulticlassClassificationEvaluator(labelCol="Shipping Mode_index", predictionCol="prediction", metricName="accuracy")

# Calculer l'accuracy du modèle
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Autres métriques comme la précision, le rappel et le score F1
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)


+--------------+----------+
| Shipping Mode|prediction|
+--------------+----------+
|Standard Class|       0.0|
|Standard Class|       0.0|
|Standard Class|       0.0|
|Standard Class|       0.0|
|Standard Class|       0.0|
|Standard Class|       0.0|
|   First Class|       0.0|
|   First Class|       0.0|
|  Second Class|       0.0|
|   First Class|       0.0|
|  Second Class|       0.0|
|  Second Class|       0.0|
|  Second Class|       0.0|
|   First Class|       0.0|
|   First Class|       0.0|
|   First Class|       0.0|
|  Second Class|       0.0|
|   First Class|       0.0|
|   First Class|       0.0|
|      Same Day|       1.0|
+--------------+----------+
only showing top 20 rows

Accuracy: 0.5969011572189077
Precision: 0.3766046392046688
Recall: 0.5969011572189077
F1 Score: 0.46182723468339054


In [None]:
from pyspark.ml.classification import NaiveBayes


# Définir le modèle de classification Naive Bayes
nb = NaiveBayes(featuresCol="features", labelCol="Shipping Mode_index", smoothing=1.0)

# Stades du pipeline
pipeline_stages = indexers + [label_indexer] + [assembler, nb]

# Créer un pipeline
pipeline = Pipeline(stages=pipeline_stages)

# Adapter le pipeline aux données
pipeline_model = pipeline.fit(csvdf)

# Faire des prédictions
predictions = pipeline_model.transform(csvdf)


from pyspark.sql.functions import when

# Traduire les labels numériques en étiquettes textuelles
translate_label = when(predictions["prediction"] == 0.0, "Standard Class") \
    .when(predictions["prediction"] == 1.0, "First Class") \
    .when(predictions["prediction"] == 2.0, "Second Class") \
    .when(predictions["prediction"] == 3.0, "Same Day") \
    .otherwise("Unknown")

# Ajouter une colonne avec les étiquettes traduites
predictions_with_labels = predictions.withColumn("Predicted Shipping Mode", translate_label)

# Afficher le mode de livraison prédit pour chaque commande avec les étiquettes traduites
predictions_with_labels.select("Shipping Mode", "Predicted Shipping Mode").show()


# Créer un évaluateur pour évaluer les performances du modèle
evaluator = MulticlassClassificationEvaluator(labelCol="Shipping Mode_index", predictionCol="prediction", metricName="accuracy")

# Calculer l'accuracy du modèle
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

# Autres métriques comme la précision, le rappel et le score F1
precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1_score)

+--------------+-----------------------+
| Shipping Mode|Predicted Shipping Mode|
+--------------+-----------------------+
|Standard Class|         Standard Class|
|Standard Class|         Standard Class|
|Standard Class|         Standard Class|
|Standard Class|         Standard Class|
|Standard Class|         Standard Class|
|Standard Class|         Standard Class|
|   First Class|           Second Class|
|   First Class|           Second Class|
|  Second Class|            First Class|
|   First Class|           Second Class|
|  Second Class|            First Class|
|  Second Class|            First Class|
|  Second Class|            First Class|
|   First Class|           Second Class|
|   First Class|           Second Class|
|   First Class|           Second Class|
|  Second Class|            First Class|
|   First Class|           Second Class|
|   First Class|           Second Class|
|      Same Day|            First Class|
+--------------+-----------------------+
only showing top

In [None]:
import pandas as pd
# Step 1: Read the existing CSV file into a Pandas DataFrame
csv_path = "/content/drive/MyDrive/predictions/orders_predictions.csv"
existing_df = pd.read_csv(csv_path)

# Step 2: Convert the Pandas DataFrame to a Spark DataFrame
existing_spark_df = spark.createDataFrame(existing_df)

# Step 3: Select and rename the columns in the rf_predictions DataFrame
predictions_df = predictions_with_labels.select("Order Id", "Predicted Shipping Mode")

# Convert the predictions DataFrame to Pandas DataFrame
predictions_pandas = predictions_df.toPandas()

# Merge the existing Pandas DataFrame with the new predictions on "Order Id"
combined_df = existing_df.merge(predictions_pandas, on="Order Id", how="left")

# Step 4: Save the combined DataFrame back to the CSV file
combined_df.to_csv(csv_path, index=False)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

assembler = VectorAssembler(inputCols=["Total Purchases", "Order Frequency", "Customer Duration"], outputCol="features")

csvdf = assembler.transform(csvdf)
train_data, test_data = csvdf.randomSplit([0.8, 0.2])

lr = LinearRegression(featuresCol='features', labelCol='CLV')
model = lr.fit(train_data)
predictions = model.transform(test_data)
predictions.select("Customer Id", "CLV", "prediction").show()

+-----------+------------------+------------------+
|Customer Id|               CLV|        prediction|
+-----------+------------------+------------------+
|          2| 62732.79853569999| 60083.53286816209|
|          4|33836.249701447945| 48734.63667210754|
|          4|33836.249701447945| 48734.63667210754|
|          4|33836.249701447945| 48734.63667210754|
|          4|33836.249701447945| 48734.63667210754|
|          4|33836.249701447945| 48734.63667210754|
|          4|33836.249701447945| 48734.63667210754|
|          5|32506.125583440004|  41964.1061338201|
|          6| 50418.37846964298| 72652.09166906371|
|          7| 77261.72402976341| 77083.42483666026|
|          8|47146.391433983634|64487.879380614046|
|          8|47146.391433983634|64487.879380614046|
|          8|47146.391433983634|64487.879380614046|
|          8|47146.391433983634|64487.879380614046|
|          8|47146.391433983634|64487.879380614046|
|          9| 77396.97561693034| 69603.74169491288|
|          9

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='CLV', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print("RMSE:", rmse)
r2 = evaluator.setMetricName("r2").evaluate(predictions)
print("R-squared (R²):", r2)

RMSE: 11556.866374980169
R-squared (R²): 0.648539664218625


In [None]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol='features', labelCol='CLV')
model = gbt.fit(train_data)
predictions = model.transform(test_data)
predictions.select("Customer Id", "CLV", "prediction").show()
predictions_pandas = predictions.select("Customer Id", "prediction").withColumnRenamed("prediction", "Customer Lifetime Value prediction")
csv_file_path = "/content/drive/MyDrive/predictions/customers_predictions.csv"
predictions_pandas.toPandas().to_csv(csv_file_path, index=False)

+-----------+------------------+------------------+
|Customer Id|               CLV|        prediction|
+-----------+------------------+------------------+
|          2| 62732.79853569999| 58791.00435478454|
|          4|33836.249701447945|43392.945140675816|
|          4|33836.249701447945|43392.945140675816|
|          4|33836.249701447945|43392.945140675816|
|          4|33836.249701447945|43392.945140675816|
|          4|33836.249701447945|43392.945140675816|
|          4|33836.249701447945|43392.945140675816|
|          5|32506.125583440004|36191.088345328164|
|          6| 50418.37846964298| 74994.22317281606|
|          7| 77261.72402976341| 71380.32433896791|
|          8|47146.391433983634| 62798.36606941417|
|          8|47146.391433983634| 62798.36606941417|
|          8|47146.391433983634| 62798.36606941417|
|          8|47146.391433983634| 62798.36606941417|
|          8|47146.391433983634| 62798.36606941417|
|          9| 77396.97561693034| 74239.53683073807|
|          9

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator_rmse = RegressionEvaluator(labelCol="CLV", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)
print("RMSE:", rmse)
evaluator_mae = RegressionEvaluator(labelCol="CLV", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions)
print("MAE:", mae)

RMSE: 11282.672954281825
MAE: 8145.781056261307


In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

assembler = VectorAssembler(inputCols=["Total Purchases", "Order Frequency", "Duration"], outputCol="features")
csvdf = assembler.transform(csvdf)

train_data, test_data = csvdf.randomSplit([0.8, 0.2])

# Initialiser le modèle de forêt aléatoire
rf = RandomForestRegressor(featuresCol='features', labelCol='CLV')

# Entraîner le modèle avec les données d'entraînement
model = rf.fit(train_data)

# Faire des prédictions sur les données de test
predictions = model.transform(test_data)

# Afficher les prédictions
predictions.select("Customer Id", "CLV", "prediction").show()

# Évaluer les performances du modèle
evaluator = RegressionEvaluator(labelCol='CLV', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print("RMSE:", rmse)

# Calculer le coefficient de détermination (R²)
r2 = evaluator.setMetricName("r2").evaluate(predictions)
print("R-squared (R²):", r2)


+-----------+------------------+------------------+
|Customer Id|               CLV|        prediction|
+-----------+------------------+------------------+
|          2| 62732.79853569999|  58061.5007736413|
|          2| 62732.79853569999|  58061.5007736413|
|          2| 62732.79853569999|  58061.5007736413|
|          2| 62732.79853569999|  58061.5007736413|
|          3| 73004.85284498819| 63762.32376415015|
|          3| 73004.85284498819| 63762.32376415015|
|          4|33836.249701447945| 56736.33910082461|
|          4|33836.249701447945| 56736.33910082461|
|          6| 50418.37846964298| 65503.53660579874|
|          7| 77261.72402976341| 63875.17198233078|
|          7| 77261.72402976341| 63875.17198233078|
|          7| 77261.72402976341| 63875.17198233078|
|          7| 77261.72402976341| 63875.17198233078|
|          7| 77261.72402976341| 63875.17198233078|
|          8|47146.391433983634|63896.257405071505|
|          8|47146.391433983634|63896.257405071505|
|          8

In [None]:
from pyspark.ml.classification import RandomForestClassifier
csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/drive/MyDrive/cleaned_data.csv")
# Define conditions to determine churn


churn_condition = (col("Late_delivery_risk") == 1) & (col("Order Frequency") < 3) & (col("High_Value_Customer") == 0)

csvdf = csvdf.withColumn("Churn", when(churn_condition, 1).otherwise(0))

churn_features = ["Late_delivery_risk", "Order Frequency", "High_Value_Customer"]

# Drop rows with missing values
churn_data = csvdf.dropna(subset=churn_features)

# Vectorize features
churn_assembler = VectorAssembler(inputCols=churn_features, outputCol="churn_features")
churn_data = churn_assembler.transform(churn_data)

# Train a Random Forest model for churn prediction
churn_rf_model = RandomForestClassifier(featuresCol="churn_features", labelCol="Churn")
churn_model_rf = churn_rf_model.fit(churn_data)

# Make predictions
churn_predictions_rf = churn_model_rf.transform(churn_data)

# Replace prediction values with more descriptive labels
churn_predictions_rf = churn_predictions_rf.withColumn("Prediction_Label", when(col("prediction") == 1.0, "Churn").otherwise("Not Churn"))

# Display predictions with the new labels
churn_predictions_rf.select("Churn", "prediction", "Prediction_Label").show()

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Créer un évaluateur pour calculer l'areaUnderROC
evaluator = BinaryClassificationEvaluator(labelCol="Churn")

# Calculer l'areaUnderROC
auc = evaluator.evaluate(churn_predictions_rf)

# Afficher l'areaUnderROC
print(f"Area Under ROC (Random Forest): {auc}")


+-----+----------+----------------+
|Churn|prediction|Prediction_Label|
+-----+----------+----------------+
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
+-----+----------+----------------+
only showing top 20 rows

Area Under ROC (Random Forest): 0.9928249565791234


In [None]:
from pyspark.ml.classification import GBTClassifier

# Définir le modèle Gradient-Boosted Trees
gbt_model = GBTClassifier(featuresCol="churn_features", labelCol="Churn")

# Entraîner le modèle sur les données
gbt_model = gbt_model.fit(churn_data)

# Faire des prédictions avec le modèle Gradient-Boosted Trees
gbt_predictions = gbt_model.transform(churn_data)

# Remplacer les valeurs de prédiction par des étiquettes plus explicites
gbt_predictions = gbt_predictions.withColumn("Prediction_Label", when(col("prediction") == 1.0, "Churn").otherwise("Not Churn"))

# Afficher les prédictions avec les nouvelles étiquettes
gbt_predictions.select("Churn", "prediction", "Prediction_Label").show()

# Évaluer le modèle Gradient-Boosted Trees
gbt_auc = evaluator.evaluate(gbt_predictions)

# Afficher l'aire sous la courbe ROC pour le modèle Gradient-Boosted Trees
print(f"Area Under ROC (GBT): {gbt_auc}")


+-----+----------+----------------+
|Churn|prediction|Prediction_Label|
+-----+----------+----------------+
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    0|       0.0|       Not Churn|
|    1|       1.0|           Churn|
|    0|       0.0|       Not Churn|
+-----+----------+----------------+
only showing top 20 rows

Area Under ROC (GBT): 0.9999262071190331


In [None]:
import pandas as pd
# Step 1: Read the existing CSV file into a Pandas DataFrame
existing_csv_path = "/content/drive/MyDrive/predictions/customers_predictions.csv"
existing_df = pd.read_csv(existing_csv_path)

# Step 2: Convert the Pandas DataFrame to a Spark DataFrame
existing_spark_df = spark.createDataFrame(existing_df)

# Step 3: Select and rename the columns in the predictions_with_labels DataFrame
predictions_df = gbt_predictions.select("Customer Id", "Prediction_Label").withColumnRenamed("prediction", "Churn Prediction")

# Convert the predictions DataFrame to Pandas DataFrame
predictions_pandas = predictions_df.toPandas()

# Merge the existing Pandas DataFrame with the new predictions on "Order Id"
combined_df = existing_df.merge(predictions_pandas, on="Customer Id", how="left")

# Step 4: Save the combined DataFrame back to the CSV file
combined_df.to_csv(existing_csv_path, index=False)

In [None]:
combined_df

Unnamed: 0,Customer Id,Customer Lifetime Value prediction,Prediction_Label
0,2,61143.725133,Not Churn
1,2,61143.725133,Not Churn
2,2,61143.725133,Not Churn
3,2,61143.725133,Not Churn
4,2,61143.725133,Not Churn
...,...,...,...
907389,20738,11415.433569,Churn
907390,20744,11415.433569,Not Churn
907391,20745,11415.433569,Not Churn
907392,20749,11415.433569,Not Churn


In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import when
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import ClusteringEvaluator

csvdf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/content/drive/MyDrive/cleaned_data.csv")


segmentation_features = ["Late_delivery_risk", "Order Frequency", "High_Value_Customer"]


segmentation_data = csvdf.dropna(subset=segmentation_features)

# Vectorize features
segmentation_assembler = VectorAssembler(inputCols=segmentation_features, outputCol="segmentation_features")
segmented_data = segmentation_assembler.transform(segmentation_data)

# Train the K-means model
kmeans = KMeans(k=3, seed=1, featuresCol="segmentation_features")
model = kmeans.fit(segmented_data)

# Make predictions
predictions = model.transform(segmented_data)

# Initialize the evaluator with the correct features column
evaluator = ClusteringEvaluator(featuresCol="segmentation_features")

# Calculate the Silhouette Coefficient
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Coefficient = {silhouette}")

# Get cluster centers to understand cluster characteristics
centers = model.clusterCenters()
for i, center in enumerate(centers):
    print(f"Cluster {i} Center: {center}")


# Based on the analysis above, map clusters to categories
predictions = predictions.withColumn(
    "Customer Category",
    when(predictions.prediction == 0, "Regular Customer")
    .when(predictions.prediction == 1, "At Risk Customer")
    .when(predictions.prediction == 2, "VIP Customer")
)

# Show predictions with customer categories
predictions.select("Customer Id", "prediction", "Customer Category").show()



Silhouette Coefficient = 0.660192271091166
Cluster 0 Center: [ 0.5444786  18.93646994  0.30539035]
Cluster 1 Center: [0.54820398 9.2928048  0.30067455]
Cluster 2 Center: [ 0.55890336 29.35022503  0.30539046]
+-----------+----------+-----------------+
|Customer Id|prediction|Customer Category|
+-----------+----------+-----------------+
|      20755|         1| At Risk Customer|
|      19492|         1| At Risk Customer|
|      19491|         1| At Risk Customer|
|      19490|         1| At Risk Customer|
|      19489|         1| At Risk Customer|
|      19488|         1| At Risk Customer|
|      19487|         1| At Risk Customer|
|      19486|         1| At Risk Customer|
|      19485|         1| At Risk Customer|
|      19484|         1| At Risk Customer|
|      19483|         1| At Risk Customer|
|      19482|         1| At Risk Customer|
|      19481|         1| At Risk Customer|
|      19480|         1| At Risk Customer|
|      19479|         1| At Risk Customer|
|      19478|     

In [None]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Train the hierarchical clustering model
bisecting_kmeans = BisectingKMeans(k=3, seed=1, featuresCol="segmentation_features")
model_hierarchical = bisecting_kmeans.fit(segmented_data)

# Make predictions
predictions_hierarchical = model_hierarchical.transform(segmented_data)

# Initialize the evaluator with the correct features column
evaluator_hierarchical = ClusteringEvaluator(featuresCol="segmentation_features")

# Calculate the Silhouette Coefficient
silhouette_hierarchical = evaluator_hierarchical.evaluate(predictions_hierarchical)
print(f"Silhouette Coefficient (Hierarchical): {silhouette_hierarchical}")

# Get cluster centers to understand cluster characteristics
centers_hierarchical = model_hierarchical.clusterCenters()
for i, center in enumerate(centers_hierarchical):
    print(f"Cluster {i} Center (Hierarchical): {center}")

# Based on the analysis above, map clusters to categories
predictions_hierarchical = predictions_hierarchical.withColumn(
    "Customer Category (Hierarchical)",
    when(predictions_hierarchical.prediction == 0, "Regular Customer")
    .when(predictions_hierarchical.prediction == 1, "At Risk Customer")
    .when(predictions_hierarchical.prediction == 2, "VIP Customer")
)

# Show predictions with customer categories
predictions_hierarchical.select("Customer Id", "prediction", "Customer Category (Hierarchical)").show()
# Initialize the evaluator with the correct features column
evaluator_hierarchical = ClusteringEvaluator(featuresCol="segmentation_features")

# Calculate the Silhouette Coefficient
silhouette_hierarchical = evaluator_hierarchical.evaluate(predictions_hierarchical)
print(f"Silhouette Coefficient (Hierarchical): {silhouette_hierarchical}")

Silhouette Coefficient (Hierarchical): 0.5426586001393826
Cluster 0 Center (Hierarchical): [0.54664089 6.22232963 0.29896354]
Cluster 1 Center (Hierarchical): [ 0.54854156 14.13767526  0.30384639]
Cluster 2 Center (Hierarchical): [ 0.54886422 23.8306855   0.30536832]
+-----------+----------+--------------------------------+
|Customer Id|prediction|Customer Category (Hierarchical)|
+-----------+----------+--------------------------------+
|      20755|         0|                Regular Customer|
|      19492|         0|                Regular Customer|
|      19491|         0|                Regular Customer|
|      19490|         0|                Regular Customer|
|      19489|         0|                Regular Customer|
|      19488|         0|                Regular Customer|
|      19487|         0|                Regular Customer|
|      19486|         0|                Regular Customer|
|      19485|         0|                Regular Customer|
|      19484|         0|            

In [None]:
import pandas as pd
# Step 1: Read the existing CSV file into a Pandas DataFrame
existing_csv_path = "/content/drive/MyDrive/predictions/customers_predictions.csv"
existing_df = pd.read_csv(existing_csv_path)

# Step 2: Convert the Pandas DataFrame to a Spark DataFrame
existing_spark_df = spark.createDataFrame(existing_df)

# Step 3: Select and rename the columns in the predictions_with_labels DataFrame
predictions_df = predictions.select("Customer Id", "Customer Category").withColumnRenamed("Customer Category", "Customer sgmentation Prediction")

# Convert the predictions DataFrame to Pandas DataFrame
predictions_pandas = predictions_df.toPandas()

# Merge the existing Pandas DataFrame with the new predictions on "Order Id"
combined_df = existing_df.merge(predictions_pandas, on="Customer Id", how="left")

# Step 4: Save the combined DataFrame back to the CSV file
combined_df.to_csv(existing_csv_path, index=False)

In [None]:
combined_df.head()

Unnamed: 0,Customer Id,Customer Lifetime Value prediction,Prediction_Label,Customer sgmentation Prediction
0,2,61143.725133,Not Churn,At Risk Customer
1,2,61143.725133,Not Churn,At Risk Customer
2,2,61143.725133,Not Churn,At Risk Customer
3,2,61143.725133,Not Churn,At Risk Customer
4,2,61143.725133,Not Churn,At Risk Customer
