In [8]:
import mlflow.spark
from pyspark.sql.functions import col, current_timestamp, when
from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import VectorAssembler

# Scoring configuration.
MODEL_NAME = "Online_Shopper_Predictor"
# Model-registry URI that addresses this model through the "latest" alias.
# NOTE(review): the original note also mentioned a 'Production' stage; "latest"
# is presumably just the newest registered version — confirm which is intended.
# A later cell reassigns MODEL_URI to a run-based ("runs:/...") URI before loading.
MODEL_URI = "models:/{}/latest".format(MODEL_NAME)

print(f"Chargement du modèle {MODEL_NAME} pour le scoring quotidien")

StatementMeta(, 4b738bcf-ebfd-4a9e-896c-0efc515d90b2, 11, Finished, Available, Finished)

Chargement du modèle Online_Shopper_Predictor pour le scoring quotidien


In [9]:
EXPERIMENT_NAME = "Shopping_Intention_Analysis"
# Dynamically locate and load the most recently trained model.
#
# Steps:
#   1. Look up the experiment and its most recent successfully finished run.
#   2. Find, among the run's top-level artifacts, a directory whose path contains "model".
#   3. Load that directory as a Spark ML model into `loaded_model`.
#
# Any failure is printed and then re-raised, so the notebook stops here instead of
# letting later cells run against a missing (or stale, from a previous execution)
# `loaded_model`.
try:
    # 1. Find the experiment and its latest finished run.
    exp = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
    if exp is None:
        # get_experiment_by_name returns None when no experiment has that name.
        raise RuntimeError(f"Expérience MLflow '{EXPERIMENT_NAME}' introuvable.")

    runs = mlflow.search_runs(
        experiment_ids=[exp.experiment_id],
        # Skip failed or still-running runs: they may not contain a usable model.
        filter_string="attributes.status = 'FINISHED'",
        order_by=["attributes.start_time DESC"],
        max_results=1,
    )
    if runs.empty:
        raise RuntimeError(f"Aucun Run terminé trouvé dans l'expérience '{EXPERIMENT_NAME}'.")
    last_run_id = runs.iloc[0].run_id

    # 2. List the run's artifacts to discover the model directory name.
    client = mlflow.tracking.MlflowClient()
    artifacts = client.list_artifacts(last_run_id)

    # Take the first directory whose path contains "model".
    model_path = next((art.path for art in artifacts if art.is_dir and "model" in art.path), None)

    if not model_path:
        raise RuntimeError(f"Aucun dossier de modèle trouvé dans le Run {last_run_id}. Vérifiez l'enregistrement dans le Notebook 04.")

    MODEL_URI = f"runs:/{last_run_id}/{model_path}"
    print(f" Dossier détecté : {model_path} | Run ID : {last_run_id}")

    # 3. Load the model.
    loaded_model = mlflow.spark.load_model(MODEL_URI)
    print(" Modèle chargé avec succès.")

except Exception as e:
    print(f" Erreur : {e}")
    # If detection keeps failing, the fixed artifact path used when the model was
    # logged in Notebook 04 (e.g. "random_forest_shopping_model") can be used to
    # build MODEL_URI by hand.
    # Re-raise: scoring must not continue without a freshly loaded model.
    raise

StatementMeta(, 4b738bcf-ebfd-4a9e-896c-0efc515d90b2, 12, Finished, Available, Finished)

2025/12/31 01:33:24 INFO mlflow.spark: 'runs:/5f54ac8d-2dea-4e46-af46-e8602dab2b45/random_forest_shopping_model' resolved as 'sds://onelakecentralus.pbidedicated.windows.net/5ca0301c-08f2-4ed5-9f8f-8bdc1e9da703/ea2e5dec-8bc5-4e6a-a5ce-3f1bc8a9c8a9/Data/5f54ac8d-2dea-4e46-af46-e8602dab2b45/artifacts/random_forest_shopping_model'


 Dossier détecté : random_forest_shopping_model | Run ID : 5f54ac8d-2dea-4e46-af46-e8602dab2b45


