# Data Wrangling with Spark SQL

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session this notebook runs against.
spark = (
    SparkSession.builder
    .appName("Data wrangling with Spark SQL")
    .getOrCreate()
)

# Load the Sparkify event log and register it so the SQL cells below can
# query it by the name `user_log_table`.
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)
user_log.createOrReplaceTempView("user_log_table")

Which page did user ID "" (the empty string) NOT visit?

In [38]:
# Every distinct page in the log, minus the pages that occur on any row whose
# userId is the empty string. LEFT ANTI JOIN keeps only the left-hand rows
# that have no matching page on the right-hand side.
unvisited_sql = """
                    SELECT DISTINCT ul1.page FROM user_log_table ul1
                    LEFT ANTI JOIN (
                        SELECT DISTINCT page FROM user_log_table
                        WHERE user_log_table.userId = ''
                    ) ul2 ON ul1.page = ul2.page
                 """
rows = spark.sql(unvisited_sql).collect()
pages = list(map(lambda r: r.page, rows))
print('userId "" did not visit pages: {}'.format(pages))

userId "" did not visit pages: ['Submit Downgrade', 'Downgrade', 'Logout', 'Save Settings', 'Settings', 'NextSong', 'Upgrade', 'Error', 'Submit Upgrade']


Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

# How many female users do we have in the data set?

In [29]:
# Number of distinct userIds appearing on rows whose gender is 'F'.
female_sql = """
                    SELECT COUNT(DISTINCT(userId)) AS count FROM user_log_table
                    WHERE gender='F'
                 """
row = spark.sql(female_sql).collect()
# The query yields one row with one column; unpack that single value.
(count,) = row[0]
print('There are {} female users'.format(count))

There are 462 female users


# How many songs were played from the most played artist?

In [30]:
# Most-played artist: count non-null songs per artist and keep the top one.
# Ties on the play count are broken alphabetically by artist so LIMIT 1 is
# deterministic.
row = spark.sql("""
    SELECT artist, COUNT(song) AS count
    FROM user_log_table
    GROUP BY artist
    ORDER BY count DESC, artist
    LIMIT 1
""").collect()
# Field 0 is the artist name, field 1 is the play count.
artist, count = row[0]
print('{} songs were played from the most played artist: {}'.format(count, artist))

83 songs were played from the most played artist: Coldplay


# How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [32]:
# Keep only NextSong and Home events, flagging Home with is_home = 1 and
# NextSong with is_home = 0; other page types do not affect the answer.
is_home = spark.sql("""
    SELECT userID, page, ts,
           CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home
    FROM user_log_table
    WHERE page = 'NextSong' OR page = 'Home'
""")

# keep the results in a new view
is_home.createOrReplaceTempView("is_home_table")

# Running sum of is_home per user, walking from the newest event to the oldest
# (ORDER BY ts DESC). All rows between two consecutive Home visits share the
# same `period` value, so `period` labels listening segments. Songs after a
# user's last Home visit (period 0) and before their first one also form
# segments.
cumulative_sum = spark.sql("""
    SELECT *,
           SUM(is_home) OVER (
               PARTITION BY userID
               ORDER BY ts DESC
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS period
    FROM is_home_table
""")

# keep the results in a view
cumulative_sum.createOrReplaceTempView("period_table")

# Count songs per (user, segment), then average those counts. Songs are
# filtered with WHERE before grouping rather than with HAVING afterwards.
spark.sql("""
    SELECT AVG(count_results) FROM (
        SELECT COUNT(*) AS count_results
        FROM period_table
        WHERE page = 'NextSong'
        GROUP BY userID, period
    ) AS counts
""").show()

+------------------+
|avg(count_results)|
+------------------+
| 6.898347107438017|
+------------------+

