In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
#instantiate a Spark Session
spark = SparkSession.builder.appName("Data Wrangling with SQL").getOrCreate()

In [4]:
#read in data set located at the path JSON file
path="Data/sparkify_log_small.json"
user_log=spark.read.json(path)

In [6]:
user_log.printSchema()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
user_log.take(1)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')]

# Create a View and Run Queries
The code below create a temporary view against which you can run SQL


In [8]:
user_log.createOrReplaceTempView("user_log_table")

In [9]:
spark.sql("SELECT * from user_log_table LIMIT 2").show()

+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+------+
|Showaddywaddy|Logged In|  Kenneth|     M|          112|Matthews|232.93342| paid|Charlotte-Concord...|   PUT|NextSong|1509380319284|     5132|Christmas Tears W...|   200|1513720872284|"Mozilla/5.0 (Win...|  1046|
|   Lily Allen|Logged In|Elizabeth|     F|            7|   Chase|195.23873| free|Shreveport-Bossie...|   PUT|NextSong|1512718541284|     5027|      

In [10]:
spark.sql("SELECT COUNT(*) FROM user_log_table").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [11]:
spark.sql('''
          SELECT userID, firstname, page, song
          FROM user_log_table 
          WHERE userID == '1046'
          '''
          ).collect()

[Row(userID='1046', firstname='Kenneth', page='NextSong', song='Christmas Tears Will Fall'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Be Wary Of A Woman'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Public Enemy No.1'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Reign Of The Tyrants'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Father And Son'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='No. 5'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Seventeen'),
 Row(userID='1046', firstname='Kenneth', page='Home', song=None),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='War on war'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Killermont Street'),
 Row(userID='1046', firstname='Kenneth', page='NextSong', song='Black & Blue'),
 Row(userID='1046', firstname='Kenneth', page='Logout', song=None),
 Row(userID='1046', firstname='Kenneth'

In [12]:
spark.sql('''
          SELECT DISTINCT page
          FROM user_log_table 
          ORDER BY page ASC
          '''
          ).show()

+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



# User Define Functions


In [13]:
spark.udf.register("get_hour",lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))

<function __main__.<lambda>(x)>

In [14]:
spark.sql("SELECT *,get_hour(ts) AS hour FROM user_log_table LIMIT 1").collect()

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='0')]

In [15]:
song_in_hour = spark.sql("""
    SELECT get_hour(ts) AS hour, count(*) as plays_per_hour
    FROM user_log_table
    WHERE page = "NextSong"
    GROUP by hour
    ORDER BY cast(hour as int) ASC
""")
song_in_hour.show()

+----+--------------+
|hour|plays_per_hour|
+----+--------------+
|   0|           369|
|   1|           375|
|   2|           456|
|   3|           454|
|   4|           382|
|   5|           302|
|   6|           352|
|   7|           276|
|   8|           348|
|   9|           358|
|  10|           375|
|  11|           249|
|  12|           216|
|  13|           228|
|  14|           251|
|  15|           339|
|  16|           462|
|  17|           479|
|  18|           484|
|  19|           430|
+----+--------------+
only showing top 20 rows



# convert result to Pandas


In [16]:
song_in_hour_pd = song_in_hour.toPandas()

In [17]:
print(song_in_hour_pd)

   hour  plays_per_hour
0     0             369
1     1             375
2     2             456
3     3             454
4     4             382
5     5             302
6     6             352
7     7             276
8     8             348
9     9             358
10   10             375
11   11             249
12   12             216
13   13             228
14   14             251
15   15             339
16   16             462
17   17             479
18   18             484
19   19             430
20   20             362
21   21             295
22   22             257
23   23             248


In [5]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [25]:
#q1: Which page did user empty string NOT visit
#create two columns, 1 page is from a table where user ID = '' and check 'page' with
# another table where page of that table list with distint value (no duplicates)
spark.sql("SELECT * \
            FROM ( \
                SELECT DISTINCT page \
                FROM user_log_table \
                WHERE userID='') AS user_pages\
                RIGHT JOIN (\
                SELECT DISTINCT page\
                FROM user_log_table) AS all_pages\
                ON user_pages.page=all_pages.page\
                WHERE user_pages.page IS NULL").show()


+----+----------------+
|page|            page|
+----+----------------+
|null|Submit Downgrade|
|null|       Downgrade|
|null|          Logout|
|null|   Save Settings|
|null|        Settings|
|null|        NextSong|
|null|         Upgrade|
|null|           Error|
|null|  Submit Upgrade|
+----+----------------+



In [32]:
#q2: How many female users do we have in the data set?
spark.sql("SELECT count(*)\
            FROM (\
                SELECT DISTINCT userID\
                FROM user_log_table\
                WHERE gender = 'F')").show()

+--------+
|count(1)|
+--------+
|     462|
+--------+



In [37]:
#q3: How many songs were played from the most played artist?
spark.sql("""
SELECT count(artist) as plays
FROM user_log_table
GROUP BY artist
ORDER BY plays desc
LIMIT 1
""").show()

+-----+
|plays|
+-----+
|   83|
+-----+



In [40]:
#another solution 
#Get the artist play counts
play_counts = spark.sql("SELECT Artist, COUNT(Artist) AS plays \
        FROM user_log_table \
        GROUP BY Artist")

# save the results in a new view
play_counts.createOrReplaceTempView("artist_counts")

# use a self join to find where the max play equals the count value
spark.sql("SELECT a2.Artist, a2.plays FROM \
          (SELECT max(plays) AS max_plays FROM artist_counts) AS a1 \
          JOIN artist_counts AS a2 \
          ON a1.max_plays = a2.plays \
          ").show()

+--------+-----+
|  Artist|plays|
+--------+-----+
|Coldplay|   83|
+--------+-----+



In [44]:
#q4: How many songs do users listen to on average between visiting our home page? 
# Please round your answer to the closest integer.
# SELECT CASE WHEN 1 > 0 THEN 1 WHEN 2 > 0 THEN 2.0 ELSE 1.2 END;
is_home = spark.sql("SELECT userID, page, ts, CASE WHEN page = 'Home' THEN 1 ELSE 0 END AS is_home FROM user_log_table \
            WHERE (page = 'NextSong') or (page = 'Home') \
            ")

# keep the results in a new view
is_home.createOrReplaceTempView("is_home_table")

# find the cumulative sum over the is_home column
cumulative_sum = spark.sql("SELECT *, SUM(is_home) OVER \
    (PARTITION BY userID ORDER BY ts DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS period \
    FROM is_home_table")

# keep the results in a view
cumulative_sum.createOrReplaceTempView("period_table")

# find the average count for NextSong
spark.sql("SELECT ROUND(AVG(count_results)) as Average_Num_Song FROM \
          (SELECT COUNT(*) AS count_results FROM period_table \
GROUP BY userID, period, page HAVING page = 'NextSong') AS counts").show()

+----------------+
|Average_Num_Song|
+----------------+
|             7.0|
+----------------+

