# Data Wrangling with DataFrames Coding Quiz

Use this Jupyter notebook to find the answers to the quiz in the previous section. There is an answer key in the next part of the lesson.

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

21/08/21 17:58:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("MySpark") \
    .getOrCreate()

path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)



In [5]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# Question 1

Which page did user id "" (empty string) NOT visit?

In [12]:
# TODO: write your code to answer question 1
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum

visited_pages = user_log.where(user_log.userId == "").select("page").dropDuplicates()
all_pages = user_log.select("page").dropDuplicates()
set(all_pages.collect()) - set(visited_pages.collect())

{Row(page='Downgrade'),
 Row(page='Error'),
 Row(page='Logout'),
 Row(page='NextSong'),
 Row(page='Save Settings'),
 Row(page='Settings'),
 Row(page='Submit Downgrade'),
 Row(page='Submit Upgrade'),
 Row(page='Upgrade')}

# Question 2 - Reflect

What type of user does the empty string user id most likely refer to?


In [2]:
# TODO: use this space to explore the behavior of the user with an empty string


# Question 3

How many female users do we have in the data set?

In [19]:
# TODO: write your code to answer question 3
# user_log.select("gender").drop_duplicates().show()

user_log.select(["userId", "gender"]).drop_duplicates().groupBy("gender").count().show()



+------+-----+
|gender|count|
+------+-----+
|     F|  462|
|  null|    1|
|     M|  501|
+------+-----+



# Question 4

How many songs were played from the most played artist?

In [20]:
# TODO: write your code to answer question 4

user_log.filter(user_log.page == 'NextSong').groupBy("artist").count().sort("count", ascending=False).show()

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|            BjÃÂ¶rk|   46|
|       Dwight Yoakam|   45|
|       Justin Bieber|   43|
|      The Black Keys|   40|
|         OneRepublic|   37|
|                Muse|   36|
|        Jack Johnson|   36|
|           Radiohead|   31|
|        Taylor Swift|   29|
|Barry Tuckwell/Ac...|   28|
|          Lily Allen|   28|
|               Train|   28|
|           Metallica|   27|
|           Daft Punk|   27|
|          Nickelback|   27|
|          Kanye West|   26|
|Red Hot Chili Pep...|   24|
+--------------------+-----+
only showing top 20 rows



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.



In [29]:
# TODO: write your code to answer question 5

user_log.filter(user_log.page == 'NextSong').groupBy(["userId", "sessionId"]).count().agg({'count':'avg'}).show()

from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = user_log.filter((user_log.page == 'NextSong') | (user_log.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()