In [1]:
from pyspark.sql import SparkSession


In [2]:
import datetime
from pyspark.sql.functions import asc_nulls_first, asc

In [3]:
import datetime
from pyspark.sql.functions import asc_nulls_first, asc

In [4]:
spark = SparkSession\
        .builder\
        .appName("Data Wrangling with DataFrame")\
        .getOrCreate()

In [5]:
filelocation = r"D:\Github\Udacity-Spark\sparkify_log_small.json"
user_log = spark.read.json(filelocation)

In [6]:
user_log.describe().show(truncate = True)

+-------+-----------------+----------+---------+------+------------------+--------+-----------------+-----+------------+------+-------+--------------------+------------------+--------+------------------+-------------------+--------------------+------------------+
|summary|           artist|      auth|firstName|gender|     itemInSession|lastName|           length|level|    location|method|   page|        registration|         sessionId|    song|            status|                 ts|           userAgent|            userId|
+-------+-----------------+----------+---------+------+------------------+--------+-----------------+-----+------------+------+-------+--------------------+------------------+--------+------------------+-------------------+--------------------+------------------+
|  count|             8347|     10000|     9664|  9664|             10000|    9664|             8347|10000|        9664| 10000|  10000|                9664|             10000|    8347|             10000|     

In [7]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [8]:
user_log.describe("page").show()

+-------+-------+
|summary|   page|
+-------+-------+
|  count|  10000|
|   mean|   null|
| stddev|   null|
|    min|  About|
|    max|Upgrade|
+-------+-------+



In [9]:
user_log.select("page").dropDuplicates().show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



In [10]:
user_log.select("page").dropDuplicates().sort("page").show()

+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



In [11]:
from pyspark.sql.functions import desc             # 'desc' method is for sorting a column in descending order.

user_log.select("page").dropDuplicates().orderBy(desc("page")).show()

+----------------+
|            page|
+----------------+
|         Upgrade|
|  Submit Upgrade|
|Submit Downgrade|
|        Settings|
|   Save Settings|
|        NextSong|
|          Logout|
|           Login|
|            Home|
|            Help|
|           Error|
|       Downgrade|
|           About|
+----------------+



#### Selecting a few columns from the dataframe....

In [12]:
user_log.select(["userId", "firstName", "lastName", "page", "song"]).where(user_log.userId == "1046").show(truncate = False)

+------+---------+--------+--------+----------------------------------------------------+
|userId|firstName|lastName|page    |song                                                |
+------+---------+--------+--------+----------------------------------------------------+
|1046  |Kenneth  |Matthews|NextSong|Christmas Tears Will Fall                           |
|1046  |Kenneth  |Matthews|NextSong|Be Wary Of A Woman                                  |
|1046  |Kenneth  |Matthews|NextSong|Public Enemy No.1                                   |
|1046  |Kenneth  |Matthews|NextSong|Reign Of The Tyrants                                |
|1046  |Kenneth  |Matthews|NextSong|Father And Son                                      |
|1046  |Kenneth  |Matthews|NextSong|No. 5                                               |
|1046  |Kenneth  |Matthews|NextSong|Seventeen                                           |
|1046  |Kenneth  |Matthews|Home    |null                                                |
|1046  |Ke

In [13]:
user_log.select(["userId", "firstName", "page", "song"]).where(user_log.userId == "1046").collect()

[Row(userId='1046', firstName='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='Be Wary Of A Woman'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='Public Enemy No.1'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='Reign Of The Tyrants'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='Father And Son'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='No. 5'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='Seventeen'),
 Row(userId='1046', firstName='Kenneth', page='Home', song=None),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='War on war'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='Killermont Street'),
 Row(userId='1046', firstName='Kenneth', page='NextSong', song='Black & Blue'),
 Row(userId='1046', firstName='Kenneth', page='Logout', song=None),
 Row(userId='1046', firstName='Kenneth'

In [14]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
user_log.select("ts").show()

+-------------+
|           ts|
+-------------+
|1513720872284|
|1513720878284|
|1513720881284|
|1513720905284|
|1513720913284|
|1513720932284|
|1513720955284|
|1513720959284|
|1513720959284|
|1513720980284|
|1513720983284|
|1513720993284|
|1513721031284|
|1513721045284|
|1513721058284|
|1513721077284|
|1513721088284|
|1513721095284|
|1513721097284|
|1513721104284|
+-------------+
only showing top 20 rows



In [16]:
user_log.describe("page").show()

+-------+-------+
|summary|   page|
+-------+-------+
|  count|  10000|
|   mean|   null|
| stddev|   null|
|    min|  About|
|    max|Upgrade|
+-------+-------+



In [17]:
user_log.select("page").distinct().show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



In [18]:
user_log.select("page").drop_duplicates().show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



In [19]:
user_log.head() # just a simple check!

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')

## Calculating statistics by Hour

#### finding at what particular hour does a user plays a particular song.

In [20]:
# this is a user defined function (UDF)

from pyspark.sql.functions import udf

get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x/1000.0).hour)

