In [43]:
import datetime

import numpy as np
import pandas as pd

import pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import IntegerType, StringType, DateType
from pyspark.sql.functions import isnan, when, count, col, isnull, desc, asc, udf, max, sum, mean, countDistinct, datediff
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [44]:

import os

import ibmos2spark
# @hidden_cell
# Connection settings for the Cloud Object Storage bucket that holds the event log.
# The API key is read from the COS_API_KEY environment variable so that the secret
# is never stored in (or shared with) the notebook.
# NOTE(review): the key that used to be written literally in this cell should be
# treated as leaked and rotated.
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'service_id': 'iam-ServiceId-0d865840-f207-4d2d-8d5f-801326c3f157',
    'iam_service_endpoint': 'https://iam.bluemix.net/oidc/token',
    'api_key': os.environ['COS_API_KEY']  # KeyError here means the variable is not set
}

configuration_name = 'os_3489cab9ef414102879fb22c3f32bd60_configs'
# `sc` is not defined anywhere in this notebook; presumably it is the SparkContext
# pre-created by the hosting environment -- TODO confirm.
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

# Load the raw event log from the bucket and preview the first five rows.
data = spark.read.json(cos.url('medium-sparkify-event-data.json', 'sparkify-donotdelete-pr-ljrykmjvegypmg'))
data.show(5)


+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------------------+------+-------------+--------------------+------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|              song|status|           ts|           userAgent|userId|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+------------------+------+-------------+--------------------+------+
|    Martin Orford|Logged In|   Joseph|     M|           20| Morales|597.55057| free|  Corpus Christi, TX|   PUT|NextSong|1532063507000|      292|     Grand Designs|   200|1538352011000|"Mozilla/5.0 (Mac...|   293|
|John Brown's Body|Logged In|   Sawyer|     M|           74|  Larson|380.21179| free|Houston-The Woodl...|   PUT|NextSong|1538069638000|    

In [45]:
# Quick look at the loaded data: row count, column count, then the inferred schema.
print(data.count(), len(data.columns), sep='\n')
data.printSchema()

543705
18
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [46]:
# Keep only the events whose userId is a non-empty string.
data = data.where(col('userId') != '')

In [47]:
# One row per user and one column per page type, holding how many events of that
# page the user generated (0 where the user never hit the page).
df = data.select('userId', 'page')
final_df = (
    df.groupBy('userId')
      .pivot('page')
      .count()
      .na.fill(0)
)

In [48]:
# Remove the 'Cancel' page count and use the 'Cancellation Confirmation' count
# as the target column named 'label'.
final_df = (
    final_df
    .drop('Cancel')
    .withColumnRenamed('Cancellation Confirmation', 'label')
)

In [49]:
# Add feature -  total length of all the songs played by each user.
# NOTE: `sum` is pyspark.sql.functions.sum (imported at the top of the notebook),
# not the Python builtin.
total_songs_length = data.filter(data.page == 'NextSong').groupby('userId').agg(sum('length'))

# A user without any 'NextSong' event has no row in `total_songs_length`, so the
# left join would leave a null for that user. Store 0.0 instead: the user listened
# to nothing, and a null here would later make the feature assembly fail.
final_df = final_df.join(total_songs_length, on = 'userId', how = 'left') \
                   .withColumnRenamed('NextSong', 'Total Songs Played') \
                   .withColumnRenamed('sum(length)', 'Total Songs Length') \
                   .fillna(0.0, subset=['Total Songs Length'])
                   

In [50]:
# Spark UDF that turns an epoch timestamp given in milliseconds into a date.
# datetime.fromtimestamp interprets the value in the local timezone of the
# Python process that evaluates the UDF.
get_date = udf(
    lambda epoch_ms: datetime.datetime.fromtimestamp(epoch_ms / 1000),
    DateType(),
)

In [51]:
# Add feature : Average number of songs played per day by each user
# Songs are counted per (user, calendar date) first, then averaged per user.
temp = data.select(['userId', 'ts', 'page']).filter(data.page == 'NextSong').withColumn('date', get_date('ts'))
count_by_days = temp.groupby(['userId', 'date']).count()
avg_songs_per_day = count_by_days.groupby('userId').agg(mean('count')).withColumnRenamed('avg(count)', 'Average Songs Per Day')

# Users who never played a song are absent from `avg_songs_per_day`; after the left
# join their value would be null, which breaks the feature assembly later on.
# Their average is 0 songs per day, so fill it in explicitly.
final_df = final_df.join(avg_songs_per_day, on = 'userId', how = 'left') \
                   .fillna(0.0, subset=['Average Songs Per Day'])

In [52]:
# Add feature : Number of days between registration and last log entry
temp = data.select(['userId', 'ts', 'registration'])

# A single aggregation pass yields both values that are needed per user; the dict
# form of agg() names the result columns 'min(registration)' and 'max(ts)'.
# The earliest event timestamp is not computed: it never contributes to the feature.
days_registered = temp.groupby('userId').agg({'registration': 'min', 'ts': 'max'})

# datediff(end, start): whole days from the registration date to the last event date.
days_registered = days_registered.withColumn('Registration Date', get_date('min(registration)')) \
                         .withColumn('Max Date', get_date('max(ts)')) \
                         .withColumn('Days Registered',  datediff(col('Max Date'), col('Registration Date'))) \
                         .select(['userId', 'Days Registered'])

final_df = final_df.join(days_registered, on = 'userId', how = 'left')

