In [0]:
# Raw October 2019 e-commerce event log; header row present, column types inferred.
df = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    inferSchema=True,
)

In [0]:
from pyspark.sql import functions as F

# Check the time span covered by the raw events before picking a cutoff date.
event_time_range = df.agg(F.min("event_time"), F.max("event_time"))
event_time_range.show()

+-------------------+-------------------+
|    min(event_time)|    max(event_time)|
+-------------------+-------------------+
|2019-10-01 00:00:00|2019-10-31 23:59:59|
+-------------------+-------------------+



In [0]:
# Boundary between the feature window (event_time <= cutoff) and the label window
# (event_time > cutoff). Assuming inferSchema typed event_time as a timestamp, Spark
# casts this date string to 2019-10-20 00:00:00. The feature window is therefore
# effectively Oct 1-19 (plus that exact midnight) and the label window is Oct 20-31.
cutoff = "2019-10-20"

In [0]:
# Feature window: every event at or before the cutoff.
feature_data = df.where(F.col("event_time") <= cutoff)

is_view = F.col("event_type") == "view"
is_cart = F.col("event_type") == "cart"

# One row per user summarising their pre-cutoff behaviour.
features_df = (
    feature_data
    .groupBy("user_id")
    .agg(
        F.count("*").alias("total_events"),
        F.sum(F.when(is_view, 1).otherwise(0)).alias("views"),
        F.sum(F.when(is_cart, 1).otherwise(0)).alias("carts"),
        F.countDistinct("product_id").alias("unique_products"),
        F.avg("price").alias("avg_price"),
    )
)

In [0]:
# Label window: events strictly after the cutoff.
label_data = df.where(F.col("event_time") > cutoff)

# 1 if the user made at least one purchase in the label window, otherwise 0.
purchase_flag = F.when(F.col("event_type") == "purchase", 1).otherwise(0)
label_df = (
    label_data
    .groupBy("user_id")
    .agg(F.max(purchase_flag).alias("purchased"))
)

In [0]:
# Users with no activity after the cutoff have no label row; count them as non-purchasers.
training_data = (
    features_df
    .join(label_df, on="user_id", how="left")
    .fillna(0, subset=["purchased"])
)

In [0]:
# Class balance of the full labelled dataset.
label_counts = training_data.groupBy("purchased").count()
label_counts.show()

+---------+-------+
|purchased|  count|
+---------+-------+
|        1|  92199|
|        0|2040071|
+---------+-------+



In [0]:
# 80/20 split by user; seeded so the split is repeatable for the same input partitioning.
train, test = training_data.randomSplit(weights=[0.8, 0.2], seed=42)

for split_name, split_df in (("Train:", train), ("Test:", test)):
    print(split_name, split_df.count())

Train: 1706532
Test: 425738


In [0]:
# Check that both splits keep roughly the same positive rate.
for split_df in (train, test):
    split_df.groupBy("purchased").count().show()

+---------+-------+
|purchased|  count|
+---------+-------+
|        1|  73720|
|        0|1632812|
+---------+-------+

+---------+------+
|purchased| count|
+---------+------+
|        1| 18479|
|        0|407259|
+---------+------+



In [0]:
# Count each label in the training split and derive the negative:positive ratio,
# which the next cell uses as the weight for positive examples.
class_counts = train.groupBy("purchased").count().collect()
counts_by_label = {row["purchased"]: row["count"] for row in class_counts}

# Fail with a clear message instead of a bare IndexError (missing class) or
# ZeroDivisionError (no positives) when a label is absent from the training split.
missing_labels = {0, 1} - set(counts_by_label)
if missing_labels:
    raise ValueError(
        f"Training split has no rows for label(s) {sorted(missing_labels)}; "
        f"observed counts: {counts_by_label}"
    )

neg = counts_by_label[0]
pos = counts_by_label[1]

ratio = neg / pos

print("Imbalance ratio:", ratio)

Imbalance ratio: 22.1488334237656


In [0]:
# Up-weight positives by neg/pos so both classes carry the same total weight in the loss.
# withColumn replaces an existing column, so re-running this cell is safe.
positive_weight = F.lit(ratio)
train = train.withColumn(
    "class_weight",
    F.when(F.col("purchased") == 1, positive_weight).otherwise(F.lit(1.0)),
)