In [21]:
user_log = user_log.withColumn("hour", get_hour(user_log.ts))

In [22]:
user_log.head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='3')

In [23]:
user_log.count()

10000

In [24]:
user_log.take(5)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='3'),
 Row(artist='Lily Allen', auth='Logged In', firstName='Elizabeth', gender='F', itemInSession=7, lastName='Chase', length=195.23873, level='free', location='Shreveport-Bossier City, LA', method='PUT', page='NextSong', registration=1512718541284, sessionId=5027, song='Cheryl Tweedy', status=200, ts=1513720878284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000', hour='3'),
 Row(artist='Cobra Starship Featuring Leighton Meester', auth=

In [25]:
user_log.show()

+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|              artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|hour|
+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|       Showaddywaddy|Logged In|  Kenneth|     M|          112| Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|   3|
|          Lily Allen|Logged In|Elizabeth|     F|            7|    Chase|195.23873| free|Shrevep

In [26]:
user_log.select('hour').distinct().show()

+----+
|hour|
+----+
|   7|
|  15|
|  11|
|   3|
|   8|
|  22|
|  16|
|   0|
|   5|
|  18|
|  17|
|   6|
|  19|
|  23|
|   9|
|   1|
|  20|
|  10|
|   4|
|  12|
+----+
only showing top 20 rows



In [27]:
user_log.filter(user_log["userID"] == "").count()

336

In [28]:
user_log.select("userID").describe().show()

+-------+------------------+
|summary|            userID|
+-------+------------------+
|  count|             10000|
|   mean|1442.4413286423842|
| stddev| 829.8909432082621|
|    min|                  |
|    max|               999|
+-------+------------------+



In [29]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- hour: string (nullable = true)



In [30]:
user_log.select("userID").dropDuplicates().sort("userID").show()

+------+
|userID|
+------+
|      |
|    10|
|   100|
|  1000|
|  1003|
|  1005|
|  1006|
|  1017|
|  1019|
|  1020|
|  1022|
|  1025|
|  1030|
|  1035|
|  1037|
|   104|
|  1040|
|  1042|
|  1043|
|  1046|
+------+
only showing top 20 rows



In [31]:
user_log.select("userID").distinct().sort("userID").show()

+------+
|userID|
+------+
|      |
|    10|
|   100|
|  1000|
|  1003|
|  1005|
|  1006|
|  1017|
|  1019|
|  1020|
|  1022|
|  1025|
|  1030|
|  1035|
|  1037|
|   104|
|  1040|
|  1042|
|  1043|
|  1046|
+------+
only showing top 20 rows



**Since we are having empty UserIDs hence its wise to remove them from the dataframe.!**

In [32]:
user_log_valid = user_log.filter(user_log["UserID"] != "")
user_log_valid.show(5)

+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|              artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|hour|
+--------------------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+----+
|       Showaddywaddy|Logged In|  Kenneth|     M|          112| Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|   3|
|          Lily Allen|Logged In|Elizabeth|     F|            7|    Chase|195.23873| free|Shrevep

In [33]:
pddf = user_log_valid.toPandas()
pddf.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,hour
0,Showaddywaddy,Logged In,Kenneth,M,112,Matthews,232.93342,paid,"Charlotte-Concord-Gastonia, NC-SC",PUT,NextSong,1509380319284,5132,Christmas Tears Will Fall,200,1513720872284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1046,3
1,Lily Allen,Logged In,Elizabeth,F,7,Chase,195.23873,free,"Shreveport-Bossier City, LA",PUT,NextSong,1512718541284,5027,Cheryl Tweedy,200,1513720878284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1000,3
2,Cobra Starship Featuring Leighton Meester,Logged In,Vera,F,6,Blackwell,196.20526,paid,"Racine, WI",PUT,NextSong,1499855749284,5516,Good Girls Go Bad (Feat.Leighton Meester) (Alb...,200,1513720881284,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2219,3
3,Alex Smoke,Logged In,Sophee,F,8,Barker,405.99465,paid,"San Luis Obispo-Paso Robles-Arroyo Grande, CA",PUT,NextSong,1513009647284,2372,Don't See The Point,200,1513720905284,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2373,3
4,,Logged In,Jordyn,F,0,Jones,,free,"Syracuse, NY",GET,Home,1513648531284,1746,,200,1513720913284,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1747,3


**Checking if the invalid userIDs are successfully removed or not!**

In [34]:
user_log_valid.select("userId").dropDuplicates().sort("userId").show()

+------+
|userId|
+------+
|    10|
|   100|
|  1000|
|  1003|
|  1005|
|  1006|
|  1017|
|  1019|
|  1020|
|  1022|
|  1025|
|  1030|
|  1035|
|  1037|
|   104|
|  1040|
|  1042|
|  1043|
|  1046|
|  1048|
+------+
only showing top 20 rows



#### Lets check how many user have downgraded their account and moved to a free subscription.

In [35]:
user_log_valid.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- hour: string (nullable = true)



In [36]:
user_log_valid.select(["page"]).distinct().show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



In [37]:
user_log_valid.select(["userId", "firstName", "lastName", "page", "level"])\
              .filter(user_log_valid["page"] == "Submit Downgrade")\
              .show()

+------+---------+--------+----------------+-----+
|userId|firstName|lastName|            page|level|
+------+---------+--------+----------------+-----+
|  1138|    Kelly|  Newton|Submit Downgrade| paid|
+------+---------+--------+----------------+-----+



In [38]:
total_1138_records = user_log_valid.select(["userId", "firstName", "lastName", "page", "level", "song"])\
              .filter("""userId = 1138""").count()

print(total_1138_records)

user_log_valid.select(["userId", "firstName", "lastName", "page", "level", "song"])\
              .filter("""userId = 1138""").show(n = total_1138_records,truncate = False)

68
+------+---------+--------+----------------+-----+---------------------------------------+
|userId|firstName|lastName|page            |level|song                                   |
+------+---------+--------+----------------+-----+---------------------------------------+
|1138  |Kelly    |Newton  |Home            |paid |null                                   |
|1138  |Kelly    |Newton  |NextSong        |paid |Everybody Everybody                    |
|1138  |Kelly    |Newton  |NextSong        |paid |Gears                                  |
|1138  |Kelly    |Newton  |NextSong        |paid |Use Somebody                           |
|1138  |Kelly    |Newton  |NextSong        |paid |Love Of My Life (1993 Digital Remaster)|
|1138  |Kelly    |Newton  |NextSong        |paid |Down In The Valley Woe                 |
|1138  |Kelly    |Newton  |NextSong        |paid |Treat Her Like A Lady                  |
|1138  |Kelly    |Newton  |NextSong        |paid |Everybody Thinks You're An Angel     

In [39]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

flag_downgrade_event = udf(lambda target_page : 1 if target_page == "Submit Downgrade" else 0, IntegerType())

In [40]:
user_log_valid = user_log_valid.withColumn("downgraded", flag_downgrade_event("page"))

In [41]:
user_log_valid.select(["userId", "firstName", "lastName", "page", "level", "downgraded"])\
              .filter(user_log_valid["page"] == "Submit Downgrade")\
              .show()

+------+---------+--------+----------------+-----+----------+
|userId|firstName|lastName|            page|level|downgraded|
+------+---------+--------+----------------+-----+----------+
|  1138|    Kelly|  Newton|Submit Downgrade| paid|         1|
+------+---------+--------+----------------+-----+----------+



In [42]:
user_log_valid.select(["userId", "firstName", "lastName", "page", "level", "downgraded"]).show(5)

+------+---------+---------+--------+-----+----------+
|userId|firstName| lastName|    page|level|downgraded|
+------+---------+---------+--------+-----+----------+
|  1046|  Kenneth| Matthews|NextSong| paid|         0|
|  1000|Elizabeth|    Chase|NextSong| free|         0|
|  2219|     Vera|Blackwell|NextSong| paid|         0|
|  2373|   Sophee|   Barker|NextSong| paid|         0|
|  1747|   Jordyn|    Jones|    Home| free|         0|
+------+---------+---------+--------+-----+----------+
only showing top 5 rows



In [43]:
user_log_valid.select(["userId", "firstName", "lastName", "page", "level", "downgraded"])\
              .filter(user_log_valid["userID"] == "1138")\
              .show()

+------+---------+--------+--------+-----+----------+
|userId|firstName|lastName|    page|level|downgraded|
+------+---------+--------+--------+-----+----------+
|  1138|    Kelly|  Newton|    Home| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|NextSong| paid|         0|
|  1138|    Kelly|  Newton|N

In [44]:
from pyspark.sql import Window

In [45]:
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)

In [46]:
from pyspark.sql.functions import sum as Fsum

user_log_valid = user_log_valid.withColumn("phase", Fsum("downgraded").over(windowval))

user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"])\
              .where(user_log_valid.userId == "1138").sort("ts").collect()

[Row(userId='1138', firstname='Kelly', ts=1513729066284, page='Home', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729066284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729313284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729552284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513729783284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730001284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730263284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730518284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513730768284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firstname='Kelly', ts=1513731182284, page='NextSong', level='paid', phase=1),
 Row(userId='1138', firs

### Assignment #1:

Which page did user id "" (empty string) NOT visit? <br>
a.) About<br>
b.) Home<br>
c.) Login<br>
d.) NextSong