In [53]:
# Add feature : Number of days each user has visited the site.
# Collapse the events to one row per (user, date), then count those rows per user.
temp = data.select('userId', 'ts', 'page').withColumn('date', get_date('ts'))
count_by_days = temp.groupBy('userId', 'date').count()
days_visited = count_by_days.groupBy('userId').agg(count('date').alias('Days Visited'))

final_df = final_df.join(days_visited, 'userId', 'left')

In [54]:
# Add feature : Most recent level of user
# 1 for the 'paid' level, 0 for anything else (including a missing level).
flag_paid_level = udf(lambda x: 1 if x == 'paid' else 0, IntegerType())

# Sorting and then calling dropDuplicates() does not guarantee that the first row of
# the sort order is the one kept for each user, so the newest event is selected
# explicitly: attach each user's maximum timestamp with a window aggregate and keep
# only the rows that carry it. `max` is pyspark.sql.functions.max, not the builtin.
latest_ts = max('ts').over(Window.partitionBy('userId'))
levels = data.select(['userId', 'ts', 'level']) \
             .withColumn('latest_ts', latest_ts) \
             .filter(col('ts') == col('latest_ts')) \
             .dropDuplicates(['userId']) \
             .select(['userId', 'level'])
# dropDuplicates above only collapses events that share the very same latest timestamp.
levels = levels.withColumn('level', flag_paid_level('level'))

final_df = final_df.join(levels, on = 'userId', how = 'left')

In [55]:
# Add feature : gender
# Encoded as 1 for 'M' and 0 for every other value.
flag_male_gender = udf(lambda g: 1 if g == 'M' else 0, IntegerType())
# NOTE: the name `levels` is reused here for the per-user gender frame.
levels = (
    data.select('userId', 'gender')
        .dropDuplicates(['userId'])
        .withColumn('gender', flag_male_gender('gender'))
)

final_df = final_df.join(levels, 'userId', 'left')

In [56]:
# Add feature : userAgent

def extract_os_info(input):
    '''Map a raw user-agent string to a coarse operating-system label.

    The markers are checked in the order 'Windows', 'Linux', 'Mac OS X' and the
    first one contained in the string is returned.

    Args:
        input: user-agent string, or None when the value is missing.

    Returns:
        'Windows', 'Linux' or 'Mac OS X' when the matching marker is found,
        otherwise 'Other' (also for None and for the empty string), so the
        result is always a usable category label and never None.
    '''
    if input is None:
        return 'Other'
    for os_name in ('Windows', 'Linux', 'Mac OS X'):
        if os_name in input:
            return os_name
    return 'Other'

# Expose extract_os_info as a string-returning Spark UDF, then keep one row per
# user and replace the raw userAgent value with the UDF result.
get_os = udf(extract_os_info, StringType())
temp_df = (
    data.select('userId', 'userAgent')
        .dropDuplicates(['userId'])
        .withColumn('userAgent', get_os('userAgent'))
)

In [57]:
# Turn the userAgent label into a numeric index, then one-hot encode that index
# into the vector column 'userAgentVec'.
indexer = StringIndexer(inputCol='userAgent', outputCol='userAgentIndex')
temp_df = indexer.fit(temp_df).transform(temp_df)

encoder = OneHotEncoderEstimator(inputCols=['userAgentIndex'], outputCols=['userAgentVec'])
encoded = (
    encoder.fit(temp_df)
           .transform(temp_df)
           .select('userId', 'userAgentVec')
)

# Attach the encoded vector to the per-user feature table.
final_df = final_df.join(encoded, 'userId', 'left')

In [58]:
def evaluate_prediction(predictions):
    '''Report accuracy and F1 for a predictions DataFrame and return its confusion matrix.

    Both metrics are computed from the "label" and "prediction" columns and printed.
    The return value is a pandas DataFrame with one row per label and one column per
    predicted value, holding the number of rows for each combination.
    '''
    for caption, metric in (("Accuracy: ", "accuracy"), ("F1 Score: ", "f1")):
        scorer = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName=metric)
        print(caption, scorer.evaluate(predictions))

    return predictions.groupby("label").pivot("prediction").count().toPandas()

In [59]:
# Modeling
# 80/20 train/test split with a fixed seed.
training_df, test_df = final_df.randomSplit([0.8, 0.2], seed=42)

# Every column except the user id and the target is a model input.
feature_names = [name for name in final_df.columns if name not in ('userId', 'label')]

# Pipeline: assemble the inputs into one vector, standardise it, fit a 50-tree random forest.
assembler = VectorAssembler(inputCols=feature_names, outputCol='features')
scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
rf = RandomForestClassifier(featuresCol='scaledFeatures', labelCol='label', numTrees=50)
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Fit on the training split and evaluate on the held-out split.
model = pipeline.fit(training_df)
preds = model.transform(test_df)
evaluate_prediction(preds)

Accuracy:  0.9069767441860465
F1 Score:  0.8932850095640793


Unnamed: 0,label,0.0,1.0
0,0,73,1
1,1,7,5


In [60]:
# Grid search over numTrees x maxDepth (3 x 3 combinations) for the random forest,
# scored by F1 with 3-fold cross-validation on the training split.
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100, 150])
    .addGrid(rf.maxDepth, [5, 10, 20])
    .build()
)

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(metricName='f1'),
    numFolds=3,
)
cv_model = crossval.fit(training_df)

# Score the selected model on the held-out split.
predictions = cv_model.transform(test_df)
evaluate_prediction(predictions)

Accuracy:  0.8953488372093024
F1 Score:  0.8832889336885303


Unnamed: 0,label,0.0,1.0
0,0,72,2
1,1,7,5
