# Data Wrangling with Spark SQL Quiz

This quiz uses the same dataset and most of the same questions from the earlier "Quiz - Data Wrangling with Data Frames Jupyter Notebook." For this quiz, however, use Spark SQL instead of Spark Data Frames.

In [12]:
from pyspark.sql import SparkSession

# TODOS: 
# 1) import any other libraries you might need
# 2) instantiate a Spark session 
# 3) read in the data set located at the path "data/sparkify_log_small.json"
# 4) create a view to use with your SQL queries
# 5) write code to answer the quiz questions 

from pyspark.sql.functions import udf, desc, asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import StringType, IntegerType

import numpy as np
import pandas as pd

spark = SparkSession.builder.appName("Quiz_SparkSQL").getOrCreate()
path = "data/sparkify_log_small.json"
dt = spark.read.json(path)

In [13]:
dt.select(['artist']).show(3)

+--------------------+
|              artist|
+--------------------+
|       Showaddywaddy|
|          Lily Allen|
|Cobra Starship Fe...|
+--------------------+
only showing top 3 rows



In [14]:
dt.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [70]:
dt.createOrReplaceTempView("dt_table")  # register a temp view so spark.sql() can query the data frame by the name dt_table

# Question 1

Which pages did the user with id "" (empty string) NOT visit?

In [22]:
# TODO: write your code to answer question 1
# spark.sql() is lazy: with no action such as .show(), this NULL check is planned but never executed
spark.sql('''SELECT * FROM dt_table WHERE userId IS NULL''')
# pages that the empty-string user DID visit
spark.sql('''SELECT DISTINCT page FROM dt_table WHERE userId = ""''').show()

+-----+
| page|
+-----+
| Home|
|About|
|Login|
| Help|
+-----+



In [65]:
spark.sql('''
SELECT * FROM (SELECT DISTINCT page FROM dt_table WHERE userId = "") AS t1
RIGHT JOIN (SELECT DISTINCT page FROM dt_table) AS t2
ON t1.page = t2.page
WHERE t1.page IS NULL
''').show()

+----+----------------+
|page|            page|
+----+----------------+
|null|Submit Downgrade|
|null|       Downgrade|
|null|          Logout|
|null|   Save Settings|
|null|        Settings|
|null|        NextSong|
|null|         Upgrade|
|null|           Error|
|null|  Submit Upgrade|
+----+----------------+



In [63]:
spark.sql('''SELECT DISTINCT page FROM dt_table''').show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+
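
The right join above is one way to take a set difference. As a hedged alternative, the same idea can be written with `EXCEPT`. The sketch below uses Python's built-in `sqlite3` module rather than Spark so that it runs without a Spark session, and its rows are invented for illustration. The `EXCEPT` query is standard SQL and should carry over to `spark.sql` against `dt_table`, but that is an assumption not verified against this data set.

```python
import sqlite3

# Toy stand-in for the log; these rows are invented, not taken from the data set.
rows = [
    ("", "Home"), ("", "Login"), ("", "About"),
    ("1046", "Home"), ("1046", "NextSong"), ("1000", "Settings"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dt_table (userId TEXT, page TEXT)")
conn.executemany("INSERT INTO dt_table VALUES (?, ?)", rows)

# All pages seen by anyone, minus the pages seen by the empty-string user.
query = """
SELECT page FROM dt_table
EXCEPT
SELECT page FROM dt_table WHERE userId = ''
"""
not_visited = sorted(page for (page,) in conn.execute(query))
print(not_visited)
```

`EXCEPT` returns distinct rows, so no explicit `DISTINCT` is needed, and the result has a single `page` column instead of the two produced by `SELECT *` over the join.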



# Question 2 - Reflect

Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

In [24]:
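# It depends on which one you and your collaborators know better.
# SQL is declarative and familiar to analysts, and queries port easily from other databases.
# Data frames resemble pandas and compose with ordinary Python (functions, loops, UDFs), which suits programmatic pipelines.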

# Question 3

How many female users do we have in the data set?

In [46]:
# TODO: write your code to answer question 3
spark.sql('''SELECT gender, COUNT(DISTINCT userId) FROM dt_table GROUP BY gender''').show()

+------+----------------------+
|gender|count(DISTINCT userId)|
+------+----------------------+
|     F|                   462|
|  null|                     1|
|     M|                   501|
+------+----------------------+



# Question 4

How many songs were played from the most played artist?

In [75]:
# TODO: write your code to answer question 4
spark.sql('''
SELECT artist, COUNT(artist) as plays
FROM dt_table 
GROUP BY artist 
ORDER BY plays DESC
LIMIT 5
''').show()

+--------------------+-----+
|              artist|plays|
+--------------------+-----+
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|               Björk|   46|
|       Dwight Yoakam|   45|
+--------------------+-----+



In [77]:
#using data frame
dt.select(['artist','song']).groupBy('artist').count().orderBy(desc('count')).show(6)

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|                null| 1653|
|            Coldplay|   83|
|       Kings Of Leon|   69|
|Florence + The Ma...|   52|
|               Björk|   46|
|       Dwight Yoakam|   45|
+--------------------+-----+
only showing top 6 rows
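
The data frame result has a `null` row that the SQL result lacks. The reason is that `COUNT(artist)` skips NULL values, whereas `groupBy('artist').count()` counts every row, as `COUNT(*)` does. Rows without an artist (for example Home page visits) therefore form a large group only in the data frame output. The sketch below shows the difference on invented rows, using Python's built-in `sqlite3` as a stand-in for Spark; the table contents are assumptions for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dt_table (artist TEXT, page TEXT)")
# Invented rows: three song plays and three page views with no artist.
conn.executemany(
    "INSERT INTO dt_table VALUES (?, ?)",
    [("Coldplay", "NextSong"), ("Coldplay", "NextSong"),
     ("Lily Allen", "NextSong"),
     (None, "Home"), (None, "Login"), (None, "Home")],
)

# COUNT(artist) ignores NULLs; COUNT(*) counts every row in the group.
query = """
SELECT artist, COUNT(artist) AS non_null_plays, COUNT(*) AS all_rows
FROM dt_table
GROUP BY artist
ORDER BY all_rows DESC, artist
"""
counts = conn.execute(query).fetchall()
for row in counts:
    print(row)
```

The NULL group reports 0 under `COUNT(artist)`, so it sorts to the bottom of a `plays DESC` ordering and falls outside `LIMIT 5`.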



# Question 5 (challenge)

How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [83]:
# TODO: write your code to answer question 5

# In SQL, a CASE expression derives the new is_home flag column.
# The view dt_table itself is not modified, so keep the result as a new data frame.
is_home = spark.sql('''
SELECT userId, page, ts,
CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home
FROM dt_table
WHERE (page = 'NextSong' OR page = 'Home')
''')

is_home.show(10)

+------+--------+-------------+-------+
|userId|    page|           ts|is_home|
+------+--------+-------------+-------+
|  1046|NextSong|1513720872284|      0|
|  1000|NextSong|1513720878284|      0|
|  2219|NextSong|1513720881284|      0|
|  2373|NextSong|1513720905284|      0|
|  1747|    Home|1513720913284|      1|
|  1162|NextSong|1513720955284|      0|
|  1061|NextSong|1513720959284|      0|
|   748|    Home|1513720959284|      1|
|   597|    Home|1513720980284|      1|
|  1806|NextSong|1513720983284|      0|
+------+--------+-------------+-------+
only showing top 10 rows



In [96]:
# keep the results in a new view (to be called again in SQL command)
is_home.createOrReplaceTempView("is_home_table")

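# cumulative sum of is_home per user, newest event first:
# period = number of Home visits at or after this event, so NextSong rows
# with the same period fall in the same gap between Home visits
cumulative_sum = spark.sql('''SELECT *, SUM(is_home) 
OVER (PARTITION BY userId ORDER BY ts DESC) AS period
FROM is_home_table''')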

cumulative_sum.filter('userId=2162').show(25) #see example

+------+--------+-------------+-------+------+
|userId|    page|           ts|is_home|period|
+------+--------+-------------+-------+------+
|  2162|NextSong|1513781246284|      0|     0|
|  2162|NextSong|1513781065284|      0|     0|
|  2162|NextSong|1513780860284|      0|     0|
|  2162|NextSong|1513780569284|      0|     0|
|  2162|NextSong|1513780371284|      0|     0|
|  2162|NextSong|1513780156284|      0|     0|
|  2162|NextSong|1513779800284|      0|     0|
|  2162|NextSong|1513779578284|      0|     0|
|  2162|NextSong|1513779348284|      0|     0|
|  2162|NextSong|1513778915284|      0|     0|
|  2162|NextSong|1513778740284|      0|     0|
|  2162|NextSong|1513778523284|      0|     0|
|  2162|NextSong|1513778348284|      0|     0|
|  2162|NextSong|1513778117284|      0|     0|
|  2162|NextSong|1513777906284|      0|     0|
|  2162|NextSong|1513777669284|      0|     0|
|  2162|NextSong|1513777419284|      0|     0|
|  2162|NextSong|1513777095284|      0|     0|
|  2162|NextS
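
To make the `period` column less abstract, here is a minimal plain-Python sketch of what the window `SUM(is_home) OVER (PARTITION BY userId ORDER BY ts DESC)` computes for a single user. The events and the helper name `label_periods` are hypothetical and chosen only for illustration. Timestamps are unique here, so ties between rows do not arise.

```python
# (ts, page) events for one hypothetical user; values are invented.
events = [
    (1, "Home"), (2, "NextSong"), (3, "NextSong"),
    (4, "Home"), (5, "NextSong"), (6, "NextSong"), (7, "NextSong"),
]

def label_periods(events):
    """Mimic the running sum of is_home, walking from newest to oldest event."""
    running = 0
    labelled = []
    for ts, page in sorted(events, reverse=True):  # ORDER BY ts DESC
        running += 1 if page == "Home" else 0      # is_home flag
        labelled.append((ts, page, running))
    return labelled

labelled = label_periods(events)

# Count songs per period, as the later GROUP BY does.
songs_per_period = {}
for ts, page, period in labelled:
    if page == "NextSong":
        songs_per_period[period] = songs_per_period.get(period, 0) + 1

print(labelled)
print(songs_per_period)
```

Songs played after the most recent Home visit get period 0, songs between the last two Home visits get period 1, and so on. Counting `NextSong` rows per period then gives the number of songs in each gap.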

In [114]:
cumulative_sum.createOrReplaceTempView("period_table")

spark.sql('''
SELECT userId,page,period,COUNT(*) FROM period_table WHERE page = 'NextSong' GROUP BY userId,page,period
''').show(10)

+------+--------+------+--------+
|userId|    page|period|count(1)|
+------+--------+------+--------+
|  1436|NextSong|     0|       2|
|  2088|NextSong|     1|      13|
|  2162|NextSong|     0|      19|
|  2162|NextSong|     2|      15|
|  2294|NextSong|     0|       4|
|  2294|NextSong|     1|      17|
|  2294|NextSong|     2|       3|
|  2294|NextSong|     3|      16|
|  2294|NextSong|     4|       4|
|  2294|NextSong|     5|      11|
+------+--------+------+--------+
only showing top 10 rows



In [115]:
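# average number of songs per (user, period); 6.898... rounds to 7, the answer to question 5
spark.sql('''
SELECT AVG(countresults)
FROM (SELECT COUNT(*) AS countresults FROM period_table WHERE page = 'NextSong' GROUP BY userId,page,period) AS counts
''').show()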

+-----------------+
|avg(countresults)|
+-----------------+
|6.898347107438017|
+-----------------+
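
The three temporary views used for question 5 can also be folded into one statement with common table expressions. The sketch below does this on invented rows with Python's built-in `sqlite3` as a stand-in for Spark, assuming SQLite 3.25 or newer for window function support. The CTE name `songs_per_period` and the toy data are assumptions. The query is expected to work in `spark.sql` as well, but that has not been verified here.

```python
import sqlite3

# Invented toy events (userId, page, ts); not taken from the Sparkify log.
rows = [
    ("a", "Home", 1), ("a", "NextSong", 2), ("a", "NextSong", 3),
    ("a", "Home", 4), ("a", "NextSong", 5),
    ("b", "NextSong", 1), ("b", "Home", 2), ("b", "NextSong", 3),
    ("b", "NextSong", 4), ("b", "NextSong", 5), ("b", "Settings", 6),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dt_table (userId TEXT, page TEXT, ts INTEGER)")
conn.executemany("INSERT INTO dt_table VALUES (?, ?, ?)", rows)

query = """
WITH is_home_table AS (
    -- flag Home visits; keep only the two page types that matter
    SELECT userId, page, ts,
           CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home
    FROM dt_table
    WHERE page IN ('NextSong', 'Home')
),
period_table AS (
    -- running count of Home visits per user, newest event first
    SELECT *,
           SUM(is_home) OVER (PARTITION BY userId ORDER BY ts DESC) AS period
    FROM is_home_table
),
songs_per_period AS (
    -- songs in each gap between Home visits
    SELECT userId, period, COUNT(*) AS songs
    FROM period_table
    WHERE page = 'NextSong'
    GROUP BY userId, period
)
SELECT AVG(songs) FROM songs_per_period
"""
(avg_songs,) = conn.execute(query).fetchone()
print(avg_songs, round(avg_songs))
```

On the toy rows the per-period song counts are 1, 2, 3 and 1, so the average is 1.75. Rounding is left to Python here to mirror the quiz instruction to report the closest integer.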