In [95]:
all_pages = set(user_log.select("page").distinct().collect())
user_visited = set(user_log.select("page").filter(user_log.userId == "").distinct().collect())

page_not_visited = all_pages.difference(user_visited)
print("""user with userID "" did not visit these below pages...\n""")
for webpage in page_not_visited:
    print(webpage.page)

user with userID "" did not visit these below pages...

Downgrade
Save Settings
NextSong
Logout
Submit Downgrade
Error
Upgrade
Settings
Submit Upgrade


In [87]:
sorted(all_pages)

[Row(page='About'),
 Row(page='Downgrade'),
 Row(page='Error'),
 Row(page='Help'),
 Row(page='Home'),
 Row(page='Login'),
 Row(page='Logout'),
 Row(page='NextSong'),
 Row(page='Save Settings'),
 Row(page='Settings'),
 Row(page='Submit Downgrade'),
 Row(page='Submit Upgrade'),
 Row(page='Upgrade')]

In [88]:
all_pages.difference(user_visited)

{Row(page='Downgrade'),
 Row(page='Error'),
 Row(page='Logout'),
 Row(page='NextSong'),
 Row(page='Save Settings'),
 Row(page='Settings'),
 Row(page='Submit Downgrade'),
 Row(page='Submit Upgrade'),
 Row(page='Upgrade')}