Downloading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]

2025/12/31 01:33:25 INFO mlflow.store.artifact.artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false
2025/12/31 01:33:25 INFO mlflow.spark: File 'runs:/5f54ac8d-2dea-4e46-af46-e8602dab2b45/random_forest_shopping_model/sparkml' not found on DFS. Will attempt to upload the file.
2025/12/31 01:33:26 INFO mlflow.spark: Copied SparkML model to Files/tmp/mlflow/463d0b5f-8f3a-4a0a-881e-d4a45a8e23ba


 Modèle chargé avec succès.


StatementMeta(, 4b738bcf-ebfd-4a9e-896c-0efc515d90b2, 13, Finished, Available, Finished)

In [15]:
# Read the Gold-layer feature table that is going to be scored.
df_new = spark.read.table("gold.features_shopping")

# Columns packed into the feature vector, in this exact order.
# NOTE(review): presumably this must match the column order used at training time — confirm.
feature_cols = [
    "Administrative",
    "Informational",
    "ProductRelated",
    "BounceRates",
    "ExitRates",
    "PageValues",
    "SpecialDay",
    "Month_num",
    "Weekend_label",
    "TotalPages",
    "TotalDuration",
    "IsReturningVisitor",
    "pages_per_minute",
    "is_peak_shopping_season",
    "high_intent_session",
]

# Rebuild the single 'features' vector column from the columns listed above;
# the scoring cell passes the resulting DataFrame to the loaded model.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
)
df_ready = assembler.transform(df_new)

StatementMeta(, 4b738bcf-ebfd-4a9e-896c-0efc515d90b2, 19, Finished, Available, Finished)

In [17]:
# 4. Apply the loaded model (scoring).
# Requires `loaded_model` (set by the model-loading cell) and `df_ready` (set by the
# feature-assembly cell) to exist in the kernel, so those cells must run first.
# The resulting DataFrame is the one the post-processing cell reads the
# `probability` and `prediction` columns from.
print(" Application du modèle sur les nouvelles données.")
predictions = loaded_model.transform(df_ready)

StatementMeta(, 4b738bcf-ebfd-4a9e-896c-0efc515d90b2, 21, Finished, Available, Finished)

 Application du modèle sur les nouvelles données.


In [18]:
# Post-processing: shape the scored rows into the result set that gets saved.

# Element at index 1 of the model's probability vector.
# NOTE(review): presumably the probability of the positive ("purchase") class — confirm label order.
purchase_probability = vector_to_array(col("probability"))[1]

# Output columns, in the order they appear in the saved table.
result_columns = [
    "VisitorType",
    "Month",
    "PageValues",
    col("Revenue_label").alias("Actual_Purchase"),
    col("prediction").alias("Predicted_Purchase"),
    purchase_probability.alias("Purchase_Probability"),
    current_timestamp().alias("Scoring_Date"),
]

final_results = predictions.select(*result_columns)

StatementMeta(, 4b738bcf-ebfd-4a9e-896c-0efc515d90b2, 22, Finished, Available, Finished)

In [20]:
# Final save: persist the scored rows as a Delta table, replacing any previous content.
final_results.write.mode("overwrite").format("delta").saveAsTable("gold.predictions_results_pbi")

print(" Scoring terminé et sauvegardé dans 'gold.predictions_results_pbi'")
# Preview the persisted table rather than `final_results`: displaying the lazy
# DataFrame would execute its plan a second time (re-running the scoring) and
# re-evaluate current_timestamp(), so the Scoring_Date shown would not match
# the rows that were just written.
display(spark.read.table("gold.predictions_results_pbi").limit(10))

StatementMeta(, 4b738bcf-ebfd-4a9e-896c-0efc515d90b2, 24, Finished, Available, Finished)

 Scoring terminé et sauvegardé dans 'gold.predictions_results_pbi'


SynapseWidget(Synapse.DataFrame, f957217c-1756-46bb-aae2-4aaa29bfbc30)