# Data Wrangling with Spark SQL Quiz

This quiz uses the same dataset and most of the same questions as the earlier "Quiz - Data Wrangling with Data Frames Jupyter Notebook." For this quiz, however, use Spark SQL instead of Spark Data Frames.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, sort_array, asc, avg
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

# TODOS: 
# 1) import any other libraries you might need
# 2) instantiate a Spark session 
# 3) read in the data set located at the path "data/sparkify_log_small.json"
# 4) create a view to use with your SQL queries
# 5) write code to answer the quiz questions 

In [2]:
spark = SparkSession \
    .builder \
    .appName("Spark SQL Quiz") \
    .getOrCreate()

user_log = spark.read.json("data/sparkify_log_small.json")

user_log.createOrReplaceTempView("log_table")


# Question 1

Which pages did the user with userId "" (empty string) NOT visit?

In [3]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
# spark.sql returns a DataFrame without running the query; call .show() to display the rows
spark.sql(
    "SELECT DISTINCT page \
    FROM log_table \
    WHERE userId=''")

DataFrame[page: string]

In [6]:
spark.sql(
    '''SELECT DISTINCT page 
    FROM log_table''')

DataFrame[page: string]

In [8]:
# TODO: write your code to answer question 1
spark.sql(
    '''
    SELECT * 
    FROM (
        SELECT DISTINCT page
        FROM log_table
        WHERE userId = ''
        ) AS user_pages
    RIGHT JOIN (
        SELECT DISTINCT page
        FROM log_table
        ) AS all_pages 
        ON user_pages.page = all_pages.page
    WHERE user_pages.page IS NULL;
    '''
).show()


+----+----------------+
|page|            page|
+----+----------------+
|NULL|Submit Downgrade|
|NULL|       Downgrade|
|NULL|          Logout|
|NULL|   Save Settings|
|NULL|        Settings|
|NULL|        NextSong|
|NULL|         Upgrade|
|NULL|           Error|
|NULL|  Submit Upgrade|
+----+----------------+
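
Because the RIGHT JOIN above uses `SELECT *`, its result carries a `user_pages.page` column that is NULL on every row; only the second column answers the question. A set difference states the question more directly. The sketch below is a minimal illustration under two assumptions: Python's built-in `sqlite3` module stands in for Spark SQL, and the toy rows are hypothetical (not the Sparkify data). The `EXCEPT` query itself is also accepted by Spark SQL, which additionally offers `LEFT ANTI JOIN` (not available in SQLite).

```python
import sqlite3

# Hypothetical toy rows (userId, page) standing in for log_table.
rows = [
    ("", "Home"), ("", "About"), ("", "Login"),
    ("42", "Home"), ("42", "NextSong"), ("42", "Logout"),
]

# Assumption: an in-memory SQLite database stands in for Spark SQL here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_table (userId TEXT, page TEXT)")
conn.executemany("INSERT INTO log_table VALUES (?, ?)", rows)

# Set difference: every distinct page, minus the pages visited by userId ''.
query = """
    SELECT page FROM log_table
    EXCEPT
    SELECT page FROM log_table WHERE userId = ''
    ORDER BY page
"""
not_visited = [page for (page,) in conn.execute(query)]
print(not_visited)  # ['Logout', 'NextSong']
conn.close()
```

Unlike the RIGHT JOIN version, this returns a single `page` column, so there is no all-NULL column to explain away.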



# Question 2 - Reflect

Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

# Question 3

How many female users do we have in the data set?

In [9]:
# TODO: write your code to answer question 3
spark.sql(
    '''
    SELECT 
        COUNT(DISTINCT userId)
    FROM log_table
    WHERE 
        gender = 'F';
    '''
).show()

+----------------------+
|count(DISTINCT userId)|
+----------------------+
|                   462|
+----------------------+
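
The log holds one row per event, so the same user appears many times; `COUNT(DISTINCT userId)` is what turns an event count into a user count. A minimal sketch, assuming `sqlite3` as a stand-in for Spark SQL and a few hypothetical rows:

```python
import sqlite3

# Hypothetical log rows (userId, gender); user "1" has three events.
rows = [("1", "F"), ("1", "F"), ("1", "F"), ("2", "F"), ("3", "M")]

# Assumption: SQLite stands in for Spark SQL for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_table (userId TEXT, gender TEXT)")
conn.executemany("INSERT INTO log_table VALUES (?, ?)", rows)

# COUNT(*) counts events; COUNT(DISTINCT userId) counts users.
events, users = conn.execute(
    "SELECT COUNT(*), COUNT(DISTINCT userId) FROM log_table WHERE gender = 'F'"
).fetchone()
print(events, users)  # 4 2
conn.close()
```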



# Question 4

How many songs were played from the most played artist?

In [10]:
# TODO: write your code to answer question 4
spark.sql(
    '''
    SELECT Artist, COUNT(Artist) AS plays
    FROM log_table
    GROUP BY Artist
    ORDER BY plays DESC
    LIMIT 10
    '''
).show()

+--------------------+-----+
|              Artist|plays|
+--------------------+-----+
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|            BjÃÂ¶rk|   46|
|       Dwight Yoakam|   45|
|       Justin Bieber|   43|
|      The Black Keys|   40|
|         OneRepublic|   37|
|        Jack Johnson|   36|
|                Muse|   36|
+--------------------+-----+



In [11]:
# Here is an alternative solution
# Get the artist play counts
play_counts = spark.sql("SELECT Artist, COUNT(Artist) AS plays \
        FROM log_table \
        GROUP BY Artist")

# save the results in a new view
play_counts.createOrReplaceTempView("artist_counts")

# join artist_counts to its own maximum to keep the artist(s) whose play count equals the max
spark.sql("SELECT a2.Artist, a2.plays FROM \
          (SELECT max(plays) AS max_plays FROM artist_counts) AS a1 \
          JOIN artist_counts AS a2 \
          ON a1.max_plays = a2.plays \
          ").show()

+--------+-----+
|  Artist|plays|
+--------+-----+
|Coldplay|   83|
+--------+-----+
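
Both Question 4 approaches agree on this data, but they behave differently when artists tie: `ORDER BY plays DESC LIMIT 1` keeps exactly one row, while joining on the maximum keeps every artist that reaches it. The sketch below assumes `sqlite3` as a stand-in for Spark SQL and uses hypothetical toy data with a two-way tie. It also shows that `COUNT(artist)` skips NULLs, so non-song events (which have no artist) collapse into a NULL group with a count of 0.

```python
import sqlite3

# Hypothetical play log with a tie at the top; None stands in for
# non-song events such as Home or Logout, which have no artist.
plays = ["Coldplay", "Muse", "Coldplay", "Muse", "Adele", None, None, None]

# Assumption: SQLite stands in for Spark SQL for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE log_table (artist TEXT)")
conn.executemany("INSERT INTO log_table VALUES (?)", [(a,) for a in plays])
conn.execute("""
    CREATE VIEW artist_counts AS
    SELECT artist, COUNT(artist) AS plays FROM log_table GROUP BY artist
""")

# COUNT(artist) ignores NULLs, so the NULL-artist group counts 0.
null_count = conn.execute(
    "SELECT plays FROM artist_counts WHERE artist IS NULL"
).fetchone()[0]

# LIMIT 1 silently keeps only one of the tied artists.
top_one = conn.execute(
    "SELECT artist, plays FROM artist_counts ORDER BY plays DESC LIMIT 1"
).fetchall()

# Joining on the maximum keeps every artist that reaches it.
top_all = conn.execute("""
    SELECT a2.artist, a2.plays
    FROM (SELECT MAX(plays) AS max_plays FROM artist_counts) AS a1
    JOIN artist_counts AS a2 ON a1.max_plays = a2.plays
    ORDER BY a2.artist
""").fetchall()

print(null_count, len(top_one), top_all)  # 0 1 [('Coldplay', 2), ('Muse', 2)]
conn.close()
```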



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [12]:
# TODO: write your code to answer question 5
# flag each Home visit with 1 and each NextSong event with 0
is_home = spark.sql("""
    SELECT userId, page, ts,
           CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home
    FROM log_table
    WHERE page = 'NextSong' OR page = 'Home'
""")

# keep the results in a new view
is_home.createOrReplaceTempView("is_home_table")

# running sum of is_home per user, newest event first:
# every Home visit increments the sum, so it labels the period between Home visits
cumulative_sum = spark.sql("""
    SELECT *,
           SUM(is_home) OVER (
               PARTITION BY userId
               ORDER BY ts DESC
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS period
    FROM is_home_table
""")

# keep the results in a view
cumulative_sum.createOrReplaceTempView("period_table")

# count NextSong events per (user, period), then average those counts
spark.sql("""
    SELECT AVG(count_results)
    FROM (
        SELECT COUNT(*) AS count_results
        FROM period_table
        WHERE page = 'NextSong'
        GROUP BY userId, period
    ) AS counts
""").show()

+------------------+
|avg(count_results)|
+------------------+
| 6.898347107438017|
+------------------+
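
The query above returns about 6.9, which rounds to 7 songs, as the question asks; in Spark SQL, wrapping the aggregate as `ROUND(AVG(count_results))` would return the rounded value directly. To make the window logic easier to check, here is a plain-Python sketch of the same idea on hypothetical toy events (not the Sparkify data): walk each user's events newest to oldest, bump a running counter at every Home visit (the role of the `SUM(is_home) OVER (...)` column), and count NextSong events per (user, period). Like the SQL, it treats songs played after a user's last Home visit, and before the first one, as periods of their own.

```python
from collections import Counter
from itertools import groupby

# Hypothetical (userId, page, ts) events, already filtered to Home/NextSong.
events = [
    ("a", "Home", 1), ("a", "NextSong", 2), ("a", "NextSong", 3),
    ("a", "Home", 4), ("a", "NextSong", 5),
    ("b", "NextSong", 1), ("b", "Home", 2), ("b", "NextSong", 3),
    ("b", "NextSong", 4), ("b", "NextSong", 5),
]

counts = Counter()
events.sort(key=lambda e: e[0])  # groupby needs rows grouped by user (PARTITION BY)
for user, rows in groupby(events, key=lambda e: e[0]):
    period = 0
    # Newest to oldest, like ORDER BY ts DESC with a running SUM(is_home).
    for _, page, _ in sorted(rows, key=lambda e: e[2], reverse=True):
        period += int(page == "Home")
        if page == "NextSong":
            counts[(user, period)] += 1

average = sum(counts.values()) / len(counts)
print(dict(counts), average, round(average))
```

Here the four (user, period) groups hold 1, 2, 3, and 1 songs, so the average is 1.75, which rounds to 2.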

