# Data Wrangling with DataFrames Coding Quiz

Helpful resources:
https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html

In [66]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col, desc, udf, sum
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
# TODOS: 
# 1) import any other libraries you might need
# 2) instantiate a Spark session 
# 3) read in the data set located at the path "data/sparkify_log_small.json"
# 4) write code to answer the quiz questions 

In [4]:
# Build the Spark session used by every cell below, or reuse one if it already exists.
spark = (
    SparkSession.builder
    .appName("Data Wrangling with DataFrames")
    .getOrCreate()
)

In [5]:
# Location of the Sparkify event log (JSON), relative to the notebook.
path = "data/sparkify_log_small.json"

# Load the log into a DataFrame; no schema is supplied, so Spark derives it from the file.
user_log = spark.read.format("json").load(path)

In [6]:
# Print the DataFrame's schema (column names, types, nullability) as a tree.
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# Question 1

Which page did user id "" (empty string) NOT visit?

In [31]:
# Distinct pages requested by events whose userId is the empty string,
# exposed under the column name "blank_pages".
blankPages = (
    user_log
    .where(user_log.userId == '')
    .select(col('page').alias("blank_pages"))
    .distinct()
)

# Show up to five of them as the cell output.
blankPages.head(5)


[Row(blank_pages='Home'),
 Row(blank_pages='About'),
 Row(blank_pages='Login'),
 Row(blank_pages='Help')]

In [32]:
# Every distinct page that appears anywhere in the log.
allPages = user_log.select('page').dropDuplicates()

# Rows compare by their values (not by field name), so single-column Rows from the two
# frames can be diffed directly even though the columns are named differently.
pages_seen_by_anyone = set(allPages.collect())
pages_seen_by_blank_user = set(blankPages.collect())

# Pages present in the log that the empty-string user never requested.
for unvisited_row in pages_seen_by_anyone.difference(pages_seen_by_blank_user):
    print(unvisited_row.page)

Save Settings
NextSong
Downgrade
Logout
Submit Downgrade
Error
Submit Upgrade
Upgrade
Settings


# Question 2 - Reflect

What type of user does the empty string user id most likely refer to?


Answer:
Users who have not signed up yet or who are signed out and are about to log in.

# Question 3

How many female users do we have in the data set?

In [33]:
 # Number of distinct (user id, gender) pairs whose gender is 'F',
 # i.e. each female user is counted once no matter how many events she has.
 (
     user_log
     .filter(user_log.gender == 'F')
     .select(['UserId', 'gender'])
     .distinct()
     .count()
 )

462

# Question 4

How many songs were played from the most played artist?

In [55]:
# Count song plays per artist and show the single most played one.
#
# The 'NextSong' filter is applied *before* projecting down to the artist column, so the
# predicate on `page` is evaluated against a frame that still contains that column rather
# than relying on Spark to re-resolve a column that a preceding select() had dropped.
(
    user_log
    .where(user_log.page == 'NextSong')
    .select('Artist')
    .groupBy('Artist')
    .agg({'Artist': 'count'})
    # The dict-style aggregate names its result 'count(Artist)'; give it a friendlier name.
    .withColumnRenamed('count(Artist)', 'countArtist')
    .sort(desc('countArtist'))
    .show(1)
)

+--------+-----------+
|  Artist|countArtist|
+--------+-----------+
|Coldplay|         83|
+--------+-----------+
only showing top 1 row



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.



In [67]:
def function(page_col):
    """Flag Home-page events: 1 where `page_col` equals 'Home', otherwise 0.

    Built from native column expressions instead of a Python UDF, so the comparison is
    evaluated inside Spark without shipping every row through a Python worker.
    The null-safe comparison keeps the result at 0 (not null) for a null page value.

    Parameters:
        page_col: a Column holding the page name.

    Returns:
        An integer Column containing 0 or 1.
    """
    return page_col.eqNullSafe('Home').cast(IntegerType())


# Per-user window ordered newest-first. The frame runs from the start of the partition
# through the current row's timestamp, so a sum over it counts the Home visits that
# happened at or after each event.
user_window = (
    Window
    .partitionBy('userId')
    .orderBy(desc('ts'))
    .rangeBetween(Window.unboundedPreceding, 0)
)

# Keep only song plays and Home visits, then label every event with a running count of
# Home visits ('period'). Song plays that share a (userId, period) pair fall between the
# same two Home visits.
is_song_or_home = (user_log.page == 'NextSong') | (user_log.page == 'Home')
cusum = (
    user_log
    .filter(is_song_or_home)
    .select('userId', 'page', 'ts')
    .withColumn('homevisit', function(col('page')))
    .withColumn('period', sum('homevisit').over(user_window))
)

# Count the songs in each (user, period) group, then average those counts over all groups.
(
    cusum
    .filter(cusum.page == 'NextSong')
    .groupBy('userId', 'period')
    .agg({'period': 'count'})
    .agg({'count(period)': 'avg'})
    .show()
)

+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+

