# Data Wrangling with DataFrames Coding Quiz

Use this Jupyter notebook to find the answers to the quiz in the previous section. There is an answer key in the next part of the lesson.

```python
from pyspark.sql import SparkSession

# TODOS:
# 1) import any other libraries you might need
# 2) instantiate a Spark session
# 3) read in the data set located at the path "data/sparkify_log_small.json"
# 4) write code to answer the quiz questions
from pyspark.sql.types import IntegerType
# Note: importing `sum` this way shadows Python's built-in sum() in this notebook
from pyspark.sql.functions import sum, count, avg, col, udf, asc, desc, sort_array
from pyspark.sql.window import Window
```

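```python
# Create (or reuse) a local Spark session
spark = SparkSession \
    .builder \
    .appName("Spark Exercises 1") \
    .getOrCreate()

# Load the Sparkify event log; each line of the file is one JSON event
df = spark.read.json("data/sparkify_log_small.json")
```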
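By default `spark.read.json` expects JSON Lines: one complete JSON object per line. It infers the schema from the union of fields across records, and fields missing from a record become null. The stdlib-only sketch below illustrates that layout and the field union using two made-up log lines. The field values are hypothetical and do not come from the data set.

```python
import io
import json

# Two hypothetical log lines in JSON Lines format (one object per line),
# the layout spark.read.json expects by default.
sample = io.StringIO(
    '{"userId": "30", "page": "NextSong", "artist": "Coldplay", "ts": 1513720872284}\n'
    '{"userId": "", "page": "Home", "ts": 1513720878284}\n'
)

# Parse each non-empty line as its own JSON document
records = [json.loads(line) for line in sample if line.strip()]

# Rough analogue of schema inference: the union of keys across all records,
# with None standing in for null where a record lacks a field
fields = sorted({key for rec in records for key in rec})
rows = [{f: rec.get(f) for f in fields} for rec in records]

print(fields)  # ['artist', 'page', 'ts', 'userId']
```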


# Question 1

Which page did user id "" (empty string) NOT visit?

```python
# TODO: write your code to answer question 1

# distinct pages visited by userId == '' (empty string)
visits = df.filter(df.userId == '') \
    .select(col('page').alias('visited')) \
    .dropDuplicates()

# distinct pages across the whole log
all_pages = df.select('page').dropDuplicates()

# join condition
cond = (all_pages['page'] == visits['visited'])

# left join, then keep rows with no match on the right side:
# pages that exist in all_pages but not in visits
result = all_pages.join(visits, cond, 'left') \
    .select(all_pages.page, visits.visited)

result.filter(result.visited.isNull()) \
    .select(col('page')) \
    .show()
```

```
+----------------+
|            page|
+----------------+
|Submit Downgrade|
|       Downgrade|
|          Logout|
|   Save Settings|
|        Settings|
|        NextSong|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+
```
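The left join followed by `isNull()` is an anti-join. It keeps the pages in `all_pages` that have no match in `visits`. Spark also accepts `'left_anti'` as a join type, so `all_pages.join(visits, cond, 'left_anti')` should return the same pages directly. The underlying set logic is sketched below in plain Python on a few hypothetical records. The `logs` list and the `pages_not_visited` helper are illustrative only and are not part of the quiz.

```python
# Hypothetical toy log records; the real data lives in data/sparkify_log_small.json
logs = [
    {"userId": "", "page": "Home"},
    {"userId": "", "page": "About"},
    {"userId": "42", "page": "Home"},
    {"userId": "42", "page": "NextSong"},
    {"userId": "42", "page": "Settings"},
]


def pages_not_visited(records, user_id):
    """Return pages seen anywhere in the log that `user_id` never visited.

    Mirrors the DataFrame code: distinct pages overall (all_pages) minus the
    distinct pages for one user (visits). A left join plus an isNull() filter
    on the right side computes exactly this set difference.
    """
    all_pages = {r["page"] for r in records}
    visited = {r["page"] for r in records if r["userId"] == user_id}
    return sorted(all_pages - visited)


print(pages_not_visited(logs, ""))  # ['NextSong', 'Settings']
```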





# Question 2 - Reflect

What type of user does the empty string user id most likely refer to?


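Most likely visitors who are not logged in: guests or people who have not signed up yet. Most of the pages this id never visited (NextSong, Settings, Save Settings, Upgrade, Downgrade, Logout) are pages that require an account, so these events probably come from logged-out sessions.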

# Question 3

How many female users do we have in the data set?

```python
# TODO: write your code to answer question 3
# Each row is an event, so deduplicate users before counting
df.filter(df.gender == 'F') \
    .select('userId', 'gender') \
    .distinct() \
    .count()
```

```
462
```
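The `distinct()` call matters because the log stores one row per event, so each user appears once per action. The plain-Python sketch below uses hypothetical events to show the difference between counting female events and counting female users.

```python
# Hypothetical events: one dict per log row, so the same user repeats
events = [
    {"userId": "1", "gender": "F", "page": "Home"},
    {"userId": "1", "gender": "F", "page": "NextSong"},
    {"userId": "2", "gender": "M", "page": "NextSong"},
    {"userId": "3", "gender": "F", "page": "Home"},
]

# Counting rows counts events, not people
female_events = sum(1 for e in events if e["gender"] == "F")

# Deduplicating user ids first (what .distinct() does in the Spark code) counts people
female_users = len({e["userId"] for e in events if e["gender"] == "F"})

print(female_events, female_users)  # 3 2
```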

# Question 4

How many songs were played from the most played artist?

```python
# TODO: write your code to answer question 4

# count song plays (NextSong events) per artist and show the top one
df.filter(df.page == 'NextSong') \
    .select('Artist') \
    .groupBy('Artist') \
    .agg({'Artist':'count'}) \
    .withColumnRenamed('count(Artist)', 'Artist_qty') \
    .sort(desc('Artist_qty')) \
    .show(1)
```

```
+--------+----------+
|  Artist|Artist_qty|
+--------+----------+
|Coldplay|        83|
+--------+----------+
only showing top 1 row
```
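The chain above is a group-and-count followed by taking the maximum. In PySpark, `groupBy('artist').count()` is a shorter spelling that produces a column named `count`. The sketch below applies the same logic with `collections.Counter` to hypothetical play events. The counts are invented for illustration.

```python
from collections import Counter

# Hypothetical (page, artist) events; non-song pages carry no artist
events = [
    ("NextSong", "Coldplay"),
    ("Home", None),
    ("NextSong", "Muse"),
    ("NextSong", "Coldplay"),
    ("NextSong", "Coldplay"),
    ("NextSong", "Muse"),
]

# Keep only song plays (the NextSong filter), then count plays per artist
plays = Counter(artist for page, artist in events if page == "NextSong")

# most_common(1) plays the role of sort(desc(...)).show(1)
top_artist, top_count = plays.most_common(1)[0]
print(top_artist, top_count)  # Coldplay 3
```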



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.



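```python
# TODO: write your code to answer question 5

# 1 if the event is a Home page visit, else 0
ishome = udf(lambda homepg: int(homepg == 'Home'), IntegerType())

# per user, newest event first; the frame runs from the newest event
# up to (and including) the current row
window_open = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

# period = number of Home visits at or after this event, so every song
# between two consecutive Home visits shares the same period value
checker = df.filter((df.page == 'Home') | (df.page == 'NextSong')) \
    .select('userID', 'page', 'ts') \
    .withColumn('fl_home_visit', ishome(col('page'))) \
    .withColumn('period', sum('fl_home_visit').over(window_open))

# songs per (user, period), then the average over all periods
checker.filter(checker.page == 'NextSong') \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}) \
    .show()
```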


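```
+------------------+
|avg(count(period))|
+------------------+
| 6.898347107438017|
+------------------+
```

Rounded to the closest integer, users listen to about 7 songs on average between visits to the home page.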
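The window trick is hard to see in Spark syntax. The sketch below reproduces it in plain Python on hypothetical events. For each user it walks events newest-first and keeps a running count of Home visits, which plays the role of `period`. As in the Spark code, songs after a user's last Home visit and songs before their first one also form their own groups. The sketch assumes timestamps are unique per user, because Spark's `rangeBetween` frame would treat equal `ts` values as peers. The helper name `avg_songs_between_home` is illustrative only. Separately, the Python UDF could likely be replaced by the built-in `when(col('page') == 'Home', 1).otherwise(0)`, which avoids Python UDF overhead.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical toy events as (userId, page, ts); assumes unique ts per user
events = [
    ("1", "Home", 1), ("1", "NextSong", 2), ("1", "NextSong", 3),
    ("1", "Home", 4), ("1", "NextSong", 5),
    ("2", "NextSong", 1), ("2", "Home", 2), ("2", "NextSong", 3),
    ("2", "NextSong", 4), ("2", "NextSong", 5),
]


def avg_songs_between_home(events):
    """Average NextSong count per (user, period) group.

    period = number of Home visits at or after the current event, the value
    the Spark code gets from sum('fl_home_visit') over a window ordered by
    desc('ts') from unboundedPreceding to the current row.
    """
    # Keep only Home and NextSong events, grouped per user (like partitionBy)
    by_user = defaultdict(list)
    for user, page, ts in events:
        if page in ("Home", "NextSong"):
            by_user[user].append((ts, page))

    songs_per_period = defaultdict(int)
    for user, rows in by_user.items():
        period = 0
        # Newest first, like orderBy(desc('ts'))
        for ts, page in sorted(rows, reverse=True):
            if page == "Home":
                period += 1
            else:  # NextSong
                songs_per_period[(user, period)] += 1

    return mean(list(songs_per_period.values()))


result = avg_songs_between_home(events)
print(result, round(result))  # 1.75 2
```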



                                                                                