# Data Wrangling with DataFrames Coding Quiz

Use this Jupyter notebook to find the answers to the quiz in the previous section. There is an answer key in the next part of the lesson.

In [85]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

import datetime as dt

In [60]:
spark = SparkSession \
    .builder \
    .appName("Wrangling Data") \
    .getOrCreate()

path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)

In [61]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# Question 1

Which page did user id "" (empty string) NOT visit?

In [77]:
allpages_df = user_log.select("page").\
dropDuplicates().orderBy("page").\
withColumnRenamed("page", "Allpages")

blankpages_df = user_log.\
filter(user_log.userId == "").\
select("page").dropDuplicates().\
orderBy("page").\
withColumnRenamed("page", "Blankpages")

NonVisitedPages = []
for row in set(allpages_df.collect()) - set(blankpages_df.collect()):
    NonVisitedPages.append(row.Allpages)
    
NonVisitedPages = sorted(NonVisitedPages)

print(NonVisitedPages)
print("{} pages were not visited".format(len(NonVisitedPages)))

['Downgrade', 'Error', 'Logout', 'NextSong', 'Save Settings', 'Settings', 'Submit Downgrade', 'Submit Upgrade', 'Upgrade']
9 pages were not visited


# Question 2 - Reflect

What type of user does the empty string user id most likely refer to?


In [21]:
# TODO: use this space to explore the behavior of the user with an empty string
#probably unregistred user

#see pages
user_log.filter(user_log.userId == "").\
select("sessionId").dropDuplicates().\
count()
#186 unregistered user sessions

186

In [22]:
user_log.registerTempTable("dful")

In [30]:
user_log = spark.sql("SELECT *, CAST(ts / 1000.0 AS TIMESTAMP) as ts2 FROM dful")
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts2: timestamp (nullable = true)



In [33]:
tsdf = user_log.filter(user_log.userId == "").\
groupby("sessionId").\
agg(F.min("ts2").alias("MinTS"),
    F.max("ts2").alias("MaxTS"))
tsdf.printSchema()

root
 |-- sessionId: long (nullable = true)
 |-- MinTS: timestamp (nullable = true)
 |-- MaxTS: timestamp (nullable = true)



In [39]:
#challenge for later: How long do unregsitred user spend time on the webiste?

Column<b'ts2'>





# Question 3

How many female users do we have in the data set?

In [78]:
user_log.filter(user_log.userId != '').\
select("userId", "gender").\
dropDuplicates().\
filter(user_log.gender == "F").count()

462

# Question 4

How many songs were played from the most played artist?

In [79]:
user_log.filter(user_log.page == "NextSong").\
groupby("artist").count().\
sort(desc("count")).show(1)

+--------+-----+
|  artist|count|
+--------+-----+
|Coldplay|   83|
+--------+-----+
only showing top 1 row



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.



In [92]:
function = F.udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = user_log.\
    filter((user_log.page == 'NextSong') | (user_log.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(F.col('page'))) \
    .withColumn('period', F.sum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg(F.round(F.avg("count(period)"), 0)).show()

+----------------------------+
|round(avg(count(period)), 0)|
+----------------------------+
|                         7.0|
+----------------------------+

