In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [3]:
spark = SparkSession \
    .builder \
    .appName("Wrangling Data") \
    .getOrCreate()

In [4]:
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)

### Data Exploration

In [5]:
user_log.head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')

In [10]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
print(f"Total number of rows: {user_log.count()}")

Total number of rows: 10000


In [12]:
user_log.filter(user_log.userId=="").count()

336

In [13]:
emptyId_df=user_log.filter(user_log.userId=="")

In [14]:
def empty_id(df):
    return df.filter(df.userId=="").count()
print(f"Total rows with empty user ID: {empty_id(user_log)}")

Total rows with empty user ID: 336


### Quiz

1. Which page did user id "" (empty string) NOT visit?

In [15]:
def page_visited(df):
    return df.filter(df.userId=="").select("page").dropDuplicates()

pages_empty = page_visited(user_log)
pages_all = user_log.select("page").dropDuplicates()

#for row in pages_all.collect():
#    if row not in pages_empty.collect():
#        print(row.page)

for row in set(pages_all.collect()) - set(pages_empty.collect()):
    print(row.page)

Error
Downgrade
Submit Downgrade
Upgrade
Settings
Save Settings
Logout
NextSong
Submit Upgrade


2. How many female users do we have in the data set?

In [16]:
def female_count(df):
    df_unique = df.select('userId').dropDuplicates()
    return df_unique.filter(df.gender=='F').count()
print(female_count(user_log))

462


3. How many songs were played from the most played artist?

In [17]:
user_log.groupby('artist').count().orderBy(desc('count')).show()

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|                null| 1653|
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|            BjÃÂ¶rk|   46|
|       Dwight Yoakam|   45|
|       Justin Bieber|   43|
|      The Black Keys|   40|
|         OneRepublic|   37|
|                Muse|   36|
|        Jack Johnson|   36|
|           Radiohead|   31|
|        Taylor Swift|   29|
|          Lily Allen|   28|
|               Train|   28|
|Barry Tuckwell/Ac...|   28|
|           Metallica|   27|
|          Nickelback|   27|
|           Daft Punk|   27|
|          Kanye West|   26|
+--------------------+-----+
only showing top 20 rows



In [18]:
user_log.take(1)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')]

In [19]:
def by_artist(df):
    df = df.filter(df.artist != "").groupby('artist').count().orderBy(desc('count'))
    return df.take(1)
most_played = by_artist(user_log)
most_played_artist = most_played[0].artist
most_played_artist

'Coldplay'

In [20]:
user_log.filter(user_log.artist==most_played_artist).count()

83

In [21]:
user_log.show()

+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       Showaddywaddy|Logged In|  Kenneth|     M|          112| Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|          Lily Allen|Logged In|Elizabeth|     F|            7|    Chase|195.23873| free|Shreveport-Bossie...|   PUT

In [43]:
user_log.select('page').dropDuplicates().show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



In [22]:
user_log.select(['song', 'artist']).filter(user_log.artist==most_played_artist).count()

83

4. How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [44]:
from pyspark.sql.window import Window

In [45]:
df = user_log

In [46]:
function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

In [47]:
user_window = Window.partitionBy('userID').orderBy(desc('ts')).rangeBetween(Window.unboundedPreceding, 0)

In [48]:
cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function('page')) \
    .withColumn('period', Fsum('homevisit').over(user_window))

In [49]:
cusum.show(50)

+------+--------+-------------+---------+------+
|userID|    page|           ts|homevisit|period|
+------+--------+-------------+---------+------+
|  1436|NextSong|1513783259284|        0|     0|
|  1436|NextSong|1513782858284|        0|     0|
|  2088|    Home|1513805972284|        1|     1|
|  2088|NextSong|1513805859284|        0|     1|
|  2088|NextSong|1513805494284|        0|     1|
|  2088|NextSong|1513805065284|        0|     1|
|  2088|NextSong|1513804786284|        0|     1|
|  2088|NextSong|1513804555284|        0|     1|
|  2088|NextSong|1513804196284|        0|     1|
|  2088|NextSong|1513803967284|        0|     1|
|  2088|NextSong|1513803820284|        0|     1|
|  2088|NextSong|1513803651284|        0|     1|
|  2088|NextSong|1513803413284|        0|     1|
|  2088|NextSong|1513803254284|        0|     1|
|  2088|NextSong|1513803057284|        0|     1|
|  2088|NextSong|1513802824284|        0|     1|
|  2162|NextSong|1513781246284|        0|     0|
|  2162|NextSong|151

