In [1]:

import os

import ibmos2spark
from pyspark.sql import SparkSession

# @hidden_cell
# Object-storage credentials are read from the environment so that no secret is
# stored in the notebook or its history. Export COS_SERVICE_ID and COS_API_KEY
# before starting the kernel; a missing variable raises KeyError right here
# instead of failing later with an opaque authentication error.
# NOTE(review): the key that was previously committed in this cell should be
# revoked and re-issued.
credentials = {
    'endpoint': os.environ.get(
        'COS_ENDPOINT',
        'https://s3.eu-geo.objectstorage.service.networklayer.com'),
    'service_id': os.environ['COS_SERVICE_ID'],
    'iam_service_endpoint': os.environ.get(
        'COS_IAM_SERVICE_ENDPOINT',
        'https://iam.eu-gb.bluemix.net/oidc/token'),
    'api_key': os.environ['COS_API_KEY'],
}

configuration_name = 'os_758721e72b18404c941eec0bda75f42b_configs'
# `sc` is not defined in this notebook; presumably it is the SparkContext
# injected by the hosted runtime -- confirm when running elsewhere.
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

spark = SparkSession.builder.appName('Sparkify').getOrCreate()
# Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

# Load the event log from the object-storage bucket into a DataFrame.
df = spark.read.json(cos.url('medium-sparkify-event-data.json', 'sparkify-donotdelete-pr-mmgfjln3udz4qy'))
# Peek at the first five events.
df.take(5)


Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200424010717-0000
KERNEL_ID = 70043a78-429c-4226-a0a3-4ca6d41334e9


