# Data Wrangling with DataFrames Coding Quiz

Use this Jupyter notebook to find the answers to the quiz in the previous section. There is an answer key in the next part of the lesson.

In [13]:
import findspark
findspark.init('C:\\Spark\\spark-3.0.1-bin-hadoop2.7')

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg

# TODOS: 
# 1) import any other libraries you might need
# 2) instantiate a Spark session 
# 3) read in the data set located at the path "data/sparkify_log_small.json"
# 4) write code to answer the quiz questions 
spark = SparkSession \
        .builder \
        .appName("Wrangling Data") \
        .getOrCreate()
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)

# Question 1

Which page did user id "" (empty string) NOT visit?

In [14]:
# TODO: write your code to answer question 1
user_log.select(["userId", "firstname", "page", "song"]).where(user_log.userId == "").select("page").dropDuplicates().sort("page").show() # look at each category once


+-----+
| page|
+-----+
|About|
| Help|
| Home|
|Login|
+-----+



In [15]:
# filter for users with blank user id
blank_pages = user_log.filter(user_log.userId == '') \
    .select(col('page') \
    .alias('blank_pages')) \
    .dropDuplicates()

# get a list of possible pages that could be visited
all_pages = user_log.select('page').dropDuplicates()

# find values in all_pages that are not in blank_pages
# these are the pages that the blank user did not go to
for row in set(all_pages.collect()) - set(blank_pages.collect()):
    print(row.page)

Upgrade
Error
Downgrade
Logout
Submit Upgrade
NextSong
Settings
Submit Downgrade
Save Settings


# Question 2 - Reflect

What type of user does the empty string user id most likely refer to?


In [5]:
# TODO: use this space to explore the behavior of the user with an empty string
user_log.select(["registration"]).where(user_log.userId == "").select("registration").dropDuplicates().sort("registration").show()

+------------+
|registration|
+------------+
|        null|
+------------+



# Question 3

How many female users do we have in the data set?

In [6]:
# TODO: use this space to explore the behavior of the user with an empty string
user_log.select(["gender"]).where(user_log.userId == "")

DataFrame[gender: string]

In [7]:
user_log.select("gender").dropDuplicates().show() # look at each category once

+------+
|gender|
+------+
|     F|
|  null|
|     M|
+------+



In [8]:
user_log.select(["userId"]).where(user_log.gender == "F").count()

3820

In [9]:
user_log.filter(user_log.gender == 'F') \
    .select('userId', 'gender') \
    .dropDuplicates() \
    .count()

462

# Question 4

How many songs were played from the most played artist?

In [42]:
count_songs = user_log.groupby(user_log.artist).count().sort(desc("count"))
count_songs.show()

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|                null| 1653|
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|            BjÃÂ¶rk|   46|
|       Dwight Yoakam|   45|
|       Justin Bieber|   43|
|      The Black Keys|   40|
|         OneRepublic|   37|
|                Muse|   36|
|        Jack Johnson|   36|
|           Radiohead|   31|
|        Taylor Swift|   29|
|Barry Tuckwell/Ac...|   28|
|          Lily Allen|   28|
|               Train|   28|
|           Daft Punk|   27|
|           Metallica|   27|
|          Nickelback|   27|
|          Kanye West|   26|
+--------------------+-----+
only showing top 20 rows



In [47]:
# count page requests by hour
user_log.filter(user_log.artist == "Coldplay").count()

83

# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.



In [52]:
user_log.filter(user_log.page == "Home").groupby(user_log.userId).mean().show()

+------+------------------+-----------+-----------------+-----------------+-----------+--------------------+
|userId|avg(itemInSession)|avg(length)|avg(registration)|   avg(sessionId)|avg(status)|             avg(ts)|
+------+------------------+-----------+-----------------+-----------------+-----------+--------------------+
|   691|               3.0|       null|1.503986110284E12|            690.0|      200.0|   1.513744793284E12|
|  2294|              33.4|       null|1.487337122284E12|           2293.0|      200.0|   1.513779834084E12|
|  2162|              17.5|       null|1.503425779284E12|           4814.0|      200.0|   1.513776694284E12|
|  2088|              13.0|       null|1.507122158284E12|           2087.0|      200.0|   1.513805972284E12|
|  2275|               0.0|       null|1.505228090284E12|           4876.0|      200.0|   1.513765360284E12|
|  2756|               0.0|       null|1.511124600284E12|           6447.5|      200.0|   1.513785274784E12|
|   800|           

In [10]:
# TODO: filter out 0 sum and max sum to get more exact answer

function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()

NameError: name 'Window' is not defined