In [0]:
from pyspark.ml.feature import VectorAssembler

# Numeric per-user features fed to the model, in a fixed order.
feature_cols = [
    "total_events",
    "views",
    "carts",
    "unique_products",
    "avg_price",
]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# `train` and `test` are reassigned in place. A previous execution of this cell leaves a
# "features" column behind, and VectorAssembler refuses to overwrite an existing output
# column, so drop it first. `drop` is a no-op when the column is absent, which makes the
# cell idempotent.
train = assembler.transform(train.drop("features"))
test = assembler.transform(test.drop("features"))

In [0]:
from pyspark.ml.classification import LogisticRegression

# Weighted logistic regression: `class_weight` up-weights the minority (purchase) class.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="purchased",
    weightCol="class_weight",
)
model = lr.fit(train)

In [0]:
# Score the held-out users and peek at the label vs. predicted probability / class.
predictions = model.transform(test)

preview_cols = ["purchased", "probability", "prediction"]
predictions.select(*preview_cols).show(n=5)

+---------+--------------------+----------+
|purchased|         probability|prediction|
+---------+--------------------+----------+
|        0|[0.63560667708261...|       0.0|
|        0|[0.62770274649042...|       0.0|
|        0|[0.63112820976346...|       0.0|
|        0|[0.55481310282516...|       0.0|
|        0|[0.60799528595077...|       0.0|
+---------+--------------------+----------+
only showing top 5 rows


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC AUC from the model's rawPrediction column (these are the evaluator's defaults,
# spelled out for readability).
evaluator = BinaryClassificationEvaluator(
    labelCol="purchased",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

AUC: 0.7683527380009435


In [0]:
# Inspect the training frame's schema; the repr lists columns, including class_weight and features.
train

DataFrame[user_id: int, total_events: bigint, views: bigint, carts: bigint, unique_products: bigint, avg_price: double, purchased: int, class_weight: double, features: vector]

In [0]:
# Inspect the test frame's schema; it has features but no class_weight column.
test

DataFrame[user_id: int, total_events: bigint, views: bigint, carts: bigint, unique_products: bigint, avg_price: double, purchased: int, features: vector]

In [0]:
print(f"Logistic AUC: {auc}")

Logistic AUC: 0.7683527380009435


# Day 7: Track the logistic regression run with MLflow

In [0]:
import mlflow
import mlflow.spark

In [0]:
# List the Unity Catalog volumes visible in the current schema.
volumes = spark.sql("SHOW VOLUMES")
display(volumes)

database,volume_name
default,mlflow_volume


In [0]:
# Volume used below as MLflow's temporary staging location; IF NOT EXISTS makes this re-runnable.
create_volume_sql = """
CREATE VOLUME IF NOT EXISTS workspace.default.mlflow_volume
"""
spark.sql(create_volume_sql)

DataFrame[]

In [0]:
import os

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Stage MLflow's temporary files on the Unity Catalog volume.
os.environ["MLFLOW_DFS_TMP"] = "/Volumes/workspace/default/mlflow_volume/tmp"

with mlflow.start_run(run_name="Logistic_Regression") as run:
    lr = LogisticRegression(
        labelCol="purchased",
        featuresCol="features",
        weightCol="class_weight",
    )

    lr_model = lr.fit(train)
    lr_predictions = lr_model.transform(test)

    evaluator = BinaryClassificationEvaluator(labelCol="purchased")
    lr_auc = evaluator.evaluate(lr_predictions)

    # Log parameters: the model family plus the training configuration, so the run
    # can be compared with and reproduced from the tracking server alone.
    mlflow.log_param("model_type", "LogisticRegression")
    mlflow.log_params({
        "label_col": lr.getLabelCol(),
        "weight_col": lr.getWeightCol(),
        "max_iter": lr.getMaxIter(),
        "reg_param": lr.getRegParam(),
        "elastic_net_param": lr.getElasticNetParam(),
        "threshold": lr.getThreshold(),
    })

    # Log metrics
    mlflow.log_metric("AUC", lr_auc)

    # Log model
    mlflow.spark.log_model(lr_model, "logistic_model")

    # Expose the run ID so this exact model can be reloaded via
    # f"runs:/{lr_run_id}/logistic_model" instead of a hand-copied ID.
    lr_run_id = run.info.run_id

    print("Logged Logistic AUC:", lr_auc)



Logged Logistic AUC: 0.7683583707367663


# Day 8: Batch-score all users and write gold tables

In [0]:
# Default to the model trained in this session. The next cell reassigns best_model to
# the copy loaded from MLflow, so this value is only used if that cell is skipped.
best_model = lr_model

In [0]:
import mlflow.spark  # also binds the top-level `mlflow` name

# Run that produced the logged model when this notebook was first executed. It is only
# used when no MLflow run has been started in the current session (e.g. this section
# is run on its own after a kernel restart).
PINNED_RUN_ID = "9ea4b4a733df4df7b023f23ff23c9ead"

# Prefer the run logged earlier in this session, so Restart & Run All scores with the
# freshly trained model instead of a stale, workspace-specific run ID.
# NOTE(review): last_active_run() returns the most recent run of any kind; this assumes
# no other MLflow run is started between the training cell and this one.
_last_run = mlflow.last_active_run()
model_run_id = _last_run.info.run_id if _last_run is not None else PINNED_RUN_ID

best_model = mlflow.spark.load_model(f"runs:/{model_run_id}/logistic_model")

In [0]:
# Batch-score every user from the feature window. The same assembler used for training
# builds the feature vector, then the selected model is applied.
all_features = assembler.transform(dataset=features_df)
batch_predictions = best_model.transform(dataset=all_features)

In [0]:
# Inspect the schema of the assembled scoring frame (the repr lists columns only).
all_features

DataFrame[user_id: int, total_events: bigint, views: bigint, carts: bigint, unique_products: bigint, avg_price: double, features: vector]

In [0]:
# Inspect the scored frame's schema: model output adds rawPrediction, probability and prediction.
batch_predictions

DataFrame[user_id: int, total_events: bigint, views: bigint, carts: bigint, unique_products: bigint, avg_price: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [0]:
# Gold-layer projection: the user id, the raw numeric features and the hard 0/1
# prediction. Vector columns (features, rawPrediction, probability) are left out.
gold_columns = [
    "user_id",
    "total_events",
    "views",
    "carts",
    "unique_products",
    "avg_price",
    "prediction",
]
gold_predictions = batch_predictions.select(*gold_columns)

In [0]:
# Inspect the gold frame's schema before writing it out.
gold_predictions

DataFrame[user_id: int, total_events: bigint, views: bigint, carts: bigint, unique_products: bigint, avg_price: double, prediction: double]

In [0]:
# Persist all per-user predictions as a Delta table, replacing any previous contents.
gold_predictions.write.saveAsTable(
    "ecommerce.gold_user_predictions_1",
    format="delta",
    mode="overwrite",
)

In [0]:
from pyspark.sql import functions as F

# Users the model classifies as likely purchasers (prediction == 1.0).
predicted_buyer = F.col("prediction") == 1
top_buyers = gold_predictions.where(predicted_buyer)

top_buyers.write.saveAsTable(
    "ecommerce.gold_top_buyers_1",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- Ten most active predicted buyers, ranked by total event count.
select
  user_id,
  total_events,
  views,
  carts
from ecommerce.gold_top_buyers_1
order by total_events desc
limit 10;

user_id,total_events,views,carts
512365995,2433,2431,2
546270188,2126,2126,0
512792872,1774,1769,5
515240336,1720,1716,2
546159478,1690,1683,1
516948072,1648,1642,0
513021392,1647,1597,43
551211823,1508,1508,0
526731152,1501,1501,0
530599462,1482,1428,25


In [0]:
%sql
-- Class balance of the batch predictions. Reads the table this notebook actually
-- writes (gold_user_predictions_1); the unsuffixed name is not written anywhere here.
SELECT prediction, COUNT(*)
FROM ecommerce.gold_user_predictions_1
GROUP BY prediction
ORDER BY prediction;

prediction,COUNT(*)
1.0,333040
0.0,1799230