[Row(artist='Martin Orford', auth='Logged In', firstName='Joseph', gender='M', itemInSession=20, lastName='Morales', length=597.55057, level='free', location='Corpus Christi, TX', method='PUT', page='NextSong', registration=1532063507000, sessionId=292, song='Grand Designs', status=200, ts=1538352011000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='293'),
 Row(artist="John Brown's Body", auth='Logged In', firstName='Sawyer', gender='M', itemInSession=74, lastName='Larson', length=380.21179, level='free', location='Houston-The Woodlands-Sugar Land, TX', method='PUT', page='NextSong', registration=1538069638000, sessionId=97, song='Bulls', status=200, ts=1538352025000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='98'),
 Row(artist='Afroman', auth='Logged In', firstName='Maverick', gender='M', 

In [2]:
from pyspark.sql.functions import avg, desc, min, max, split, udf
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, StandardScaler, StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import re

In [3]:
# Print the column names, types and nullability that Spark inferred from the JSON file.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [4]:
# Display the first event as a single Row.
df.head()

Row(artist='Martin Orford', auth='Logged In', firstName='Joseph', gender='M', itemInSession=20, lastName='Morales', length=597.55057, level='free', location='Corpus Christi, TX', method='PUT', page='NextSong', registration=1532063507000, sessionId=292, song='Grand Designs', status=200, ts=1538352011000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='293')

In [5]:
# Preview the userId column; the output below shows that some rows carry a blank id.
df.select('userId').show()

+------+
|userId|
+------+
|   293|
|    98|
|   179|
|   179|
|   246|
|   163|
|      |
|      |
|      |
|      |
|   179|
|   175|
|   100|
|   100|
|   163|
|   246|
|   179|
|    39|
|   163|
|   179|
+------+
only showing top 20 rows



In [6]:
# Keep only events attributed to a user: discard rows whose userId is the
# empty string or a single space.
df = df.filter((df['userId'] != '') & (df['userId'] != ' '))

In [7]:
# Drop events where either the user id or the session id is null.
df = df.na.drop(how='any', subset=['userId', 'sessionId'])

In [8]:
# Remove rows that are exact duplicates across every column.
valid = df.distinct()

In [9]:
# List the distinct user ids. Selecting and de-duplicating the single column
# avoids computing max() over every numeric column only to discard the result.
# `userId` is a string column, so the sort is lexicographic, not numeric.
valid.select('userId').distinct().sort('userId').show()

+------+
|userId|
+------+
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
|100008|
|100009|
|100010|
|100011|
|100012|
|100013|
|100014|
|100015|
|100016|
|100017|
|100018|
+------+
only showing top 20 rows



In [10]:
# Enumerate every distinct page (event type) in alphabetical order.
valid.select('page').distinct().orderBy('page').show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|              Logout|
|            NextSong|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
|      Submit Upgrade|
|         Thumbs Down|
|           Thumbs Up|
|             Upgrade|
+--------------------+



### Define Churn

Create a column `churn` to use as the label for the model. Churn is defined by the `Cancellation Confirmation` events, which occur for both paid and free users.

In [11]:
# Preview the raw page value of the first rows.
valid.select('page').show()

+---------+
|     page|
+---------+
| NextSong|
| NextSong|
|Thumbs Up|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
| NextSong|
+---------+
only showing top 20 rows



In [12]:
def churn(page_col):
    """Return an integer column that flags cancellation events.

    Args:
        page_col: The `page` column of the event data, given either as a
            Column or as a column name.

    Returns:
        An IntegerType Column holding 1 where the page equals
        'Cancellation Confirmation' and 0 for every other value, including null.
    """
    # Imported locally because these names are not in the notebook's import cell.
    from pyspark.sql.functions import col, when

    # A udf also accepts a column name, so keep accepting one here.
    if isinstance(page_col, str):
        page_col = col(page_col)

    # A built-in column expression is evaluated by Spark itself, so rows are
    # not serialised to a Python worker the way they are with a `udf`.
    is_cancellation = page_col == 'Cancellation Confirmation'
    # `otherwise(0)` also covers a null page, which the previous lambda mapped to 0.
    return when(is_cancellation, 1).otherwise(0).cast(IntegerType())

In [13]:
# Attach the per-event churn flag derived from the page column.
valid = valid.withColumn('churn', churn(valid['page']))

In [14]:
# Number of events flagged as churn.
valid.filter(valid['churn'] == 1).count()

99

In [15]:
# Event counts for each (churn, gender) combination.
valid.groupBy('churn', 'gender').count().show()

+-----+------+------+
|churn|gender| count|
+-----+------+------+
|    1|     F|    45|
|    0|     M|302558|
|    1|     M|    54|
|    0|     F|225348|
+-----+------+------+



In [16]:
# Each categorical column is first mapped to a numeric index and that index
# is then one-hot encoded.
gender_idx = StringIndexer(inputCol='gender', outputCol='gender_idx')
gender_encoder = OneHotEncoder(inputCol='gender_idx', outputCol='gender_vec')

page_idx = StringIndexer(inputCol='page', outputCol='page_idx')
page_encoder = OneHotEncoder(inputCol='page_idx', outputCol='page_vec')

user_idx = StringIndexer(inputCol='userAgent', outputCol='user_idx')
user_encoder = OneHotEncoder(inputCol='user_idx', outputCol='user_vec')

# NOTE(review): `page_vec` is derived from `page`, and the `churn` label is
# itself computed from `page`, so the label is directly visible in the
# features (target leakage). A sound model needs features that do not include
# the cancellation event itself, e.g. per-user aggregates.
# NOTE(review): `user_vec` is produced above but is not listed in `inputCols`.
assembler = VectorAssembler(
    inputCols=['gender_vec', 'page_vec', 'status'],
    outputCol='features',
)

# Expose the churn flag under the `label` column name.
idx = StringIndexer(inputCol='churn', outputCol='label')

In [17]:
# Logistic-regression classifier, capped at 15 optimisation iterations.
model = LogisticRegression(maxIter=15)

# Only stages whose output reaches the assembler are chained. `user_idx` and
# `user_encoder` are left out: the assembler does not read `user_vec`, so they
# added work to every fit without affecting the model, and with StringIndexer's
# default handleInvalid='error' indexing `userAgent` could fail at transform
# time on a value that was not seen during fitting.
# NOTE(review): if the user agent was meant to be a feature, add `user_vec` to
# the assembler's inputCols and restore both stages here.
pipeline = Pipeline(stages=[
    gender_idx,
    page_idx,
    gender_encoder,
    page_encoder,
    assembler,
    idx,
    model,
])

In [18]:
# Hold out roughly 10% of the rows for validation; the seed fixes the sampling.
# NOTE(review): the split is over event rows, so events of the same user can
# fall on both sides -- confirm this is intended.
rest, validation = valid.randomSplit(weights=[0.9, 0.1], seed=42)

In [19]:
# Hyper-parameter grid: 2 regularisation strengths x 4 elastic-net mixing
# values = 8 combinations.
param = (
    ParamGridBuilder()
    .addGrid(model.regParam, [0.0, 0.1])
    .addGrid(model.elasticNetParam, [0.0, 0.1, 0.5, 1.0])
    .build()
)

# 3-fold cross-validation over the whole pipeline.
# `seed` pins the fold assignment so repeated runs yield the same metrics; it
# matches the seed used for the train/validation split.
# `metricName='f1'` spells out the evaluator's default metric.
# NOTE(review): only 99 of roughly 528k events are labelled churn, so this
# score is dominated by the majority class; consider a metric that focuses on
# the positive class.
cross = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param,
    evaluator=MulticlassClassificationEvaluator(metricName='f1'),
    numFolds=3,
    seed=42,
)

In [20]:
# Run the cross-validated grid search on the training split.
fitted = cross.fit(rest)

In [21]:
# Score the held-out validation rows with the fitted cross-validation model.
results = fitted.transform(validation)

In [22]:
# Preview the predicted class for the first validation rows.
results.select("prediction").show()

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 20 rows



In [23]:
# Cross-validation metric averaged over the folds, one value per entry of `param`.
# NOTE(review): near-perfect scores are suspicious here because `page` is both
# a model feature and the source of the `churn` label -- treat them as a
# symptom of leakage rather than as evidence of model quality.
fitted.avgMetrics

[1.0,
 1.0,
 1.0,
 1.0,
 0.9997190150118527,
 0.9997190150118527,
 0.9997190150118527,
 0.9997190150118527]