In [49]:

print("""user with the ID '' did not visited page - 'NextSong'""")

user with the ID '' did not visited page - 'NextSong'


### Assignment #2:
How many female users do we have in the data set?<br>
a.) 462<br>
b.) 501<br>
c.) 3820<br>
d.) 5844<br>

In [50]:
print(user_log_valid.columns)

['artist', 'auth', 'firstName', 'gender', 'itemInSession', 'lastName', 'length', 'level', 'location', 'method', 'page', 'registration', 'sessionId', 'song', 'status', 'ts', 'userAgent', 'userId', 'hour', 'downgraded', 'phase']


In [96]:
total_females = user_log_valid.select(["userId", "gender"]).distinct().filter(user_log_valid.gender == "F").count()
print("Number of female users: ", total_females)

Number of female users:  462


### Assignment #3:
How many songs were played from the most played artist?<br>
a.) 3<br>
b.) 53<br>
c.) 83<br>
d.) 113

In [52]:
most_played_artist = user_log_valid.dropna(how = "any")\
                     .groupBy("artist")\
                     .count().select(["artist", "count"]).orderBy(desc("count")).collect()[0][1]

most_played_artist

83

### Assignment #4:
How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.<br>
a.) 5<br>
b.) 7<br>
c.) 9<br>
d.) 11

In [53]:
user_log_valid.select(["userId", "page"]).filter(user_log_valid.userId == "1046").show()

+------+--------+
|userId|    page|
+------+--------+
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|    Home|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|  Logout|
|  1046|    Home|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
|  1046|NextSong|
+------+--------+
only showing top 20 rows



In [54]:
user_log_valid.select("page").groupBy("userId")

<pyspark.sql.group.GroupedData at 0x16ce7d98220>