In [32]:
import re

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import count as Fcount
from pyspark.sql.functions import min as Fmin
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.window import Window


In [33]:
dfpath = '../data/medium-sparkify-event-data.json'
spark = SparkSession\
        .builder\
        .appName('sparkify_etl')\
        .getOrCreate()

df = spark.read.json(dfpath)

# EDA

In [34]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [35]:
valuable_columns = ['artist', 'auth', 'gender', 'itemInSession', 'level', 'location', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId']
df = df.select(valuable_columns)

In [36]:
for column in df.columns:
    df.describe(column).show()

+-------+-----------------+
|summary|           artist|
+-------+-----------------+
|  count|           432877|
|   mean|527.5289537712895|
| stddev|966.1072451772758|
|    min|              !!!|
|    max|ÃÂlafur Arnalds|
+-------+-----------------+

+-------+----------+
|summary|      auth|
+-------+----------+
|  count|    543705|
|   mean|      null|
| stddev|      null|
|    min| Cancelled|
|    max|Logged Out|
+-------+----------+

+-------+------+
|summary|gender|
+-------+------+
|  count|528005|
|   mean|  null|
| stddev|  null|
|    min|     F|
|    max|     M|
+-------+------+

+-------+------------------+
|summary|     itemInSession|
+-------+------------------+
|  count|            543705|
|   mean|107.30629109535502|
| stddev|116.72350849188155|
|    min|                 0|
|    max|              1005|
+-------+------------------+

+-------+------+
|summary| level|
+-------+------+
|  count|543705|
|   mean|  null|
| stddev|  null|
|    min|  free|
|    max|  paid|
+----

In [37]:
for column in df.dtypes:
    if column[1] == 'string':
        df.select(column[0]).drop_duplicates().show()

+--------------------+
|              artist|
+--------------------+
|      The Black Keys|
|        Yann Tiersen|
|    Jane's Addiction|
|          Tim Hughes|
|Dashboard Confess...|
|                Silk|
|Yonder Mountain S...|
|            La Shica|
|        Elvis Crespo|
|         Silverstein|
|         Eva Cassidy|
|        Generation X|
|     Robyn Hitchcock|
|           Kate Nash|
|       Jupiter Jones|
|           Los Lobos|
|               Rufio|
|     Drive Like Jehu|
|       Yuichi Tamate|
|      Jarabe De Palo|
+--------------------+
only showing top 20 rows

+----------+
|      auth|
+----------+
|Logged Out|
| Cancelled|
|     Guest|
| Logged In|
+----------+

+------+
|gender|
+------+
|     F|
|  null|
|     M|
+------+

+-----+
|level|
+-----+
| free|
| paid|
+-----+

+--------------------+
|            location|
+--------------------+
|     Gainesville, FL|
|Atlantic City-Ham...|
|        Richmond, VA|
|          Tucson, AZ|
|       Oskaloosa, IA|
|Deltona-Daytona B..

In [38]:
df_size = df.count()

for column in df.columns:
    missing_values = df_size - df.select(column).filter(df[column] != "").na.drop().count()
    print(f'{column}: {missing_values} missing values')

artist: 110828 missing values
auth: 0 missing values
gender: 15700 missing values
itemInSession: 543705 missing values
level: 0 missing values
location: 15700 missing values
page: 0 missing values
registration: 543705 missing values
sessionId: 543705 missing values
song: 110828 missing values
status: 543705 missing values
ts: 543705 missing values
userAgent: 15700 missing values
userId: 15700 missing values


In [39]:
df_size = df.filter(df.auth.isin(['Logged Out', 'Guest']) == False).count()

for column in df.columns:
    missing_values = ((df_size - df.filter(df.auth.isin(['Logged Out', 'Guest']) == False)\
                                    .select(column)\
                                    .filter(df[column].cast(StringType()) != "")\
                                    .na.drop()\
                                    .count()) 
                        / df_size) * 100
    print(f'{column}: {missing_values:.2f} % missing values')

artist: 18.02 % missing values
auth: 0.00 % missing values
gender: 0.00 % missing values
itemInSession: 0.00 % missing values
level: 0.00 % missing values
location: 0.00 % missing values
page: 0.00 % missing values
registration: 0.00 % missing values
sessionId: 0.00 % missing values
song: 18.02 % missing values
status: 0.00 % missing values
ts: 0.00 % missing values
userAgent: 0.00 % missing values
userId: 0.00 % missing values


All missing values for the `userId` column are created by users that are either `Logged Out`, or logged in as a `Guest`.

The empty values for `Artist`, `Song` and `Length` are most likely due not no song being played at the time of the event. 

It is also important to note, that the `userId` is sometimes an empty string, not NaN


# Clean data

As there is no value in looking at users that are not logged in for churn analysis - their entries will be disregarded. 
The empty `Artist`, `Song` and `Length` values will not be removed, as the rest of their rows provide valuable data.

In [40]:
df = df.filter(df.auth.isin(['Logged Out', 'Guest']) == False)

In [41]:
def detect_os(column):
    detect = re.findall(r'\((\w+)', column)[0]
    if detect in ['iPhone', 'iPad', 'Macintosh']:
        return 'Apple'
    elif detect in ['Windows', 'compatible']:
        return'Microsoft'
    elif detect in ['X11']:
        return'Linux'
    else:
        return 'Not Detected'

detect_os_udf = udf(detect_os)
df = df.withColumn('platform', detect_os_udf(df.userAgent)).withColumn('ageHours', (df.ts - df.registration)/3600000).drop('userAgent')

In [42]:
flag_cancellation_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
df_churn = df.withColumn('ChurnFlag', flag_cancellation_event('page'))
windowed = Window.partitionBy('userId').orderBy(desc('ts')).rangeBetween(Window.unboundedPreceding, 0)
df_churn = df_churn.withColumn('Churned', Fsum('ChurnFlag').over(windowed))
df_churn.filter(df_churn.Churned == 1).show()

+--------------------+---------+------+-------------+-----+--------------------+--------------------+-------------+---------+--------------------+------+-------------+------+--------+------------------+---------+-------+
|              artist|     auth|gender|itemInSession|level|            location|                page| registration|sessionId|                song|status|           ts|userId|platform|          ageHours|ChurnFlag|Churned|
+--------------------+---------+------+-------------+-----+--------------------+--------------------+-------------+---------+--------------------+------+-------------+------+--------+------------------+---------+-------+
|                null|Cancelled|     F|           67| free|Bridgeport-Stamfo...|Cancellation Conf...|1538016340000|      166|                null|   200|1539254318000|100010|   Apple| 343.8827777777778|        1|      1|
|                null|Logged In|     F|           66| free|Bridgeport-Stamfo...|              Cancel|1538016340000| 

In [43]:
user_level = df_churn.groupBy('userId')\
                .agg(
                    Fmax('ts').alias('latestSession'), 
                    Fmax(when(col('ChurnFlag') == 1, col('ts'))).alias('ChurnTime'))
df_churn = df_churn.join(user_level, on= 'userId', how='left')
df_churn = df_churn.withColumn(
    'lastMonth',
        when(col('churned') == 1, (col('ChurnTime') - col('ts')) /3600000 <= 720)
        .otherwise((col('latestSession') - col('ts')) /3600000 <= 720))

In [44]:
df_last_month = df_churn.filter(col('lastMonth') == True).groupBy()