# Load data

In [1]:
data_src = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"

In [2]:
AWS_ACCESS_KEY = "XXXXXXX"
AWS_SECRET_KEY = "XXXXXXX"

In [3]:
packages = [
    f'org.apache.hadoop:hadoop-aws:3.3.1',
    'com.google.guava:guava:30.1.1-jre',
    'org.apache.httpcomponents:httpcore:4.4.14', 
    'com.google.inject:guice:4.2.2', 
    'com.google.inject.extensions:guice-servlet:4.2.2'
]

In [4]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()\
    .setMaster("local")\
    .setAppName("pyspark-unittests")\
    .set("spark.sql.parquet.compression.codec", "snappy")\
    .set('spark.jars.packages', ','.join(packages))

sc = SparkContext.getOrCreate(conf) 
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY)

23/08/03 09:54:26 WARN Utils: Your hostname, DESKTOP-HT1RH4E resolves to a loopback address: 127.0.1.1; using 172.29.121.89 instead (on interface eth0)
23/08/03 09:54:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/greg/.pyenv/versions/3.8.12/envs/lewagon/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/greg/.ivy2/cache
The jars for the packages stored in: /home/greg/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.google.guava#guava added as a dependency
org.apache.httpcomponents#httpcore added as a dependency
com.google.inject#guice added as a dependency
com.google.inject.extensions#guice-servlet added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-00b0b72c-8046-4d99-a1b0-dcf2c6c7b380;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.google.guava#guava;30.1.1-jre in central
	found com.google.guava#failureaccess;1.0.1 in central
	found com.google.guava#listenablefuture;9999.0-empty-to-avoid-conflict-with-guava in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found org.checkerframework#checker-qual;3.8.0 in central
	found c

In [5]:
spark = SparkSession(sc)

In [6]:
df = spark.read.json("s3a://udacity-dsnd/sparkify/mini_sparkify_event_data.json")

23/08/03 09:54:33 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

# Process data for prediction

Filter out all not logged-in users

In [9]:
df = df.filter(df["userId"] != "")

In [10]:
df.createOrReplaceTempView("events")

In [43]:
avg_session_len_per_user = spark.sql('''
    SELECT userId, AVG(length) AS avg_session_length FROM (
        SELECT userId, MAX(ts) - MIN(ts) AS length FROM events
        GROUP BY userId, sessionId
    )
    GROUP BY userId
''')

In [12]:
import pandas as pd
from pyspark.sql.functions import last, udf, pandas_udf, PandasUDFType
from pyspark.sql.types import FloatType

In [13]:
session_ends = df.sort("ts").groupby(df.userId, df.sessionId).agg(last(df.ts).alias("end"))
session_ends.show(1)

[Stage 2:>                                                          (0 + 1) / 1]

+------+---------+-------------+
|userId|sessionId|          end|
+------+---------+-------------+
|300011|       60|1538587993000|
+------+---------+-------------+
only showing top 1 row



                                                                                

In [14]:
@pandas_udf(FloatType())
def time_bw_sessions(s: pd.Series) -> float:
    return s.diff().mean()

In [15]:
avg_time_bw_sessions = session_ends.sort("sessionId").groupby("userId").agg(time_bw_sessions("end"))

In [None]:
time_per_day = spark.sql('''
    SELECT events.userId, ROUND(COUNT(*) / first(a.days), 2) AS pages_per_day
    FROM events
    JOIN (
        SELECT userId, GREATEST(1, ROUND((MAX(ts) - MIN(ts)) / 3600 / 24 / 1000, 2)) AS days FROM events
        GROUP BY userId
    ) AS a ON events.userId = a.userId
    GROUP BY events.userId
''')

In [24]:
time_per_day.show(5)

                                                                                

+------+-------------+
|userId|pages_per_day|
+------+-------------+
|100010|         8.62|
|200002|        10.42|
|   125|        550.0|
|    51|       156.15|
|   124|        80.42|
+------+-------------+
only showing top 5 rows



In [54]:
df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId']

Find how many times a user visits each page

In [28]:
pages_per_user_by_type = spark.sql('''
    SELECT userId, page, COUNT(*) AS page_visits
    FROM events
    GROUP BY userId, page
''')

In [35]:
from pyspark.sql.functions import coalesce, sum as fsum, lit

In [40]:
pages_per_user_by_type = pages_per_user_by_type.groupby("userId").pivot("page").sum("page_visits").na.fill(0)

                                                                                

In [74]:
final = pages_per_user_by_type.join(time_per_day, on="userId").join(avg_session_len_per_user, on="userId")

In [69]:
from pyspark.sql.functions import isnan, when, count, col

final.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in final.columns]).show()

                                                                                

