# MACHINE LEARNING COMPONENT

Goal: predict whether a user will eventually make a purchase, based on their past behavior.

#### Build a user-level feature table in PySpark

In [0]:
from pyspark.sql import functions as F

# Event-level source table (columns used here: user_id, event_type,
# product_id, category, price).
events = spark.table("gold.ecommerce.fact_events")

# (event_type value, output column) pairs; each becomes a conditional counter.
_EVENT_COUNTERS = (("view", "views"), ("cart", "carts"), ("purchase", "purchases"))

_event_type_counts = [
    F.sum(F.when(F.col("event_type") == event_value, 1).otherwise(0)).alias(column_name)
    for event_value, column_name in _EVENT_COUNTERS
]

# Collapse events to one row per user_id.
per_user = events.groupBy("user_id").agg(
    F.count("*").alias("total_events"),
    *_event_type_counts,
    F.countDistinct("product_id").alias("unique_products"),
    F.countDistinct("category").alias("unique_categories"),
    F.avg("price").alias("avg_price"),
    F.max("price").alias("max_price"),
)

# Binary target: 1 when the user has at least one purchase event, else 0.
user_features = per_user.withColumn(
    "label",
    F.when(F.col("purchases") > 0, 1).otherwise(0),
)

# Persist, replacing any previous version; the modelling cells read this table.
(
    user_features.write
    .mode("overwrite")
    .saveAsTable("gold.ecommerce.ml_user_features")
)




#### Prepare data for modeling

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

ml_df = spark.table("gold.ecommerce.ml_user_features")

# Model inputs. `purchases` is deliberately excluded: the label is defined as
# `purchases > 0` when the feature table is built, so feeding it to the model
# leaks the target and inflates the test AUC.
# NOTE(review): total_events, unique_products, avg_price and max_price are also
# aggregated over purchase events, so some leakage likely remains (e.g.
# total_events - views - carts still counts purchase events) — consider
# computing these from non-purchase / pre-purchase events only.
feature_cols = ["total_events", "views", "carts",
                "unique_products", "unique_categories", "avg_price", "max_price"]

# Pack the numeric columns into the single vector column Spark ML expects.
# NOTE(review): VectorAssembler's default handleInvalid="error" fails on nulls;
# confirm avg_price/max_price can never be null (i.e. price is always populated).
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# Binary classifier on the assembled vector; `label` is the 0/1 purchase flag.
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Assembler + classifier are fitted together, so scoring applies the same transform.
pipeline = Pipeline(stages=[assembler, lr])

# Fixed seed makes the 80/20 split repeatable (given the same input partitioning).
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)


In [0]:
%sql
-- List volumes in workspace.ecommerce; the ecommerce_data volume is used as the
-- MLflow dfs_tmpdir (/Volumes/workspace/ecommerce/ecommerce_data/mlflow_tmp) when logging the model.
SHOW VOLUMES IN workspace.ecommerce;

database,volume_name
ecommerce,ecommerce_data


#### Train and evaluate the model with MLflow tracking

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml.evaluation import BinaryClassificationEvaluator

mlflow.set_experiment("/Shared/ecommerce_ml_experiment")

with mlflow.start_run():
    # Record the feature set so runs with different inputs (e.g. before/after
    # removing leaking columns) can be told apart when comparing test_auc.
    mlflow.log_param("feature_cols", ",".join(feature_cols))

    # Fit assembler + logistic regression on the training split only,
    # then score the held-out split.
    model = pipeline.fit(train_df)
    preds = model.transform(test_df)

    # Area under the ROC curve from the classifier's rawPrediction column
    # (metricName spelled out for clarity; it is the evaluator's default).
    evaluator = BinaryClassificationEvaluator(
        labelCol="label",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC",
    )
    auc = evaluator.evaluate(preds)
    mlflow.log_metric("test_auc", auc)

    # Log the fitted PipelineModel as the run's "model" artifact; the UC volume
    # path serves as MLflow's temporary DFS staging directory during serialization.
    mlflow.spark.log_model(
        model,
        "model",
        dfs_tmpdir="/Volumes/workspace/ecommerce/ecommerce_data/mlflow_tmp",
    )


