# Data Wrangling with Spark SQL Quiz

This quiz uses the same dataset and most of the same questions as the earlier "Quiz - Data Wrangling with Data Frames Jupyter Notebook." For this quiz, however, use Spark SQL instead of Spark Data Frames.

In [40]:
import findspark
findspark.init('/usr/local/spark/spark-3.1.2-bin-hadoop3.2')
from pyspark.sql import SparkSession


# TODOS: 
# 1) import any other libraries you might need
# 2) instantiate a Spark session 
# 3) read in the data set located at the path "data/sparkify_log_small.json"
# 4) create a view to use with your SQL queries
# 5) write code to answer the quiz questions 

In [5]:
spark = SparkSession \
    .builder \
    .appName("Data wrangling with Spark SQL") \
    .getOrCreate()

In [6]:
path = "sparkify_log_small.json"
user_log = spark.read.json(path)

In [9]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [10]:
user_log.createOrReplaceTempView("user_log_table")

In [11]:
spark.sql('''
            SELECT *
            FROM user_log_table
            LIMIT 2
            ''').show()

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541284|     5027|      

# Question 1

Which page did the user with id "" (empty string) NOT visit?

In [27]:
# TODO: write your code to answer question 1

spark.sql('''
          SELECT *
          FROM (
              SELECT DISTINCT page
              FROM user_log_table
              WHERE userId = '') AS user_pages
          RIGHT JOIN(
              SELECT DISTINCT page
              FROM user_log_table) AS all_pages
          ON user_pages.page = all_pages.page
          WHERE user_pages.page IS NULL
          '''
          ).show()

# DataFrame equivalent (needs: from pyspark.sql.functions import col)
# guest_pages = user_log.filter(user_log.userId == '')\
#    .select(col('page'))\
#    .alias('guest_pages')\
#    .dropDuplicates()

# all_pages = user_log.select('page').dropDuplicates()

# for row in set(all_pages.collect()) - set(guest_pages.collect()):
#   print(row.page)

+----+----------------+
|page|            page|
+----+----------------+
|null|Submit Downgrade|
|null|       Downgrade|
|null|          Logout|
|null|   Save Settings|
|null|        Settings|
|null|        NextSong|
|null|         Upgrade|
|null|           Error|
|null|  Submit Upgrade|
+----+----------------+
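The `RIGHT JOIN ... WHERE user_pages.page IS NULL` pattern above is a set difference: all pages minus the pages seen for the empty user id. As a minimal sketch, the same idea can be written with `EXCEPT`. The example below runs the query through Python's built-in `sqlite3` on a few made-up rows so it works without a Spark session; the toy rows and the use of SQLite are assumptions for illustration, and the query string should also be usable with `spark.sql(query)` against `user_log_table`, though that is not verified here.

```python
import sqlite3

# Hypothetical toy rows standing in for user_log_table: (userId, page)
rows = [
    ("", "Home"),
    ("", "Login"),
    ("", "About"),
    ("1046", "Home"),
    ("1046", "NextSong"),
    ("1000", "Settings"),
    ("1000", "About"),
]

# In-memory SQLite table used only as a stand-in for the Spark temp view
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log_table (userId TEXT, page TEXT)")
conn.executemany("INSERT INTO user_log_table VALUES (?, ?)", rows)

# All pages, minus the pages visited by the empty-string user.
# EXCEPT also removes duplicates, so no DISTINCT is needed.
query = """
    SELECT page FROM user_log_table
    EXCEPT
    SELECT page FROM user_log_table WHERE userId = ''
"""

not_visited = sorted(page for (page,) in conn.execute(query))
print(not_visited)
```

Compared with the join, this form returns a single `page` column instead of a null column next to the page name.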



# Question 2 - Reflect

Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

In [28]:
# I prefer to use SQL as I am already familiar with it. 

# Question 3

How many female users do we have in the data set?

In [31]:
# TODO: write your code to answer question 3

spark.sql('''
            SELECT COUNT(DISTINCT userId), gender
            FROM user_log_table
            WHERE gender = 'F'
            GROUP BY gender
            ''').show()


+----------------------+------+
|count(DISTINCT userId)|gender|
+----------------------+------+
|                   462|     F|
+----------------------+------+



# Question 4

How many songs were played from the most played artist?

In [33]:
# TODO: write your code to answer question 4

spark.sql('''
            SELECT artist, COUNT(artist) as artist_ct
            FROM user_log_table
            GROUP BY artist
            ORDER BY artist_ct DESC
            LIMIT 1
          ''').show()
            

+--------+---------+
|  artist|artist_ct|
+--------+---------+
|Coldplay|       83|
+--------+---------+



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [56]:
# TODO: write your code to answer question 5

# flag each Home visit with 1 and each song play with 0
is_home = spark.sql('''
                SELECT userId, page, ts,
                     CASE WHEN page = 'Home' 
                         THEN 1
                         ELSE 0
                         END AS is_home
                 FROM user_log_table
                 WHERE (page = 'NextSong') or (page = 'Home')                
                 ''')


# keep the results in a new view
is_home.createOrReplaceTempView("is_home_table")

# find the cumulative sum over the is_home column
cumulative_sum = spark.sql('''
                    SELECT *, SUM(is_home) OVER
                        (PARTITION BY userId ORDER BY ts DESC 
                        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS period
                    FROM is_home_table
                    '''     
                    )

# keep the results in a view
cumulative_sum.createOrReplaceTempView("period_table")

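# find the average NextSong count per (user, period) group
# note: ceil() rounds up; ROUND() would give the closest integer the question asks for
spark.sql('''
          SELECT ceil(AVG(ct_results))
          FROM 
              (SELECT COUNT(*) AS ct_results 
               FROM period_table
               GROUP BY userId, period, page HAVING page = 'NextSong') AS counts
          ''').show()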

+---------------------+
|CEIL(avg(ct_results))|
+---------------------+
|                    7|
+---------------------+
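To make the `period` logic easier to follow, here is a small pure-Python sketch of the same idea on a made-up log: walk each user's events from newest to oldest, keep a running count of `Home` visits, and use that count as the period label for every `NextSong`. The events and the helper name `songs_per_period` are hypothetical and only illustrate the technique; they are not part of the quiz dataset.

```python
from collections import Counter

# Hypothetical toy log: (userId, ts, page)
events = [
    ("u1", 1, "Home"),
    ("u1", 2, "NextSong"),
    ("u1", 3, "NextSong"),
    ("u1", 4, "Home"),
    ("u1", 5, "NextSong"),
    ("u2", 1, "NextSong"),
    ("u2", 2, "Home"),
    ("u2", 3, "NextSong"),
    ("u2", 4, "NextSong"),
    ("u2", 5, "NextSong"),
]


def songs_per_period(events):
    """Count NextSong events per (userId, period).

    The period is the number of Home visits seen so far while walking
    a user's events from newest to oldest, which mirrors
    SUM(is_home) OVER (PARTITION BY userId ORDER BY ts DESC ...).
    """
    counts = Counter()
    home_seen = {}  # userId -> running count of Home visits
    # Sort by user, then by timestamp descending
    for user, ts, page in sorted(events, key=lambda e: (e[0], -e[1])):
        if page == "Home":
            home_seen[user] = home_seen.get(user, 0) + 1
        elif page == "NextSong":
            counts[(user, home_seen.get(user, 0))] += 1
    return counts


counts = songs_per_period(events)
average = sum(counts.values()) / len(counts)
print(dict(counts))
print(average)
```

In this toy log there are 7 song plays spread over 4 (user, period) groups, so the average is 1.75. Songs played after a user's last `Home` visit fall into period 0, just as they do in the SQL version.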