+------+-----+----------+---------------+------+-------------------------+---------+-----+----+----+------+--------+-----------+-------------+--------+----------------+--------------+-----------+---------+-------+-------------+------------------+
|userId|About|Add Friend|Add to Playlist|Cancel|Cancellation Confirmation|Downgrade|Error|Help|Home|Logout|NextSong|Roll Advert|Save Settings|Settings|Submit Downgrade|Submit Upgrade|Thumbs Down|Thumbs Up|Upgrade|pages_per_day|avg_session_length|
+------+-----+----------+---------------+------+-------------------------+---------+-----+----+----+------+--------+-----------+-------------+--------+----------------+--------------+-----------+---------+-------+-------------+------------------+
|     0|    0|         0|              0|     0|                        0|        0|    0|   0|   0|     0|       0|          0|            0|       0|               0|             0|          0|        0|      0|            1|                 0|
+------+----

# Modeling

In [47]:
from pyspark.ml.feature import VectorAssembler

Including visits to the "Cancel" page would likely introduce data leakage. And we want to predict churn before the user cancels so let's drop those columns

In [52]:
feature_cols = list(set(final.columns) - set(["userId", "Cancel", "Cancellation Confirmation"]))

In [53]:
feature_cols

['Logout',
 'Thumbs Up',
 'pages_per_day',
 'Help',
 'NextSong',
 'Save Settings',
 'Add Friend',
 'Roll Advert',
 'Upgrade',
 'Thumbs Down',
 'Submit Upgrade',
 'Add to Playlist',
 'Settings',
 'avg_session_length',
 'Home',
 'Downgrade',
 'Submit Downgrade',
 'Error',
 'About']

In [56]:
vec_assembler = VectorAssembler(outputCol="features")
vec_assembler.setInputCols(feature_cols)

VectorAssembler_9854c319a9b4

In [79]:
from pyspark.ml.classification import GBTClassifier

In [75]:
data = vec_assembler.transform(final).select("features", final["Cancellation Confirmation"].alias("label"))

In [84]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

gbt = GBTClassifier(maxIter=5, maxDepth=5, labelCol="label", seed=42,
    leafCol="leafId")

model = gbt.fit(trainingData)

                                                                                

In [98]:
# Evaluate model on test instances and compute test error
predictions = model.transform(testData)

In [117]:
from pyspark.sql.functions import when

In [121]:
confusion_mat = predictions.select(
    fsum(when(predictions.label + predictions.prediction == 0, 1).otherwise(0)).alias("TN"),
    fsum(when(predictions.label + predictions.prediction == 2, 1).otherwise(0)).alias("TP"),
    fsum(when(predictions.label > predictions.prediction, 1).otherwise(0)).alias("FN"),
    fsum(when(predictions.label < predictions.prediction, 1).otherwise(0)).alias("FP")
).collect()

                                                                                

In [128]:
precision = confusion_mat[0]["TP"] / (confusion_mat[0]["TP"] + confusion_mat[0]["FP"])
recall = confusion_mat[0]["TP"] / (confusion_mat[0]["TP"] + confusion_mat[0]["FN"])
precision, recall

(0.6363636363636364, 0.5)

Not the best results, let's try some cross validation and grid-searching.

In [129]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
import tempfile

gbt = GBTClassifier(maxIter=5, maxDepth=5, labelCol="label", seed=42,
    leafCol="leafId")
grid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [5, 10, 50]) \
    .addGrid(gbt.maxDepth, [2, 5, 10]) \
    .build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=gbt, estimatorParamMaps=grid, evaluator=evaluator)

cvModel = cv.fit(data)

                                                                                0]

In [138]:
cvModel.bestModel.getMaxDepth()

5

In [139]:
cvModel.bestModel.getMaxIter()

5

Much better scores after CV, but this is not a true model scoring as the model has seen the test set during training. There is not enough data to split further into an eval set, but we can use these params as a base for further tuning on the whole dataset later.

In [140]:
# Evaluate model on test instances and compute test error
predictions = cvModel.bestModel.transform(testData)
confusion_mat = predictions.select(
    fsum(when(predictions.label + predictions.prediction == 0, 1).otherwise(0)).alias("TN"),
    fsum(when(predictions.label + predictions.prediction == 2, 1).otherwise(0)).alias("TP"),
    fsum(when(predictions.label > predictions.prediction, 1).otherwise(0)).alias("FN"),
    fsum(when(predictions.label < predictions.prediction, 1).otherwise(0)).alias("FP")
).collect()
precision = confusion_mat[0]["TP"] / (confusion_mat[0]["TP"] + confusion_mat[0]["FP"])
recall = confusion_mat[0]["TP"] / (confusion_mat[0]["TP"] + confusion_mat[0]["FN"])
precision, recall

                                                                                

(1.0, 0.7857142857142857)