# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *  # note: shadows Python built-ins such as sum and max
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, LinearSVC, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, PCA, RegexTokenizer, VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import time


import matplotlib.pyplot as plt
import seaborn as sns



In [2]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
# create a Spark session
spark = SparkSession.builder.master('local[*]').appName("Sparkify").getOrCreate()
spark

In [4]:
# Enable Arrow-based conversion for toPandas(). On Spark 3.x this key is
# deprecated in favor of "spark.sql.execution.arrow.pyspark.enabled".
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids.

In [5]:
# load in the dataset
df = spark.read.json("mini_sparkify_event_data.json")

# Preview the data
df.printSchema()

df.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+----------------+---------+---------+------+-------------+--------+---------+-----+------------------------------+------+--------+-------------+---------+-----------------------------+------+-------------+---------------------------------------------------------------------------------------------------

### Dealing with null values

In [6]:
# Check the total number of records before cleaning
print(f"Total records before cleaning: {df.count()}")

Total records before cleaning: 286500


In [7]:
# Count null or empty-string values in each column

missing_values = df.select([
    sum(when(col(c).isNull() | (col(c) == ''), 1).otherwise(0)).alias(c)
    for c in df.columns
])

missing_values.show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId| song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+
| 58392|   0|     8346|  8346|            0|    8346| 58392|    0|    8346|     0|   0|        8346|        0|58392|     0|  0|     8346|  8346|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+-----+------+---+---------+------+



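The identical count (58,392) for `artist`, `length`, and `song` suggests these values are missing together, most likely on events that are not song plays.

A second group of 8,346 rows is missing every user-related field (`firstName`, `gender`, `lastName`, `location`, `registration`, `userAgent`, and `userId`), which is consistent with events that have no identified user.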

In [8]:
# Filter out rows where userId is an empty string
df = df.filter(df.userId != '')
print(f"Total records after cleaning: {df.count()}")

Total records after cleaning: 278154


# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.


### Pages Analysis and Defining Churn

In [9]:
pages = df.select('page').groupBy('page').count().orderBy('count', ascending=False).toPandas()
pages

              page   count
0         NextSong  228108
1        Thumbs Up   12551
2             Home   10082
3  Add to Playlist    6526
4       Add Friend    4277
5      Roll Advert    3933
6           Logout    3226
7      Thumbs Down    2546
8        Downgrade    2055
9         Settings    1514
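The page counts offer a quick way to check the earlier guess that null `artist`, `length`, and `song` values belong to non-song events. The sketch below uses only figures printed in this notebook; the variable names are illustrative. The page counts were computed after the empty `userId` rows were removed, so the comparison assumes none of the removed rows were `NextSong` events.

```python
# Figures copied from the notebook output above.
total_records = 286_500      # rows before cleaning
null_song_fields = 58_392    # rows with null/empty artist, length, and song
next_song_events = 228_108   # 'NextSong' rows in the page counts

# If song fields are populated only on NextSong events, the number of rows
# with song fields present should equal the NextSong count.
rows_with_song_fields = total_records - null_song_fields
print(rows_with_song_fields == next_song_events)  # True
```

The two numbers agree, which supports (but does not prove) the reading that song fields are filled only on `NextSong` events.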


Churn is when a user cancels their subscription. We’ll identify them by Cancellation Confirmation events.

In [10]:
# Flag each user as churned (1) if they have at least one Cancellation Confirmation event, else 0
churned_df = df.withColumn('churn', when((col('page').isin(['Cancellation Confirmation'])),1 ).otherwise(0))\
      .groupBy('userId').agg(sum('churn').alias('churn')).withColumn('churn', when(col('churn')>=1 ,1).otherwise(0))

churned_df.show()

+------+-----+
|userId|churn|
+------+-----+
|   125|    1|
|    51|    1|
|   124|    0|
|     7|    0|
|    54|    1|
|    15|    0|
|   155|    0|
|   132|    0|
|   154|    0|
|   101|    1|
|    11|    0|
|   138|    0|
|    29|    1|
|    69|    0|
|    42|    0|
|   112|    0|
|    87|    1|
|    73|    1|
|    64|    0|
|     3|    1|
+------+-----+
only showing top 20 rows
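The labelling rule in the cell above reduces to "a user is churned if any of their events is a `Cancellation Confirmation`". A minimal plain-Python sketch of the same rule follows; the toy events and user ids are hypothetical and not taken from the dataset.

```python
# Hypothetical toy events as (userId, page) pairs.
events = [
    ("u1", "NextSong"),
    ("u1", "Cancellation Confirmation"),
    ("u2", "NextSong"),
    ("u2", "Downgrade"),
]

churn = {}
for user_id, page in events:
    cancelled = int(page == "Cancellation Confirmation")
    # Bitwise OR keeps a 1 once it has been seen for this user.
    churn[user_id] = churn.get(user_id, 0) | cancelled

print(churn)  # {'u1': 1, 'u2': 0}
```

Summing the per-event flags and thresholding at 1, as the Spark code does, gives the same per-user label.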



In [11]:
# Join the churn status back with the main DataFrame
df = df.join(churned_df, on='userId')

# Count distinct users in each churn group
df.select(['userId', 'churn']).dropDuplicates().groupBy('churn').count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    1|   52|
|    0|  173|
+-----+-----+



Of the 225 users, 52 churned (about 23%) and 173 remained active, so the classes are noticeably imbalanced.
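As a small worked calculation, the imbalance can be quantified directly from the two counts above. The variable names are illustrative.

```python
# Counts copied from the churn table above.
churned, active = 52, 173
total = churned + active

churn_rate = churned / total       # share of users who churned
imbalance_ratio = active / churned  # retained users per churned user

print(f"users: {total}")
print(f"churn rate: {churn_rate:.3f}")
print(f"retained per churned user: {imbalance_ratio:.2f}")
```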

### Churn by activity level

Let's compare how engaged churned users were relative to non-churned users. We look at the average `itemInSession` (the position of an event within its session, a rough proxy for how long sessions run) and the average `length` (the duration of a played song in seconds; the column is named `avg_session_length` below, although it measures song length rather than session duration).


In [12]:
# Calculate average itemInSession and average song length (the `length` column) for each churn group
activity_analysis = df.groupBy("churn") \
    .agg(
        {"itemInSession": "avg", "length": "avg"}
    ).withColumnRenamed("avg(itemInSession)", "avg_item_in_session") \
     .withColumnRenamed("avg(length)", "avg_session_length")

# Show the activity analysis for churned vs non-churned users
activity_analysis.show()



+-----+-------------------+------------------+
|churn|avg_item_in_session|avg_session_length|
+-----+-------------------+------------------+
|    1| 109.23299304564907|248.63279564406238|
|    0|  115.9888465000643|249.20913538880825|
+-----+-------------------+------------------+



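The averages are close for both groups: an `itemInSession` of about 109 for churned users versus 116 for retained users, and a song length (`avg_session_length`) of about 249 seconds for both. By these two measures, churned users were not drastically less engaged.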

### Churn by Gender and Subscription Level


In [13]:
# churn by gender
df.select('userId','gender','churn') \
    .distinct() \
    .groupBy('gender','churn') \
    .count() \
    .show()

+------+-----+-----+
|gender|churn|count|
+------+-----+-----+
|     M|    1|   32|
|     F|    0|   84|
|     F|    1|   20|
|     M|    0|   89|
+------+-----+-----+



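Male users churn at a higher rate than female users in this sample: 32 of 121 (26.4%) versus 20 of 104 (19.2%). For subscription level (next cell), the free tier churns slightly more often than the paid tier: 46 of 195 (23.6%) versus 36 of 165 (21.8%), although users who changed level are counted under both tiers. Overall, churned users remain the minority class.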

In [14]:
# Churn by subscription level
df.select('userId', 'level', 'churn') \
    .distinct() \
    .groupBy('level', 'churn') \
    .count() \
    .show()

+-----+-----+-----+
|level|churn|count|
+-----+-----+-----+
| paid|    0|  129|
| free|    0|  149|
| paid|    1|   36|
| free|    1|   46|
+-----+-----+-----+
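Raw counts are hard to compare across groups of different sizes, so it can help to turn the two tables above into churn rates. The sketch below uses only the printed counts; the helper name `churn_rates` is illustrative. It refers to `builtins.sum` explicitly because the notebook's star import from `pyspark.sql.functions` shadows the built-in `sum`.

```python
import builtins  # the notebook's star import shadows sum(), so use builtins.sum

# (churned, stayed) counts copied from the two tables above.
gender_counts = {"M": (32, 89), "F": (20, 84)}
level_counts = {"paid": (36, 129), "free": (46, 149)}

def churn_rates(table):
    """Return {group: churned / (churned + stayed)} for one table."""
    return {group: churned / (churned + stayed)
            for group, (churned, stayed) in table.items()}

gender_rates = churn_rates(gender_counts)
level_rates = churn_rates(level_counts)
for group, rate in {**gender_rates, **level_rates}.items():
    print(f"{group}: {rate:.1%}")

# Row totals: the level table has more rows than there are users (225),
# because a user who changed level appears under both 'paid' and 'free'.
gender_total = builtins.sum(c + s for c, s in gender_counts.values())
level_total = builtins.sum(c + s for c, s in level_counts.values())
print(gender_total, level_total)  # 225 360
```

Because of the double counting in the level table, the per-level rates are best read as indicative rather than exact.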



# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

### Time-based Features

Convert the `registration` and `ts` epoch timestamps (milliseconds) to datetime strings and add them to the DataFrame.

In [15]:
# Convert epoch milliseconds to datetime strings and add them to the DataFrame

df = df.withColumn("registration_time", from_unixtime(col("registration") / 1000))
df = df.withColumn("event_time", from_unixtime(col("ts") / 1000))

In [16]:
# Last interaction timestamp per user, plus whole days between registration and that interaction

last_interaction_df =  df.groupBy('userId').agg(max('ts').alias('lastIteraction'))
df = last_interaction_df.join(df, on= 'userId', how='left').withColumn('registeredDays', ((f.col('lastIteraction')-f.col('registration'))/86400000).cast(IntegerType()))
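To make the arithmetic concrete, here is a plain-Python check of the two conversions for one user. The timestamps are those of `userId` '124', copied from the row printed later in this notebook. Interpreting them as UTC is an assumption, although it reproduces the printed `registration_time`.

```python
from datetime import datetime, timezone

MS_PER_DAY = 86_400_000  # 24 * 60 * 60 * 1000

# Values for userId '124', copied from the notebook output.
registration_ms = 1_532_224_335_000
last_interaction_ms = 1_543_590_766_000

# Floor division mirrors the cast to IntegerType for non-negative values.
registered_days = (last_interaction_ms - registration_ms) // MS_PER_DAY
print(registered_days)  # 131

# Epoch milliseconds -> seconds -> formatted timestamp (UTC assumed).
registration_time = datetime.fromtimestamp(registration_ms / 1000, tz=timezone.utc)
registration_str = registration_time.strftime("%Y-%m-%d %H:%M:%S")
print(registration_str)  # 2018-07-22 01:52:15
```

Both values match the `registeredDays` and `registration_time` fields shown for this user.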

### Session-based Features:

In [17]:
# Number of sessions per user
df_session_count = df.groupBy("userId").agg(
    f.countDistinct("sessionId").alias("session_count")
)


In [18]:
# Average song length per user (`length` is the duration of a played song, not of a session)
df_avg_session_length = df.groupBy("userId").agg(
    f.avg("length").alias("avg_session_length")
)

In [19]:
# Longest song played per user (maximum `length`, not a session duration)
df_max_session_length = df.groupBy("userId").agg(
    f.max("length").alias("max_session_length")
)

### User Behavior Features:

In [20]:
# Number of distinct song titles played per user (countDistinct ignores repeats and nulls)
df_total_songs_played = df.groupBy("userId").agg(
    f.countDistinct("song").alias("total_songs_played")
)
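Note that `countDistinct("song")` counts unique song titles, so a song played several times contributes once, and null values are ignored. A small plain-Python sketch of the difference between the two kinds of count follows; the play history is hypothetical.

```python
# Hypothetical play history for one user; None stands for a non-song event.
songs = ["Yellow", "Yellow", "Clocks", None, "Yellow"]

played = [s for s in songs if s is not None]
total_plays = len(played)           # analogous to count("song")
distinct_titles = len(set(played))  # analogous to countDistinct("song")

print(total_plays, distinct_titles)  # 4 2
```

Which of the two is the better feature is a modelling choice; the point here is only that the column name `total_songs_played` describes the first while the code computes the second.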

In [21]:
# Total interactions per user (page views, song plays, etc.)
df_total_interactions = df.groupBy("userId").agg(
    f.count("page").alias("total_interactions")
)

In [22]:
# Number of Roll Advert events per user
df_roll_ads = df.filter(df.page == "Roll Advert").groupBy("userId").agg(
    f.count("page").alias("num_roll_ads")
)

In [23]:
# Number of thumbs down actions per user
df_thumb_down = df.filter(df.page == "Thumbs Down").groupBy("userId").agg(
    f.count("page").alias("num_thumb_down")
)


In [24]:
# Number of thumbs up actions per user
df_thumb_up = df.filter(df.page == "Thumbs Up").groupBy("userId").agg(
    f.count("page").alias("num_thumb_up")
)

In [25]:
# Number of friends added per user
df_friends_added = df.filter(df.page == "Add Friend").groupBy("userId").agg(
    f.count("page").alias("num_friends_added")
)

In [26]:
# Number of songs added to playlist per user
df_songs_added_to_playlist = df.filter(df.page == "Add to Playlist").groupBy("userId").agg(
    f.count("page").alias("num_songs_added_to_playlist")
)

In [27]:
# Join all these new feature DataFrames to the main DataFrame

df = df.join(df_session_count, on="userId", how="left")
df = df.join(df_avg_session_length, on="userId", how="left")
df = df.join(df_max_session_length, on="userId", how="left")
df = df.join(df_total_songs_played, on="userId", how="left")
df = df.join(df_total_interactions, on="userId", how="left")
df = df.join(df_roll_ads, on="userId", how="left")
df = df.join(df_thumb_down, on="userId", how="left")
df = df.join(df_thumb_up, on="userId", how="left")
df = df.join(df_friends_added, on="userId", how="left")
df = df.join(df_songs_added_to_playlist, on="userId", how="left")

df.take(3)

[Row(userId='124', lastIteraction=1543590766000, artist=None, auth='Logged In', firstName='Nicole', gender='F', itemInSession=0, lastName='Beck', length=None, level='paid', location='Vineland-Bridgeton, NJ', method='GET', page='Home', registration=1532224335000, sessionId=123, song=None, status=200, ts=1538407030000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', churn=0, registration_time='2018-07-22 01:52:15', event_time='2018-10-01 15:17:10', registeredDays=131, session_count=29, avg_session_length=248.17653659965674, max_session_length=2369.04444, total_songs_played=3339, total_interactions=4825, num_roll_ads=4, num_thumb_down=41, num_thumb_up=171, num_friends_added=74, num_songs_added_to_playlist=118),
 Row(userId='124', lastIteraction=1543590766000, artist='Train', auth='Logged In', firstName='Nicole', gender='F', itemInSession=1, lastName='Beck', length=227.47383, level='paid', location='Vin

In [28]:
num_rows = df.count()
num_cols = len(df.dtypes)
print(f'The dataframe imported has {num_rows} rows, and {num_cols} columns.')

The dataframe imported has 278154 rows, and 33 columns.


In [29]:
feature_df = df.groupBy("userId").agg(
    first("gender").alias("gender"),
    last("level").alias("level"),  # intended as last known level; row order is not guaranteed without sorting by ts
    first("churn").alias("churn"),
    first("registeredDays").alias("registeredDays"),
    first("session_count").alias("session_count"),
    first("avg_session_length").alias("avg_session_length"),
    first("max_session_length").alias("max_session_length"),
    first("total_songs_played").alias("total_songs_played"),
    first("total_interactions").alias("total_interactions"),
    first("num_roll_ads").alias("num_roll_ads"),
    first("num_thumb_down").alias("num_thumb_down"),
    first("num_thumb_up").alias("num_thumb_up"),
    first("num_friends_added").alias("num_friends_added"),
    first("num_songs_added_to_playlist").alias("num_songs_added_to_playlist")
)
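Spark's `first` and `last` aggregates depend on row order, which is not guaranteed after a shuffle, so `last("level")` may not always return the most recent level. One way to make "last known level" explicit is to pick the level attached to the greatest `ts` per user. The plain-Python sketch below shows that logic on hypothetical rows. In Spark the same idea could be expressed with a window ordered by `ts`, which is not shown here.

```python
# Hypothetical (userId, ts, level) rows, deliberately out of time order.
rows = [
    ("u1", 300, "paid"),
    ("u1", 100, "free"),
    ("u2", 200, "free"),
    ("u1", 200, "free"),
]

latest = {}  # userId -> (ts, level) of the most recent event seen so far
for user_id, ts, level in rows:
    if user_id not in latest or ts > latest[user_id][0]:
        latest[user_id] = (ts, level)

last_level = {user_id: level for user_id, (ts, level) in latest.items()}
print(last_level)  # {'u1': 'paid', 'u2': 'free'}
```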

In [30]:
feature_df = feature_df.fillna(0)

feature_df = feature_df.withColumnRenamed("churn", "label")

In [31]:
feature_df.show(3)

#check this works
feature_df.printSchema()

+------+------+-----+-----+--------------+-------------+------------------+------------------+------------------+------------------+------------+--------------+------------+-----------------+---------------------------+
|userId|gender|level|label|registeredDays|session_count|avg_session_length|max_session_length|total_songs_played|total_interactions|num_roll_ads|num_thumb_down|num_thumb_up|num_friends_added|num_songs_added_to_playlist|
+------+------+-----+-----+--------------+-------------+------------------+------------------+------------------+------------------+------------+--------------+------------+-----------------+---------------------------+
|   100|     M| paid|    0|            64|           35|250.88659828113387|        2520.99873|              2302|              3214|          25|            27|         148|               49|                         61|
|100004|     F| paid|    0|           172|           21|245.73289733545636|        1485.97506|               881|       

## Processing

We now have a DataFrame with all the features we can feed into our model, where each row represents a user. First, however, we need to do some preprocessing.

### Encode categorical features (gender and level)

In [32]:
# StringIndexer for categorical columns
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_index")
level_indexer = StringIndexer(inputCol="level", outputCol="level_index")
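By default, `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common value receives index 0.0. The sketch below emulates that rule in plain Python for `gender`, using the user counts from the churn-by-gender table. It is only an illustration of the ordering rule, not Spark's implementation, and the alphabetical tie-break reflects documented behavior in recent Spark versions.

```python
from collections import Counter

# Users per gender, summed from the churn-by-gender table above.
gender_counts = Counter({"M": 32 + 89, "F": 20 + 84})

# Sort by descending frequency, then alphabetically to break ties.
ordered = sorted(gender_counts, key=lambda label: (-gender_counts[label], label))
index_of = {label: float(i) for i, label in enumerate(ordered)}

print(index_of)  # {'M': 0.0, 'F': 1.0}
```

Under this rule `gender_index` would be 0.0 for `M` and 1.0 for `F` in this dataset; the mapping for `level` depends on the per-user values selected in `feature_df`.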



In [33]:
# VectorAssembler for all features
assembler = VectorAssembler(
    inputCols=[
        "registeredDays",
        "session_count",
        "avg_session_length",
        "max_session_length",
        "total_songs_played",
        "total_interactions",
        "num_roll_ads",
        "num_thumb_down",
        "num_thumb_up",
        "num_friends_added",
        "num_songs_added_to_playlist",
        "gender_index",
        "level_index"
    ],
    outputCol="features"
)

# Define pipeline
pipeline = Pipeline(stages=[gender_indexer, level_indexer, assembler])

# Fit and transform the data
pipeline_model = pipeline.fit(feature_df)
processed_df = pipeline_model.transform(feature_df)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [34]:
# Set seed for reproducibility
train_data, val_data, test_data = processed_df.randomSplit([0.7, 0.15, 0.15], seed=42)

# Optional: check counts
print(f"Train count: {train_data.count()}")
print(f"Validation count: {val_data.count()}")
print(f"Test count: {test_data.count()}")

Train count: 153
Validation count: 46
Test count: 26


In [38]:
# instantiate all of our models and include a seed for reproducibility where possible
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', seed=1996)
gbt = GBTClassifier(featuresCol = 'features', labelCol = 'label', maxIter=10, seed=1996)
lsvc = LinearSVC(featuresCol = 'features', labelCol = 'label')
nb = NaiveBayes(featuresCol = 'features', labelCol = 'label')

#list of models
model_list = [lr,rf,gbt,lsvc,nb]




In [41]:
# MulticlassClassificationEvaluator is used to get F1 scores (its "f1" metric is the class-weighted F1)
evaluator = MulticlassClassificationEvaluator(labelCol = 'label', predictionCol='prediction')
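The evaluator's `f1` metric is a support-weighted average of the per-class F1 scores, which helps explain why it is more informative than accuracy on imbalanced labels. The sketch below computes both for a trivial model that always predicts "no churn", using the 52 / 173 user split found earlier. The helper `weighted_f1` is an illustrative reimplementation, not the evaluator's own code.

```python
def weighted_f1(tp, fp, fn, tn):
    """Support-weighted mean of per-class F1 for a binary confusion matrix."""
    def f1(true_pos, false_pos, false_neg):
        denom = 2 * true_pos + false_pos + false_neg
        return 2 * true_pos / denom if denom else 0.0

    pos_support, neg_support = tp + fn, tn + fp
    f1_pos = f1(tp, fp, fn)  # churned class (label 1)
    f1_neg = f1(tn, fn, fp)  # retained class (label 0)
    return (f1_pos * pos_support + f1_neg * neg_support) / (pos_support + neg_support)

# A model that always predicts label 0 on 52 churned and 173 retained users.
baseline_accuracy = 173 / (52 + 173)
baseline_f1 = weighted_f1(tp=0, fp=0, fn=52, tn=173)
print(f"accuracy={baseline_accuracy:.3f}, weighted F1={baseline_f1:.3f}")
# accuracy=0.769, weighted F1=0.668
```

A model that never identifies a churner still reaches about 77% accuracy here, while its weighted F1 is lower, so F1 penalizes the failure on the minority class.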

In [42]:
# for loop to go through all our models
for model in model_list:
    # get model name
    model_name = model.__class__.__name__

    # print training started
    print(model_name, 'training started')

    # start time
    start = time.time()
    # fit the models on train dataset
    model = model.fit(train_data)
    # end time
    end = time.time()

    # print training ended
    print(model_name, 'training ended')
    # print time taken
    print('Time taken for {} is:'.format(model_name),(end-start),'seconds')

    # predict
    print(model_name, 'predicting started')
    predictions = model.transform(val_data)
    print(model_name, 'predicting ended')

    # get metrics to evaluate
    # f1
    print('F1 for {} is:'.format(model_name), evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))
    # accuracy
    accuracy = predictions.filter(predictions.label == predictions.prediction).count() / (predictions.count())
    print("The accuracy of the {} model is:".format(model_name), accuracy)

LogisticRegression training started
LogisticRegression training ended
Time taken for LogisticRegression is: 152.3760223388672 seconds
LogisticRegression predicting started
LogisticRegression predicting ended
F1 for LogisticRegression is: 0.7504290617848971
The accuracy of the LogisticRegression model is: 0.782608695652174
RandomForestClassifier training started
RandomForestClassifier training ended
Time taken for RandomForestClassifier is: 217.51329922676086 seconds
RandomForestClassifier predicting started
RandomForestClassifier predicting ended
F1 for RandomForestClassifier is: 0.6467391304347826
The accuracy of the RandomForestClassifier model is: 0.7391304347826086
GBTClassifier training started
GBTClassifier training ended
Time taken for GBTClassifier is: 114.79780721664429 seconds
GBTClassifier predicting started
GBTClassifier predicting ended
F1 for GBTClassifier is: 0.7747584541062802
The accuracy of the GBTClassifier model is: 0.782608695652174
LinearSVC training started
Linea

### Model Selection and Tuning
Now that we have our model results, we can select the best candidate for tuning. Logistic Regression, Random Forest, and Gradient Boosted Trees (GBT) all performed reasonably well on the validation set. Of these, GBTClassifier had the highest F1 score (0.775) and tied with Logistic Regression for the highest accuracy (0.783), making it the top-performing model overall.

Random Forest was slower to train (around 3 minutes 37 seconds) than GBT (about 1 minute 55 seconds), and it had a noticeably lower F1 score (0.647) and accuracy (0.739). Since our goal is to optimize F1 score, especially given the class imbalance in churn prediction, GBTClassifier provides the best balance between performance and efficiency.

In [43]:
gbt = GBTClassifier(labelCol="label", featuresCol="features", seed=42)

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 5, 7]) \
    .addGrid(gbt.maxIter, [10, 20, 30]) \
    .addGrid(gbt.stepSize, [0.05, 0.1]) \
    .build()

In [44]:
cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)
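Before launching the search, it is worth estimating how many models it will train. The sketch below enumerates the grid defined above with `itertools.product` and multiplies by the number of folds. The variable names are illustrative.

```python
from itertools import product

# Grid values copied from the ParamGridBuilder cell above.
max_depth = [3, 5, 7]
max_iter = [10, 20, 30]
step_size = [0.05, 0.1]
num_folds = 3

combinations = list(product(max_depth, max_iter, step_size))
cv_fits = len(combinations) * num_folds  # one fit per combination per fold

print(len(combinations), cv_fits)  # 18 54
```

`CrossValidator` also refits the best parameter combination on the full training data afterwards. Given that a single GBT fit took roughly two minutes in the earlier comparison, the search may run for a long time on this machine.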

In [None]:
cv_model = cv.fit(train_data)

# Best model from CV
best_gbt_model = cv_model.bestModel

# Predict on validation
val_predictions = best_gbt_model.transform(val_data)
val_f1 = evaluator.evaluate(val_predictions)
print(f"Validation F1 Score after tuning: {val_f1}")

# Predict on test
test_predictions = best_gbt_model.transform(test_data)
test_f1 = evaluator.evaluate(test_predictions)
print(f"Test F1 Score after tuning: {test_f1}")

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.