In [53]:
cusum.filter((cusum.page == 'NextSong')).groupBy('userID', 'period').agg({'period':'count'}).show()

+------+------+-------------+
|userID|period|count(period)|
+------+------+-------------+
|  1436|     0|            2|
|  2088|     1|           13|
|  2162|     0|           19|
|  2162|     2|           15|
|  2294|     0|            4|
|  2294|     1|           17|
|  2294|     2|            3|
|  2294|     3|           16|
|  2294|     4|            4|
|  2294|     5|           11|
|  2904|     0|            1|
|   691|     1|            3|
|  1394|     0|            9|
|  1394|     1|           17|
|  2275|     0|            3|
|  2756|     0|            4|
|  2756|     2|            1|
|   451|     0|            1|
|   451|     1|            1|
|   800|     1|            2|
+------+------+-------------+
only showing top 20 rows



In [None]:
cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period': 'count'}) \
    .agg({'count(period)':'avg'}).show()

In [30]:
cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function('page')) \
    .withColumn('period', Fsum('homevisit').over(user_window))

In [39]:
cusum.filter((cusum.page=='NextSong')).groupBy('userID', 'period').count().show()

+------+------+-----+
|userID|period|count|
+------+------+-----+
|  1436|     0|    2|
|  2088|     1|   13|
|  2162|     0|   19|
|  2162|     2|   15|
|  2294|     0|    4|
|  2294|     1|   17|
|  2294|     2|    3|
|  2294|     3|   16|
|  2294|     4|    4|
|  2294|     5|   11|
|  2904|     0|    1|
|   691|     1|    3|
|  1394|     0|    9|
|  1394|     1|   17|
|  2275|     0|    3|
|  2756|     0|    4|
|  2756|     2|    1|
|   451|     0|    1|
|   451|     1|    1|
|   800|     1|    2|
+------+------+-----+
only showing top 20 rows



In [40]:
mid_result = cusum.filter((cusum.page=='NextSong')).groupBy('userID', 'period')

In [42]:
mid_result.agg({'period': 'count'}).show()

+------+------+-------------+
|userID|period|count(period)|
+------+------+-------------+
|  1436|     0|            2|
|  2088|     1|           13|
|  2162|     0|           19|
|  2162|     2|           15|
|  2294|     0|            4|
|  2294|     1|           17|
|  2294|     2|            3|
|  2294|     3|           16|
|  2294|     4|            4|
|  2294|     5|           11|
|  2904|     0|            1|
|   691|     1|            3|
|  1394|     0|            9|
|  1394|     1|           17|
|  2275|     0|            3|
|  2756|     0|            4|
|  2756|     2|            1|
|   451|     0|            1|
|   451|     1|            1|
|   800|     1|            2|
+------+------+-------------+
only showing top 20 rows



In [54]:
temp = mid_result.agg({'period': 'count'})

In [58]:
mylist = temp.select('count(period)').collect()
for i in mylist:
    print(mylist['count(period)'])

TypeError: list indices must be integers or slices, not str

In [36]:
cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period': 'count'}) \
    .agg({'count(period)':'avg'}).show()

+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+



In [153]:
df.filter((df.page=='NextSong') | (df.page=='Home')).select('userID', 'page', 'ts').withColumn('homevisit', function('page')).show()

+------+--------+-------------+---------+
|userID|    page|           ts|homevisit|
+------+--------+-------------+---------+
|  1046|NextSong|1513720872284|        0|
|  1000|NextSong|1513720878284|        0|
|  2219|NextSong|1513720881284|        0|
|  2373|NextSong|1513720905284|        0|
|  1747|    Home|1513720913284|        1|
|  1162|NextSong|1513720955284|        0|
|  1061|NextSong|1513720959284|        0|
|   748|    Home|1513720959284|        1|
|   597|    Home|1513720980284|        1|
|  1806|NextSong|1513720983284|        0|
|   748|NextSong|1513720993284|        0|
|  1176|NextSong|1513721031284|        0|
|  2164|NextSong|1513721045284|        0|
|  2146|NextSong|1513721058284|        0|
|  2219|NextSong|1513721077284|        0|
|  1176|    Home|1513721088284|        1|
|  2904|NextSong|1513721095284|        0|
|   597|NextSong|1513721097284|        0|
|   226|NextSong|1513721104284|        0|
|  1046|NextSong|1513721104284|        0|
+------+--------+-------------+---