In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.window import Window as w
from pyspark.ml.classification import LogisticRegression, LinearSVC, GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, TrainValidationSplit, ParamGridBuilder

import datetime
import numpy as np
import pandas as pd
from operator import itemgetter
from IPython import get_ipython

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1554594527956_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
start = datetime.datetime.now()


def time_elapsed(start=start):
    """Print the wall-clock time that has passed since ``start``.

    ``start`` defaults to the timestamp captured when this cell ran, so a bare
    ``time_elapsed()`` reports time since the notebook's timing origin.
    Returns None; the elapsed timedelta is only printed.
    """
    print(datetime.datetime.now() - start)
    

VBox()

In [3]:
time_elapsed(start)  # wall-clock time since the notebook's timing origin

VBox()

0:01:07.882746

In [4]:
print(datetime.datetime.now().isoformat(' '))  # same text as str(datetime)

VBox()

2019-04-06 23:57:26.837978

In [5]:
# Create (or reuse) the Spark session for the Sparkify app.
spark = (SparkSession.builder
         .appName("Sparkify")
         .getOrCreate())

# Full Sparkify event dataset stored on S3.
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)

VBox()

In [6]:
### WHEN USING FRACTION OF DATA ###
# the_full = df #SAVES FULL DATA SET
# df = the_full.sample(fraction=0.05, seed=42)
# df = df.repartition(96)
## df = df.coalesce(72)


VBox()

In [7]:
num_partitions = df.rdd.getNumPartitions()
num_partitions

VBox()

96

In [8]:
#to revert back to full dataset
# df = the_full 

VBox()

In [9]:
print(datetime.datetime.now().isoformat(' '))  # same text as str(datetime)

VBox()

2019-04-07 00:00:00.184942

In [10]:
# Keep only rows with at least 15 non-null fields, cache the result, and run a
# full action so the cache is populated (the displayed value is the row count).
df = df.na.drop(thresh=15).persist()
df.count()

VBox()

25480720

In [11]:
# ts is divided by 1000 to go from milliseconds to the seconds from_unixtime expects.
df = df.withColumn('datetime', f.from_unixtime(f.col('ts') / 1000))

VBox()

In [12]:
# Ids of users who reached the 'Cancellation Confirmation' page (i.e. churned).
# distinct() brings one row per churned user to the driver instead of one row per
# cancellation event, and collect() avoids a round-trip through pandas.
churn_rows = df.where(df.page == 'Cancellation Confirmation').select('userId').distinct().collect()
churns = set(int(row.userId) for row in churn_rows)


def _flag_churn(user_id):
    """Return 1 if user_id is the id of a churned user, otherwise 0.

    Ids that cannot be parsed as integers (None, '') are treated as not churned
    rather than raising inside the Spark task and failing the whole job.
    """
    try:
        return 1 if int(user_id) in churns else 0
    except (TypeError, ValueError):
        return 0


flag_churn = f.udf(_flag_churn, IntegerType())

VBox()

In [13]:
# Per-row churn label (1 = this user eventually cancelled).
df = df.withColumn('churns', flag_churn('userId'))

VBox()

In [14]:
# Distinct churned / retained / total users, computed in ONE aggregation job
# instead of three separate distinct().count() scans over the full event log.
# `churns` is derived from userId alone, so each user lands in exactly one group.
# NOTE: countDistinct ignores a null userId, whereas distinct().count() counted it once.
_user_counts = df.agg(
    f.countDistinct(f.when(df.churns == 1, df.userId)).alias('num_churn'),
    f.countDistinct(f.when(df.churns == 0, df.userId)).alias('num_stay'),
    f.countDistinct(df.userId).alias('total'),
).first()
num_churn = _user_counts['num_churn']
num_stay = _user_counts['num_stay']
total = _user_counts['total']

VBox()

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 44642)
----------------------------------------
Traceback (most recent call last):
  File "/usr/lib64/python2.7/SocketServer.py", line 290, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python2.7/SocketServer.py", line 318, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python2.7/SocketServer.py", line 331, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python2.7/SocketServer.py", line 652, in __init__
    self.handle()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 266, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 254, in authe

In [15]:
time_elapsed(start)  # wall-clock time since the notebook's timing origin

VBox()

0:07:26.414125

In [16]:
# Timestamp of the cancellation event on that row only; when() without otherwise()
# leaves every other row null.
df = df.withColumn('CancelTime', f.when(f.col('page') == 'Cancellation Confirmation', f.col('ts')))

VBox()

In [17]:
# Cache this version of df and show one row.
# NOTE(review): every persist() on a re-bound df caches a new DataFrame; earlier
# cached versions are never unpersisted. head() likely evaluates only enough
# partitions for one row, so the cache may be mostly unfilled here.
df.persist().head()

VBox()

Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042', datetime=u'2018-10-01 00:00:01', churns=1, CancelTime=None)

In [18]:
# Window specs for per-user time-series features, ordered by event timestamp.
_by_user_time = w.partitionBy('userId').orderBy('ts')

# Every event of the user, regardless of the current row's position.
all_time = _by_user_time.rangeBetween(w.unboundedPreceding, w.unboundedFollowing)
# Events of the user up to (and including ties with) the current ts.
to_now = _by_user_time.rangeBetween(w.unboundedPreceding, w.currentRow)
# NOTE(review): unpartitioned window; if ever used, Spark would pull all rows into a
# single partition. It is not referenced by any later cell.
to_now_all = w.orderBy('ts').rangeBetween(w.unboundedPreceding, w.currentRow)

VBox()

In [19]:
# Spread each user's cancellation timestamp onto all of that user's rows.
# all_time spans the user's whole partition. With ignorenulls=True, f.last returns
# the non-null CancelTime wherever it sits in that partition. Without it, f.last
# returns whatever the final row of the frame holds. That is null unless the
# cancellation event sorts last, and rows tied on ts have no guaranteed order.
filled_column = f.last(df['CancelTime'], ignorenulls=True).over(all_time)

# do the fill
df = df.withColumn('Cancel_Time', filled_column)

VBox()

In [20]:
# Milliseconds from each event until the user's cancellation (null when the user
# never cancelled, because Cancel_Time is null for all of their rows).
df = (df.drop('CancelTime')
        .withColumn('Until_Cancel', f.col('Cancel_Time') - f.col('ts')))

VBox()

In [21]:
# Binary labels: does the user cancel within 48 hours / 1 week / 2 weeks of this event?
# (These are cancellation windows; Until_Cancel is in milliseconds.)
df = df.withColumn('CancelDate', f.from_unixtime(df.Cancel_Time / 1000))


def _cancel_within(limit_ms):
    """Build a function mapping an Until_Cancel column to 1 if <= limit_ms, else 0.

    Uses a native when/otherwise expression instead of a Python UDF, so the check
    runs in the JVM rather than shipping every row to a Python worker. A null input
    (user never cancelled) yields 0, as before.
    NOTE(review): rows after the cancellation have negative Until_Cancel and are
    also flagged 1 — confirm that is intended.
    """
    def check(until_cancel):
        return f.when(until_cancel <= limit_ms, 1).otherwise(0)
    return check


check_48 = _cancel_within(172800000)        # 48 hours in ms
check_week = _cancel_within(604800000)      # 7 days in ms
check_2_weeks = _cancel_within(1209600000)  # 14 days in ms
df = df.withColumn('CancelIn48', check_48(df.Until_Cancel))
df = df.withColumn('CancelInWeek', check_week(df.Until_Cancel))
df = df.withColumn('CancelIn2Weeks', check_2_weeks(df.Until_Cancel))

VBox()

In [22]:
# Trailing per-user windows covering events in the last 48 hours / 1 week / 2 weeks
# (range bounds are in the same milliseconds as ts).
_HOUR_MS = 60 * 60 * 1000


def _trailing_window(span_ms):
    """Per-user window over events whose ts lies in [current ts - span_ms, current ts]."""
    return w.partitionBy('userId').orderBy('ts').rangeBetween(-span_ms, 0)


window_48 = _trailing_window(48 * _HOUR_MS)            # 172800000
window_1_week = _trailing_window(7 * 24 * _HOUR_MS)    # 604800000
window_2_weeks = _trailing_window(14 * 24 * _HOUR_MS)  # 1209600000

VBox()

In [23]:
# Number of events (non-null page) per user within each trailing window.
for _col_name, _window in [('actions_48', window_48),
                           ('actions_week', window_1_week),
                           ('actions_2weeks', window_2_weeks)]:
    df = df.withColumn(_col_name, f.count(f.col('page')).over(_window))

VBox()

In [24]:
def advert(page):
    """Return an int column: 1 where page == 'Roll Advert', else 0 (null pages -> 0).

    A native when/otherwise expression replaces the former Python UDF, so the
    comparison runs inside the JVM instead of round-tripping every row through Python.
    """
    return f.when(page == 'Roll Advert', 1).otherwise(0)


df = df.withColumn('ad_here', advert(df.page))

VBox()

In [25]:
# Cache this version of df and show one row.
# NOTE(review): earlier cached versions of df are not unpersisted.
df.persist().head()

VBox()

Row(artist=u'Coldplay', auth=u'Logged In', firstName=u'Frank', gender=u'M', itemInSession=0, lastName=u'Warren', length=307.51302, level=u'free', location=u'Findlay, OH', method=u'PUT', page=u'NextSong', registration=1535470939000, sessionId=1623, song=u'Clocks', status=200, ts=1538428238000, userAgent=u'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0', userId=u'1000280', datetime=u'2018-10-01 21:10:38', churns=1, Cancel_Time=1542149985000, Until_Cancel=3721747000, CancelDate=u'2018-11-13 22:59:45', CancelIn48=0, CancelInWeek=0, CancelIn2Weeks=0, actions_48=1, actions_week=1, actions_2weeks=1, ad_here=0)

In [None]:
#function for testing features if other features created
def test_feat(feat, label='CancelIn2Weeks'):
    """Print the mean of column `feat` for each value of `label` in the global df.

    Groups are sorted by `label` so the output order is stable between calls;
    groupBy alone does not guarantee any row order. `label` defaults to the
    2-week cancellation flag, which is the model label used in this notebook.
    """
    print(df.groupBy(label).avg(feat).orderBy(label).collect())

In [26]:
def friend(page):
    """Return an int column: 1 where page == 'Add Friend', else 0 (null pages -> 0).

    A native when/otherwise expression replaces the former Python UDF so the check
    is evaluated in the JVM.
    """
    return f.when(page == 'Add Friend', 1).otherwise(0)


df = df.withColumn('friend_here', friend(df.page))
# Running count of 'Add Friend' events per user up to the current event's ts.
total_friends = f.sum(df['friend_here']).over(to_now)
df = df.withColumn('friends', total_friends)

VBox()

In [27]:
# Number of 'Roll Advert' events per user within each trailing window.
for _col_name, _window in [('ads_2_weeks', window_2_weeks),
                           ('ads_1_week', window_1_week),
                           ('ads_48', window_48)]:
    df = df.withColumn(_col_name, f.sum(f.col('ad_here')).over(_window))

VBox()

In [28]:
# Mean itemInSession per user within each trailing window (f.avg is f.mean).
_items = f.col('itemInSession')
df = (df
      .withColumn('sessionItems_48', f.avg(_items).over(window_48))
      .withColumn('sessionItems_week', f.avg(_items).over(window_1_week))
      .withColumn('sessionItems_2_weeks', f.avg(_items).over(window_2_weeks)))

VBox()

In [29]:
# Ratio of ads to total events within each trailing window.
for _out, _ads, _actions in [('ads_per_action_48', 'ads_48', 'actions_48'),
                             ('ads_per_action_week', 'ads_1_week', 'actions_week'),
                             ('ads_per_action_2_weeks', 'ads_2_weeks', 'actions_2weeks')]:
    df = df.withColumn(_out, f.col(_ads) / f.col(_actions))

VBox()

In [30]:
# Cumulative per-user session statistics up to the current event's ts.
# NOTE(review): collect_set over a growing frame builds a set per row, which may be
# costly for very active users.
df = df.withColumn('sessions', f.size(f.collect_set(f.col('sessionId')).over(to_now)))
df = df.withColumn('avgSessionItems', f.avg('itemInSession').over(to_now))
df = df.withColumn('maxItems', f.max('itemInSession').over(to_now))

VBox()

In [31]:
# Features for HTTP-method ratios: per-event PUT / GET indicators, their sums over each
# trailing window, and GET/PUT ratios per window.


def check_put(method):
    """Return an int column: 1 where method == "PUT", else 0 (null -> 0).

    A native expression replaces the former Python UDF so no per-row Python call is needed.
    """
    return f.when(method == "PUT", 1).otherwise(0)


def check_get(method):
    """Return an int column: 1 where method == "GET", else 0 (null -> 0)."""
    return f.when(method == "GET", 1).otherwise(0)


df = df.withColumn('putHere', check_put(df.method))
df = df.withColumn('getHere', check_get(df.method))

df = df.withColumn('get_48', f.sum(df.getHere).over(window_48))\
        .withColumn('get_week', f.sum(df.getHere).over(window_1_week))\
        .withColumn('get_2_weeks', f.sum(df.getHere).over(window_2_weeks))

df = df.withColumn('put_48', f.sum(df.putHere).over(window_48))\
        .withColumn('put_week', f.sum(df.putHere).over(window_1_week))\
        .withColumn('put_2_weeks', f.sum(df.putHere).over(window_2_weeks))

# NOTE(review): a window with no PUT divides by zero; presumably Spark yields null
# there (default non-ANSI arithmetic).
df = df.withColumn('get_put_48', df.get_48 / df.put_48)\
        .withColumn('get_put_week', df.get_week / df.put_week)\
        .withColumn('get_put_2_weeks', df.get_2_weeks / df.put_2_weeks)

VBox()

In [33]:
# Cache this version of df (with all engineered features) and show one row.
# NOTE(review): earlier cached versions of df are not unpersisted.
df.persist().head()

VBox()

Row(artist=u'Coldplay', auth=u'Logged In', firstName=u'Frank', gender=u'M', itemInSession=0, lastName=u'Warren', length=307.51302, level=u'free', location=u'Findlay, OH', method=u'PUT', page=u'NextSong', registration=1535470939000, sessionId=1623, song=u'Clocks', status=200, ts=1538428238000, userAgent=u'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0', userId=u'1000280', datetime=u'2018-10-01 21:10:38', churns=1, Cancel_Time=1542149985000, Until_Cancel=3721747000, CancelDate=u'2018-11-13 22:59:45', CancelIn48=0, CancelInWeek=0, CancelIn2Weeks=0, actions_48=1, actions_week=1, actions_2weeks=1, ad_here=0, friend_here=0, friends=0, ads_2_weeks=0, ads_1_week=0, ads_48=0, sessionItems_48=0.0, sessionItems_week=0.0, sessionItems_2_weeks=0.0, ads_per_action_48=0.0, ads_per_action_week=0.0, ads_per_action_2_weeks=0.0, sessions=1, avgSessionItems=0.0, maxItems=0, putHere=1, getHere=0, get_48=0, get_week=0, get_2_weeks=0, put_48=1, put_week=1, put_2_weeks=1, get_put_48=0.0, ge

In [34]:
print(datetime.datetime.now().isoformat(' '))  # same text as str(datetime)

VBox()

2019-04-07 00:33:39.102669

In [35]:
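# Find proportions between churn and non-churn for features: for each feature, |1 - a/b|, where a and b
# are the feature's means in the two CancelIn2Weeks groups.
# NOTE: the loop below reads the groups by position from groupBy().collect(); Spark does not guarantee
# group order, so a and b may not consistently refer to the same group.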

# feats = ['itemInSession','length', 'page', 'CancelIn48','CancelIn2Weeks','CancelInWeek', 'actions_48', 'actions_week', 
#            'actions_2weeks', 'friends', 'ads_2_weeks', 'ads_1_week', 'ads_48', 'sessionItems_48', 'sessionItems_week',
#            'sessionItems_2_weeks', 'ads_per_action_48', 'ads_per_action_week', 'ads_per_action_2_weeks', 'sessions', 
#            'sessions_48', 'sessions_2weeks', 'avgSessionItems', 'maxItems', 'get_48', 'get_week', 'get_2_weeks',
#            'put_48', 'put_week', 'put_2_weeks', 'get_put_48', 'get_put_week', 'get_put_2_weeks']

# thankful for this help on sorting nested lists with built-in modules rather than building it out myself
# https://stackoverflow.com/a/409423/9554411
# feat_list = []
# for col in feats:
#     try:
#         a = df.groupBy('CancelIn2Weeks').avg(col).collect()[0][1]
#         b = df.groupBy('CancelIn2Weeks').avg(col).collect()[1][1]
#         rel = abs(1-(a/b))
#         relation = [col,rel]
#         feat_list.append(relation)
#     except:
#         pass
    
# feat_list.sort(key=itemgetter(1), reverse = True)
# for item in feat_list:
#     print(item)

VBox()

Features ranked in descending order by the absolute relative difference |1 − a/b| between their means in the two CancelIn2Weeks groups

['ads_1_week', 0.7311668955971533]

['ads_2_weeks', 0.7152864112346531]

['ads_48', 0.598072267687942]

['get_2_weeks', 0.36486674864136925]

['get_week', 0.3592168923547885]

['actions_2weeks', 0.28626801305861505]

['put_2_weeks', 0.28064126132314327]

['actions_week', 0.2806124076262986]

['put_week', 0.2750849903368342]

['get_48', 0.25929123735696646]

['ads_per_action_2_weeks', 0.2214579364834579]

['ads_per_action_week', 0.2139230107515362]

['ads_per_action_48', 0.19491715886089223]

['actions_48', 0.1870587865697524]

['put_48', 0.1820652161095082]

['friends', 0.16497880171603985]

['sessions', 0.16286493493166065]

['sessionId', 0.14768153795457617]

['sessionItems_48', 0.10882520362069004]

['sessionItems_week', 0.08192850886961645]

['maxItems', 0.07112715174466788]

['sessionItems_2_weeks', 0.06585026521652226]

['get_put_2_weeks', 0.05355915755875862]

['itemInSession', 0.043110954759168374]

['get_put_week', 0.04131305413071429]

['avgSessionItems', 0.03751318044368879]

['get_put_48', 0.0271334247269448]

['length', 0.0005621037099000636]


In [36]:
# Final features determined after extensive testing. The order follows the feature
# ranking above (with sessionId omitted) and fixes each feature's index in the
# assembled feature vector, so it must not be changed casually.
final_feats = [
    'ads_1_week', 'ads_2_weeks', 'ads_48',
    'get_2_weeks', 'get_week', 'actions_2weeks',
    'put_2_weeks', 'actions_week', 'put_week', 'get_48',
    'ads_per_action_2_weeks', 'ads_per_action_week', 'ads_per_action_48',
    'actions_48', 'put_48', 'friends', 'sessions',
    'sessionItems_48', 'sessionItems_week', 'maxItems',
    'sessionItems_2_weeks', 'get_put_2_weeks',
]
print(final_feats)

VBox()

['ads_1_week', 'ads_2_weeks', 'ads_48', 'get_2_weeks', 'get_week', 'actions_2weeks', 'put_2_weeks', 'actions_week', 'put_week', 'get_48', 'ads_per_action_2_weeks', 'ads_per_action_week', 'ads_per_action_48', 'actions_48', 'put_48', 'friends', 'sessions', 'sessionItems_48', 'sessionItems_week', 'maxItems', 'sessionItems_2_weeks', 'get_put_2_weeks']

In [37]:
feat_descrip = 'top 22, churn in 2 week'
feat_cols = final_feats  # alias of the same list object, not a copy
print(feat_cols)

VBox()

['ads_1_week', 'ads_2_weeks', 'ads_48', 'get_2_weeks', 'get_week', 'actions_2weeks', 'put_2_weeks', 'actions_week', 'put_week', 'get_48', 'ads_per_action_2_weeks', 'ads_per_action_week', 'ads_per_action_48', 'actions_48', 'put_48', 'friends', 'sessions', 'sessionItems_48', 'sessionItems_week', 'maxItems', 'sessionItems_2_weeks', 'get_put_2_weeks']

In [38]:
# Replace nulls in numeric columns with -1 (VectorAssembler rejects nulls).
df = df.na.fill(-1)

VBox()

In [39]:
print(datetime.datetime.now().isoformat(' '))  # same text as str(datetime)

VBox()

2019-04-07 00:33:40.113292

In [40]:
# If training different model, drop vectorized feature column
# df = df.drop('DescVec')

VBox()

In [41]:
# Pack the selected feature columns, in feat_cols order, into one vector column.
assembler = VectorAssembler(outputCol='DescVec', inputCols=feat_cols)
df = assembler.transform(df)

VBox()

In [42]:
# Cache this version of df (now including DescVec) and show one row.
df.persist().head()

VBox()

Row(artist=u'Coldplay', auth=u'Logged In', firstName=u'Frank', gender=u'M', itemInSession=0, lastName=u'Warren', length=307.51302, level=u'free', location=u'Findlay, OH', method=u'PUT', page=u'NextSong', registration=1535470939000, sessionId=1623, song=u'Clocks', status=200, ts=1538428238000, userAgent=u'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0', userId=u'1000280', datetime=u'2018-10-01 21:10:38', churns=1, Cancel_Time=1542149985000, Until_Cancel=3721747000, CancelDate=u'2018-11-13 22:59:45', CancelIn48=0, CancelInWeek=0, CancelIn2Weeks=0, actions_48=1, actions_week=1, actions_2weeks=1, ad_here=0, friend_here=0, friends=0, ads_2_weeks=0, ads_1_week=0, ads_48=0, sessionItems_48=0.0, sessionItems_week=0.0, sessionItems_2_weeks=0.0, ads_per_action_48=0.0, ads_per_action_week=0.0, ads_per_action_2_weeks=0.0, sessions=1, avgSessionItems=0.0, maxItems=0, putHere=1, getHere=0, get_48=0, get_week=0, get_2_weeks=0, put_48=1, put_week=1, put_2_weeks=1, get_put_48=0.0, ge

In [43]:
# Model input: the 2-week cancellation flag as label plus the assembled feature vector.
data = df.select(df['CancelIn2Weeks'].alias('label'), df['DescVec'].alias('features'))

VBox()

In [44]:
# Cache and fully materialize `data` BEFORE releasing df's cache. At this point
# `data` is still only a lazy plan over df. Unpersisting df first would force the
# first full action on `data` to recompute it from df's lineage instead of the cache.
data.persist()
data.count()
# unpersist to save space now that `data` holds everything the model needs
df.unpersist()

VBox()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, datetime: string, churns: int, Cancel_Time: bigint, Until_Cancel: bigint, CancelDate: string, CancelIn48: int, CancelInWeek: int, CancelIn2Weeks: int, actions_48: bigint, actions_week: bigint, actions_2weeks: bigint, ad_here: int, friend_here: int, friends: bigint, ads_2_weeks: bigint, ads_1_week: bigint, ads_48: bigint, sessionItems_48: double, sessionItems_week: double, sessionItems_2_weeks: double, ads_per_action_48: double, ads_per_action_week: double, ads_per_action_2_weeks: double, sessions: int, avgSessionItems: double, maxItems: bigint, putHere: int, getHere: int, get_48: bigint, get_week: bigint, get_2_weeks: bigint, put_48: bigint, put_week: bigint, put_2_weeks: bigint

In [45]:
# Cache the label/features frame and show one row.
data.persist().head()

VBox()

Row(label=0, features=SparseVector(22, {5: 1.0, 6: 1.0, 7: 1.0, 8: 1.0, 13: 1.0, 14: 1.0, 16: 1.0}))

In [46]:
# 75/25 random split with a fixed seed.
# NOTE(review): the split is per event row, so one user's events can land in both
# train and test. Their overlapping trailing-window features may leak; consider
# splitting by userId.
_splits = data.randomSplit([0.75, 0.25], seed=42)
train_data, test_data = _splits

VBox()

In [47]:
time_elapsed(start)  # wall-clock time since the notebook's timing origin

VBox()

0:37:29.604157

In [48]:
# Gradient-boosted trees chosen as the final classifier; the alternatives tried
# earlier are kept below for reference.
gbt = GBTClassifier(stepSize=0.3, maxDepth=10)
# svc = LinearSVC()
# lr = LogisticRegression()
# rf = RandomForestClassifier(maxDepth=25, seed=42)

VBox()

In [49]:
# Param Grid for hyperparameter tuning and testing
# grid = ParamGridBuilder()\
#     .addGrid(gbt.maxDepth,[10])\
#     .addGrid(gbt.stepSize,[0.3,0.4])\
#     .build()
# evaluator = MulticlassClassificationEvaluator(metricName='weightedRecall')

VBox()

In [50]:
prev = datetime.datetime.now()  # training start marker; later compared with `post`
print(prev.isoformat(' '))

VBox()

2019-04-07 00:33:48.325917

In [59]:
# NOTE(review): no later code cell reads model_start; `prev` is used for timing instead.
model_start = datetime.datetime.now(tz=None)

VBox()

In [51]:
# tv = TrainValidationSplit(estimator=gbt, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2, seed=42)
# tv_model = tv.fit(train_data)
# results = tv_model.transform(test_data)

VBox()

In [52]:
# Train on the training split, then score the held-out split.
gbt_model = gbt.fit(dataset=train_data)
results = gbt_model.transform(dataset=test_data)

VBox()

In [53]:
# svc_model = svc.fit(train_data)
# results = svc_model.transform(test_data)

VBox()

In [54]:
# lr_model = lr.fit(train_data)
# results = lr_model.transform(test_data)

VBox()

In [55]:
# rf_model = rf.fit(train_data)
# results = rf_model.transform(test_data)

VBox()

In [56]:
post = datetime.datetime.now()  # training end marker
print(post.isoformat(' '))

VBox()

2019-04-07 01:11:51.903757

In [57]:
# Cache the scored test set (reused by several counts below) and show one row.
results.persist().head()

VBox()

Row(label=0, features=SparseVector(22, {1: 4.0, 3: 11.0, 4: 1.0, 5: 65.0, 6: 54.0, 7: 1.0, 9: 1.0, 10: 0.0615, 13: 1.0, 16: 3.0, 19: 61.0, 20: 26.9846, 21: 0.2037}), rawPrediction=DenseVector([1.4294, -1.4294]), probability=DenseVector([0.9458, 0.0542]), prediction=0.0)

In [58]:
# Confusion-matrix cells from ONE groupBy job instead of three filter().count() jobs.
# Keys are (label, prediction); label is an int and prediction a double, and
# (1, 1.0) hashes and compares equal to (1, 1), so .get() finds either form.
confusion = dict(((row['label'], row['prediction']), row['count'])
                 for row in results.groupBy('label', 'prediction').count().collect())
num_found = confusion.get((1, 1.0), 0)   # true positives
false_pos = confusion.get((0, 1.0), 0)   # false positives
print(num_found)
print(false_pos)
# all actual positives (label == 1), whatever the prediction
print(sum(count for (lbl, pred), count in confusion.items() if lbl == 1))
print(datetime.datetime.now())

VBox()

136267
6555
679089
2019-04-07 01:24:04.793194

In [59]:
# For model tuning result examination
# best = tv_model.bestModel
# cv_model.avgMetrics
# best.extractParamMap()

VBox()

In [60]:
# Training wall-clock time, then the number of true positives.
for _value in (post - prev, num_found):
    print(_value)

VBox()

0:38:03.577840
136267

Various testing trials and results

total anomalies: 201970

MaxDepth = 5, stepSize = 0.3, *Results = 229*, time_elapsed (after model start) 34 min.

MaxDepth = 8, stepSize = 0.3, *Results = 1753*, time_elapsed (after model start) 13 min.

MaxDepth = 10, stepSize = 0.3, *Results = 4694*, time_elapsed (after model start) 18 min.

**More features**

MaxDepth = 10, stepSize = 0.3

**function**
11836, < .95, >1.05

20669, time_elapsed (after model start) 28 min., '<.97 >1.03'

22551 , time_elapsed (after model start) 'w/ put/get, .97, 1.03'

131768 / 679089 (19.4%) 2-week cancellations,

5812619 / 6366782 (91.3%, with the vast majority of misclassifications coming from churning customers)

two weeks, top 12, 25 min. model: 59368 / 679089 (8.7%)

two weeks, top 22, 36 min. model: 136272 / 679089 (20%)

In [61]:
# Overall accuracy inputs: total scored rows and rows where prediction == label.
total_rows = results.count()
correct_predictions = results.where(f.col('label') == f.col('prediction')).count()
print(correct_predictions, total_rows)

VBox()

(5817405, 6366782)

In [62]:
print(datetime.datetime.now().isoformat(' '))  # same text as str(datetime)

VBox()

2019-04-07 01:24:16.161052

In [63]:
time_elapsed(start)  # total wall-clock time since the notebook's timing origin

VBox()

1:27:57.883359