# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [161]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.functions import col, from_unixtime, to_timestamp, trunc, udf
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (
    CountVectorizer, IDF, Normalizer, OneHotEncoder, PCA, RegexTokenizer,
    StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler,
)
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [162]:
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [163]:
# FOR RUNNING ON AN EC2 CLUSTER
event_data_s3n_small = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"
event_data_s3n_full = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
#event_data_local = "mini_sparkify_event_data.json"

df = spark.read.json(event_data_s3n_small)
df.head(2)

# Use a very small subset of data for trying out functions
#df = df.limit(1000)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'), Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9')]

In [164]:
# Create view to query with conventional SQL
df.createOrReplaceTempView("df")

In [165]:
df.where(df.userId == 30).select("artist").head(5)

[Row(artist='Martha Tilston'), Row(artist='Adam Lambert'), Row(artist='Daft Punk'), Row(artist='Starflyer 59'), Row(artist=None)]

In [166]:
df.columns

['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'method', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId']

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without a `userId` or `sessionId`.
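
Missing data can show up in two forms in this log: as a null (for example `artist=None` on non-song pages) or as an empty string (as with the empty `userId` values removed below). The following is a minimal plain-Python sketch of counting both per column. The sample records and the helper name `count_missing` are hypothetical and only illustrate the distinction; they are not part of the dataset or of the Spark API.

```python
from collections import Counter

# Hypothetical sample records shaped like the event log (values invented for illustration)
events = [
    {"userId": "30", "sessionId": 29, "artist": "Martha Tilston"},
    {"userId": "", "sessionId": 8, "artist": None},   # empty userId, null artist
    {"userId": "9", "sessionId": 8, "artist": None},  # valid user, null artist
]

def count_missing(records):
    """Return per-column counts of None values and of empty strings."""
    nulls, empties = Counter(), Counter()
    for record in records:
        for column, value in record.items():
            if value is None:
                nulls[column] += 1
            elif value == "":
                empties[column] += 1
    return nulls, empties

nulls, empties = count_missing(events)
print("null counts: ", dict(nulls))
print("empty counts:", dict(empties))
```

A check that only compares against `""` would miss the null `artist` values, so it is worth deciding up front which of the two forms counts as "missing" for each column.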

In [167]:
# Show how many empty-string values each column has
for column in df.columns:
    empty_count = df.where(df[column] == "").count()
    print("Column " + column + " has " + str(empty_count) + " empty values.")

In [168]:
print("Remove rows with empty userIds.")
print("Processing.")
df = df.where(df.userId != "")
print("Done.")
# count() runs on the cluster instead of collecting every row to the driver
print("Check how many empty userIds are left: " +
      str(df.where(df.userId == "").count()))

Remove rows with empty userIds.
Processing.
Done.
Check how many empty userIds are left: 0

In [169]:
print("Number of events after removing empty userIds:")
df.count()

Number of events after removing empty userIds:
278154

In [170]:
df.select("page").dropDuplicates().orderBy(df.page.asc()).show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|              Logout|
|            NextSong|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
|      Submit Upgrade|
|         Thumbs Down|
|           Thumbs Up|
|             Upgrade|
+--------------------+

In [171]:
df.select("userId").dropDuplicates().orderBy(df.userId.asc()).show(10)

+------+
|userId|
+------+
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
|100008|
+------+
only showing top 10 rows

In [172]:
print('Number of distinct users:')
df.select("userId").distinct().count()

Number of distinct users:
225

In [173]:
print("Example event flow for one particular user:")
df.where(df.userId == 100001).select("page", "ts").orderBy("ts").show(1000)

Example event flow for one particular user:
+--------------------+-------------+
|                page|           ts|
+--------------------+-------------+
|                Home|1538376504000|
|            NextSong|1538376509000|
|         Roll Advert|1538376542000|
|            NextSong|1538376747000|
|         Roll Advert|1538376783000|
|            NextSong|1538377349000|
|            NextSong|1538377748000|
|            NextSong|1538377932000|
|            NextSong|1538378245000|
|            NextSong|1538378483000|
|            NextSong|1538378687000|
|            NextSong|1538378877000|
|            NextSong|1538379041000|
|            NextSong|1538379207000|
|         Roll Advert|1538379230000|
|            NextSong|1538379420000|
|            NextSong|1538379668000|
|            NextSong|1538380000000|
|            NextSong|1538380179000|
|              Logout|1538380180000|
|                Home|1538380429000|
|            NextSong|1538380481000|
|            NextSong|153838082

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.
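
The labelling rule described above can be sketched in plain Python before writing it in Spark: a user is labelled 1 if any of their events is a `Cancellation Confirmation`, otherwise 0. The event tuples and the helper name `churn_labels` are hypothetical and serve only as an illustration.

```python
# Hypothetical (userId, page) pairs standing in for the event log
events = [
    ("125", "NextSong"),
    ("125", "Cancel"),
    ("125", "Cancellation Confirmation"),
    ("124", "NextSong"),
    ("124", "Downgrade"),
]

def churn_labels(records, churn_page="Cancellation Confirmation"):
    """Map every userId to 1 if the user ever hit churn_page, else 0."""
    labels = {}
    for user_id, page in records:
        hit = 1 if page == churn_page else 0
        # Keep the maximum so a single churn event marks the whole user
        labels[user_id] = max(labels.get(user_id, 0), hit)
    return labels

labels = churn_labels(events)
print(labels)
```

Taking the maximum per user mirrors the "left join, then fill missing with 0" approach used in the cells below: users without a churn event end up with label 0.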

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

#### Get users that churned

In [174]:
# Get users that churned
churned_users = df.where(df.page == "Cancellation Confirmation").select("userId").dropDuplicates()

print("Example churned users")
print(churned_users.head(5))

print("\nNumber of churned users:")
print(churned_users.count())

Example churned users
[Row(userId='125'), Row(userId='51'), Row(userId='54'), Row(userId='100014'), Row(userId='101')]

Number of churned users:
52

#### Add column to df indicating a churned user

In [175]:
# Flag every user in churned_users with a constant 1; a literal column avoids a Python UDF
churned_users = churned_users.withColumn("churned", f.lit(1))
print(churned_users.head(3))

df = df.join(churned_users, on=['userId'], how='left')

[Row(userId='125', churned=1), Row(userId='51', churned=1), Row(userId='54', churned=1)]

#### Aggregate on user level

In [176]:
df.columns

['userId', 'artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'method', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'churned']

In [177]:
# Create user aggregation and add churn information
df_user_agg = df.select(["userId", "gender", "churned"]).groupBy(["userId", "gender"]).max()
df_user_agg = df_user_agg.na.fill(0)
df_user_agg = df_user_agg.withColumnRenamed("max(churned)", "churned")

In [178]:
df_user_agg.show(5)

+------+------+-------+
|userId|gender|churned|
+------+------+-------+
|100010|     F|      0|
|200002|     M|      0|
|   125|     M|      1|
|   124|     F|      0|
|    51|     M|      1|
+------+------+-------+
only showing top 5 rows

In [179]:
# Calculate songs per user
df_songs_per_user = df.where(df.page == "NextSong").select(["userId", "page"]).groupBy(["userId"]).count()
df_songs_per_user = df_songs_per_user.withColumnRenamed("count", "songs_listened")

In [180]:
# Calculate ads per user
df_ads_per_user = df.where(df.page == "Roll Advert").select(["userId", "page"]).groupBy(["userId"]).count()
df_ads_per_user = df_ads_per_user.withColumnRenamed("count", "adverts_rolled")

In [181]:
# Calculate friends added per user
df_friends_added_per_user = df.where(df.page == "Add Friend").select(["userId", "page"]).groupBy(["userId"]).count()
df_friends_added_per_user = df_friends_added_per_user.withColumnRenamed("count", "friends_added")

In [182]:
# Calculate upgrades to premium per user
df_upgrades_per_user = df.where(df.page == "Submit Upgrade").select(["userId", "page"]).groupBy(["userId"]).count()
df_upgrades_per_user = df_upgrades_per_user.withColumnRenamed("count", "times_upgraded")

In [183]:
# Calculate downgrades for free per user
df_downgrades_per_user = df.where(df.page == "Submit Downgrade").select(["userId", "page"]).groupBy(["userId"]).count()
df_downgrades_per_user = df_downgrades_per_user.withColumnRenamed("count", "times_downgraded")

In [184]:
# Calculate playlist additions per user
df_playlist_adds_per_user = df.where(df.page == "Add to Playlist").select(["userId", "page"]).groupBy(["userId"]).count()
df_playlist_adds_per_user = df_playlist_adds_per_user.withColumnRenamed("count", "playlist_additions")

In [185]:
# Calculate errors per user
df_errors_per_user = df.where(df.page == "Error").select(["userId", "page"]).groupBy(["userId"]).count()
df_errors_per_user = df_errors_per_user.withColumnRenamed("count", "errors")

In [186]:
# Calculate help access per user
df_help_per_user = df.where(df.page == "Help").select(["userId", "page"]).groupBy(["userId"]).count()
df_help_per_user = df_help_per_user.withColumnRenamed("count", "help_access_count")

In [187]:
# Calculate logouts per user
df_logouts_per_user = df.where(df.page == "Logout").select(["userId", "page"]).groupBy(["userId"]).count()
df_logouts_per_user = df_logouts_per_user.withColumnRenamed("count", "logouts")

In [188]:
# Calculate thumbs up per user
df_thumbs_up_per_user = df.where(df.page == "Thumbs Up").select(["userId", "page"]).groupBy(["userId"]).count()
df_thumbs_up_per_user = df_thumbs_up_per_user.withColumnRenamed("count", "thumbs_up_given")

In [189]:
# Calculate thumbs down per user
df_thumbs_down_per_user = df.where(df.page == "Thumbs Down").select(["userId", "page"]).groupBy(["userId"]).count()
df_thumbs_down_per_user = df_thumbs_down_per_user.withColumnRenamed("count", "thumbs_down_given")

In [190]:
# Calculate time registered ("user age")
df_age_per_user = df.groupBy("userId").agg(f.max('ts').alias("last_login"), \
    f.min('registration').alias("registration"))

df_age_per_user = df_age_per_user.withColumn("user_age", df_age_per_user.last_login - df_age_per_user.registration)

df_age_per_user.show(1)

+------+-------------+-------------+----------+
|userId|   last_login| registration|  user_age|
+------+-------------+-------------+----------+
|100010|1542823952000|1538016340000|4807612000|
+------+-------------+-------------+----------+
only showing top 1 row

In [191]:
# Join aggregates together to one table
df_user_agg = df_user_agg.join(df_songs_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_ads_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_friends_added_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_playlist_adds_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_errors_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_help_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_logouts_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_thumbs_up_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_thumbs_down_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_upgrades_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_downgrades_per_user, on=['userId'], how='left')
df_user_agg = df_user_agg.join(df_age_per_user, on=['userId'], how='left')

# Fill null values created after join with 0, as they represent missing counts
df_user_agg = df_user_agg.na.fill(0)

In [192]:
df_user_agg.show(1)

+------+------+-------+--------------+--------------+-------------+------------------+------+-----------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------+----------+
|userId|gender|churned|songs_listened|adverts_rolled|friends_added|playlist_additions|errors|help_access_count|logouts|thumbs_up_given|thumbs_down_given|times_upgraded|times_downgraded|   last_login| registration|  user_age|
+------+------+-------+--------------+--------------+-------------+------------------+------+-----------------+-------+---------------+-----------------+--------------+----------------+-------------+-------------+----------+
|100010|     F|      0|           275|            52|            4|                 7|     0|                2|      5|             17|                5|             0|               0|1542823952000|1538016340000|4807612000|
+------+------+-------+--------------+--------------+-------------+------------------+------+-------

#### Observe different aggregates

In [193]:
df_user_agg.groupBy("churned").agg(f.mean('songs_listened'), \
    f.mean('adverts_rolled'), \
    f.mean('friends_added'), \
    f.mean('playlist_additions'), \
    f.mean('errors')).show()

+-------+-------------------+-------------------+------------------+-----------------------+------------------+
|churned|avg(songs_listened)|avg(adverts_rolled)|avg(friends_added)|avg(playlist_additions)|       avg(errors)|
+-------+-------------------+-------------------+------------------+-----------------------+------------------+
|      1|  699.8846153846154| 18.596153846153847| 12.23076923076923|      19.96153846153846|0.6153846153846154|
|      0| 1108.1734104046243|  17.14450867052023|21.046242774566473|     31.722543352601157|1.2716763005780347|
+-------+-------------------+-------------------+------------------+-----------------------+------------------+

In [194]:
df_user_agg.groupBy("churned").agg(f.mean('thumbs_up_given'), \
    f.mean('thumbs_down_given'), \
    f.mean('times_upgraded'), \
    f.mean('times_downgraded'), \
    f.mean('logouts'), \
    f.mean('help_access_count')).show()

+-------+--------------------+----------------------+-------------------+---------------------+------------------+----------------------+
|churned|avg(thumbs_up_given)|avg(thumbs_down_given)|avg(times_upgraded)|avg(times_downgraded)|      avg(logouts)|avg(help_access_count)|
+-------+--------------------+----------------------+-------------------+---------------------+------------------+----------------------+
|      1|               35.75|     9.538461538461538| 0.6153846153846154|  0.17307692307692307|10.634615384615385|     4.596153846153846|
|      0|   61.80346820809248|     11.84971098265896| 0.7341040462427746|  0.31213872832369943| 15.45086705202312|     7.023121387283237|
+-------+--------------------+----------------------+-------------------+---------------------+------------------+----------------------+

In [195]:
# Convert from unix time to normal time to see results in human readable way
df_user_agg.groupBy("churned").agg(\
    from_unixtime(f.mean('last_login') / 1000 ,"yyyy-MM-dd HH:mm:ss:SSS").alias("avg_last_login"),
    from_unixtime(f.mean('registration') / 1000 ,"yyyy-MM-dd HH:mm:ss:SSS").alias("avg_registration"), \
    (f.mean('user_age') / 1000/3600/24).alias("avg_user_age_days")).show()

+-------+--------------------+--------------------+------------------+
|churned|      avg_last_login|    avg_registration| avg_user_age_days|
+-------+--------------------+--------------------+------------------+
|      1|2018-10-27 12:03:...|2018-08-31 04:42:...|57.305992922008535|
|      0|2018-11-24 11:26:...|2018-08-29 20:32:...| 86.62061938021837|
+-------+--------------------+--------------------+------------------+
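
The `ts` and `registration` columns hold Unix time in milliseconds, which is why the cell above divides by 1000 before formatting and by `1000/3600/24` to get days. A small plain-Python sketch of the same arithmetic, using the values shown earlier for user 100010, is given here. The helper names are hypothetical, and the sketch renders times in UTC, which is an assumption: Spark's `from_unixtime` formats in the session time zone, so the displayed hour can differ.

```python
from datetime import datetime, timezone

MS_PER_DAY = 1000 * 3600 * 24

def ms_to_utc(ts_ms):
    """Convert a Unix timestamp in milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)

def ms_to_days(duration_ms):
    """Convert a duration in milliseconds to fractional days."""
    return duration_ms / MS_PER_DAY

# Values taken from the df_age_per_user row for userId 100010
last_login = ms_to_utc(1542823952000)
registration = ms_to_utc(1538016340000)
user_age_days = ms_to_days(1542823952000 - 1538016340000)

print(registration.isoformat(), last_login.isoformat(), round(user_age_days, 2))
```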

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the steps below.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

#### One Hot Encoding the Gender Column

In [196]:
# Use OneHotEncoding to transform gender into one hot vector 
stringIndexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
model = stringIndexer.fit(df_user_agg)
indexed = model.transform(df_user_agg)
encoder = OneHotEncoder(inputCol="genderIndex", outputCol="genderIndexVec")
df_features = encoder.transform(indexed)

In [197]:
# https://stackoverflow.com/questions/49632830/pyspark-output-of-onehotencoder-looks-odd
df_features.select("gender", "genderIndex", "genderIndexVec").show(5)

+------+-----------+--------------+
|gender|genderIndex|genderIndexVec|
+------+-----------+--------------+
|     F|        1.0|     (1,[],[])|
|     M|        0.0| (1,[0],[1.0])|
|     M|        0.0| (1,[0],[1.0])|
|     F|        1.0|     (1,[],[])|
|     M|        0.0| (1,[0],[1.0])|
+------+-----------+--------------+
only showing top 5 rows
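
The output above looks odd at first because Spark prints sparse vectors as `(size, [indices], [values])`, and `OneHotEncoder` drops the last category by default (`dropLast=True`), so two gender values are encoded in a vector of length 1. A minimal plain-Python sketch of expanding that notation into a dense list follows. The helper name `to_dense` is hypothetical and is not a Spark function.

```python
def to_dense(size, indices, values):
    """Expand Spark's sparse notation (size, [indices], [values]) into a dense list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense

female = to_dense(1, [], [])        # printed by Spark as (1,[],[])
male = to_dense(1, [0], [1.0])      # printed by Spark as (1,[0],[1.0])
print(female, male)
```

The same notation appears later in the assembled feature vectors, where rows with many zeros are stored sparsely.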

#### Feature Vector Assembler

In [198]:
# Assemble columns into feature vector
feature_list = ["genderIndex", "songs_listened", "adverts_rolled", "friends_added", "playlist_additions",\
               "errors", "help_access_count", "logouts", "thumbs_up_given", "thumbs_down_given",\
              "times_upgraded", "times_downgraded", "user_age"]

assembler = VectorAssembler(
    inputCols=feature_list,
    outputCol="features_raw")

output = assembler.transform(df_features)

print("Assembled columns " + ", ".join(feature_list) + " to vector column 'features_raw'")

# Rename churned column to label
output = output.withColumnRenamed("churned", "label")
df_features = output.select("features_raw", "label")
df_features.show(truncate=False)

Assembled columns genderIndex, songs_listened, adverts_rolled, friends_added, playlist_additions, errors, help_access_count, logouts, thumbs_up_given, thumbs_down_given, times_upgraded, times_downgraded, user_age to vector column 'features_raw'
+-------------------------------------------------------------------------+-----+
|features_raw                                                             |label|
+-------------------------------------------------------------------------+-----+
|[1.0,275.0,52.0,4.0,7.0,0.0,2.0,5.0,17.0,5.0,0.0,0.0,4.807612E9]         |0    |
|[0.0,387.0,7.0,4.0,8.0,0.0,2.0,5.0,21.0,6.0,1.0,0.0,6.054448E9]          |0    |
|(13,[1,2,12],[8.0,1.0,6.161779E9])                                       |1    |
|[1.0,4079.0,4.0,74.0,118.0,6.0,23.0,59.0,171.0,41.0,0.0,0.0,1.1366431E10]|0    |
|[0.0,2111.0,0.0,28.0,52.0,1.0,12.0,24.0,100.0,21.0,0.0,0.0,1.680985E9]   |1    |
|[0.0,150.0,16.0,1.0,5.0,1.0,1.0,3.0,7.0,1.0,0.0,0.0,6.288035E9]          |0    |
|[0.0,1914.0,1.0,31

## Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
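
To see why accuracy alone can mislead here, consider the class balance found earlier: 52 churned users out of 225. The plain-Python sketch below scores a trivial baseline that always predicts "no churn". The helper name `precision_recall_f1` is hypothetical, and it computes the F1 score of the churn class only, which is an assumption made for illustration and may differ from the aggregate F1 that a Spark evaluator reports.

```python
def precision_recall_f1(labels, predictions, positive=1):
    """Compute precision, recall and F1 for the positive class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Class balance from the notebook: 52 churned users out of 225
labels = [1] * 52 + [0] * 173
always_stay = [0] * 225  # baseline that never predicts churn

accuracy = sum(y == p for y, p in zip(labels, always_stay)) / len(labels)
precision, recall, f1 = precision_recall_f1(labels, always_stay)
print(round(accuracy, 3), precision, recall, f1)
```

The baseline reaches roughly 77% accuracy while finding no churners at all, so its churn-class F1 is 0. Any model should be judged against that baseline rather than against 50%.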

In [199]:
train, test = df_features.select(["features_raw", "label"]).randomSplit([0.7, 0.3], seed=42)

#### Apply Standard Scaler to Feature Vector (only use training data for fitting)

In [200]:
scaler = StandardScaler(inputCol="features_raw", outputCol="features",
                        withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(train)

# Standardize each feature to zero mean and unit standard deviation,
# using the statistics computed on the training data only.
train = scalerModel.transform(train)
test = scalerModel.transform(test)

# Select only relevant columns
train = train.select("features", "label")
test = test.select("features", "label")

# Show example train and test data
train.show(1, vertical=True, truncate=False)
test.show(1, vertical=True, truncate=False)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 features | [-0.9090432819323164,-0.5409572220642149,-0.4611421714421565,-0.6898758454499456,-0.6095727990394836,-0.7581758900672644,-0.5916125451268657,-0.5651962896804007,-0.5117761802965368,-0.3771032641568506,0.3290094057921096,-0.4928509615516156,-0.2194480471686497] 
 label    | 0                                                                                                                                                                                                                                                                    
only showing top 1 row

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------
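
The reason for fitting the scaler on the training split only is that the test data should not influence any statistic the model depends on. A plain-Python sketch for a single feature is shown below. The `songs_listened` values are taken from the feature rows shown earlier, but the assignment to train and test is hypothetical, as are the helper names. The sketch uses the sample standard deviation, which is assumed here to match what Spark's `StandardScaler` computes.

```python
from statistics import mean, stdev

def fit_scaler(values):
    """Learn mean and sample standard deviation from the training values."""
    return mean(values), stdev(values)

def transform(values, mu, sigma):
    """Apply previously learned statistics to any split."""
    return [(v - mu) / sigma for v in values]

train_songs = [275.0, 387.0, 4079.0, 2111.0]  # hypothetical training split
test_songs = [150.0, 1914.0]                  # hypothetical test split

mu, sigma = fit_scaler(train_songs)           # statistics come from train only
train_scaled = transform(train_songs, mu, sigma)
test_scaled = transform(test_songs, mu, sigma)
print(train_scaled, test_scaled)
```

The scaled training values have mean 0 and standard deviation 1 by construction; the scaled test values generally do not, which is expected.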

## Model 1 - Random Forest

In [201]:
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# Wrap the forest in a single-stage Pipeline
pipeline = Pipeline(stages=[rf])

### Tune Model

In [202]:
#paramGrid = ParamGridBuilder() \
#    .addGrid(rf.numTrees,[4, 8, 12]) \
#    .addGrid(rf.maxDepth,[5, 10, 20, 30]) \
#    .addGrid(rf.maxBins,[100, 200]) \
#    .build()

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees,[4]) \
    .addGrid(rf.maxDepth,[5]) \
    .addGrid(rf.maxBins,[100]) \
    .build()


# metricName="f1" is the evaluator's default; it is spelled out here for clarity
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=2)

In [203]:
cvModel_q1 = crossval.fit(train)

In [204]:
cvModel_q1.avgMetrics

[0.7436830924686649]
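
`avgMetrics` contains one averaged score per parameter combination, so the single-value grid used above yields a list with one entry. The plain-Python sketch below expands both the reduced grid and the commented-out grid to show how quickly the number of model fits grows. The helper name `expand_grid` is hypothetical, and the fit count assumes one fit per combination per fold, ignoring the final refit of the best model.

```python
from itertools import product

def expand_grid(grid):
    """Return every parameter combination of a dict mapping names to candidate values."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

small_grid = expand_grid({"numTrees": [4], "maxDepth": [5], "maxBins": [100]})
full_grid = expand_grid({
    "numTrees": [4, 8, 12],
    "maxDepth": [5, 10, 20, 30],
    "maxBins": [100, 200],
})

num_folds = 2
print(len(small_grid), len(small_grid) * num_folds)
print(len(full_grid), len(full_grid) * num_folds)
```

With two folds, the reduced grid needs 2 fits while the full grid needs 48, which is a likely reason for shrinking the grid while experimenting on a small cluster.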

In [205]:
results = cvModel_q1.transform(test)

In [206]:
results.show(2)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[1.09330881205373...|    0|[1.88679245283018...|[0.47169811320754...|       1.0|
|[-0.9090432819323...|    1|           [1.0,3.0]|         [0.25,0.75]|       1.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 2 rows

In [207]:
# Get best pipeline and params
bestPipeline = cvModel_q1.bestModel
bestRFModel = bestPipeline.stages[-1]

maxBins_best = bestRFModel._java_obj.getMaxBins()
maxDepth_best = bestRFModel._java_obj.getMaxDepth()
numTrees_best = bestRFModel._java_obj.getNumTrees()

print("The best performing params: \n" + \
     "numTrees: " + str(numTrees_best) + "\n" + \
     "maxDepth: " + str(maxDepth_best) + "\n" + \
     "maxBins: " + str(maxBins_best) + "."
)

The best performing params: 
numTrees: 4
maxDepth: 5
maxBins: 100.

### Compute Accuracy of Best Model

#### Accuracy (all users)

In [208]:
count_correct = results.filter(results.label == results.prediction).count()
count_all = results.count()
accuracy = count_correct/count_all * 100
print("The model has an overall accuracy of " + str(int(accuracy)) + "%.")

The model has an overall accuracy of 69%.

#### Accuracy (churned users)

In [209]:
result_churners = results.filter(results.label == 1)
count_correct = result_churners.filter(results.label == results.prediction).count()
count_all = result_churners.count()
accuracy = count_correct/count_all * 100
print("The model has an accuracy of " + str(int(accuracy)) + "% for predicting churn correctly.")

The model has an accuracy of 36% for predicting churn correctly.

## Model 2 - Logistic Regression

### Build Pipeline

In [63]:
lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)
pipeline = Pipeline(stages=[lr])

### Tune Model

In [64]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .addGrid(lr.maxIter,[10]) \
    .build()


# metricName="f1" is the evaluator's default; it is spelled out here for clarity
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=2)

In [65]:
cvModel_q2 = crossval.fit(train)

name 'train' is not defined
Traceback (most recent call last):
NameError: name 'train' is not defined



In [66]:
cvModel_q2.avgMetrics

name 'cvModel_q2' is not defined
Traceback (most recent call last):
NameError: name 'cvModel_q2' is not defined



In [67]:
results = cvModel_q2.transform(test)

name 'cvModel_q2' is not defined
Traceback (most recent call last):
NameError: name 'cvModel_q2' is not defined



In [68]:
results.show(2)

name 'results' is not defined
Traceback (most recent call last):
NameError: name 'results' is not defined



In [69]:
# Get best pipeline and params
bestPipeline = cvModel_q2.bestModel
bestLRModel = bestPipeline.stages[-1]

regParam_best = bestLRModel._java_obj.getRegParam()
maxIter_best = bestLRModel._java_obj.getMaxIter()

print("The best performing params: \n" + \
     "regParam: " + str(regParam_best) + "\n" + \
     "maxIter: " + str(maxIter_best)
)

name 'cvModel_q2' is not defined
Traceback (most recent call last):
NameError: name 'cvModel_q2' is not defined



### Compute Accuracy of Best Model

#### Accuracy (all users)

In [70]:
count_correct = results.filter(results.label == results.prediction).count()
count_all = results.count()
accuracy = count_correct/count_all * 100
print("The model has an overall accuracy of " + str(int(accuracy)) + "%.")

name 'results' is not defined
Traceback (most recent call last):
NameError: name 'results' is not defined



#### Accuracy (churned users)

In [71]:
result_churners = results.filter(results.label == 1)
count_correct = result_churners.filter(results.label == results.prediction).count()
count_all = result_churners.count()
accuracy = count_correct/count_all * 100
print("The model has an accuracy of " + str(int(accuracy)) + "% for predicting churn correctly.")

name 'results' is not defined
Traceback (most recent call last):
NameError: name 'results' is not defined



# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.