# Spark SQL

One declarative way to query and wrangle data in Spark is to use a SQL-like language.

This approach makes our big data jobs easier to write and easier to share with other data scientists, data engineers, data analysts, and so on.

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("using Spark SQL").getOrCreate()
# data path
DATA_PATH = '../../data/sparkify_log_small_2.json'
# read data
user_log = spark.read.json(DATA_PATH)



In [7]:
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
# create a temporary view
user_log.createOrReplaceTempView("user_log_table")

## Question 1
### Which page did user id "" (empty string) NOT visit?

First, let's see all the possible pages.

In [36]:
pages = """
SELECT DISTINCT page
FROM user_log_table
"""

spark.sql(pages).show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



Then we list all the pages that the _empty user_ has visited.

In [37]:
empty_user_visited_pages = """
    SELECT DISTINCT page
    FROM user_log_table
    WHERE userId == ''
"""
spark.sql(empty_user_visited_pages).show()

+-----+
| page|
+-----+
| Home|
|About|
|Login|
| Help|
+-----+



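Finally, find all pages in `pages` that are not in `empty_user_visited_pages`, using a `LEFT JOIN` and keeping only the rows with no match (`blank_pages.page IS NULL`).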

In [40]:
not_visited_by_empty_user = """
    SELECT all_pages.page
    FROM ({pages}) all_pages
    LEFT JOIN ({empty_user_visited_pages}) blank_pages
    ON all_pages.page = blank_pages.page
    WHERE blank_pages.page IS NULL
    ORDER BY page ASC
""".format(pages=pages, empty_user_visited_pages=empty_user_visited_pages)
spark.sql(not_visited_by_empty_user).show()

+----------------+
|            page|
+----------------+
|       Downgrade|
|           Error|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



Thus, the pages that the empty user did not visit are:
* Downgrade
* Error
* Logout
* NextSong
* Save Settings
* Settings
* Submit Downgrade
* Submit Upgrade
* Upgrade
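The `LEFT JOIN ... WHERE blank_pages.page IS NULL` query above is an *anti-join*: it keeps the rows of `all_pages` that have no match in `blank_pages`. A set difference with `EXCEPT` answers the same question, and Spark SQL also accepts `EXCEPT` and `LEFT ANTI JOIN`. The sketch below is only an illustration under two assumptions: it uses Python's built-in `sqlite3` module instead of Spark so it runs without a cluster, and it uses a handful of hypothetical rows rather than the Sparkify log. It checks that both forms return the same pages.

```python
import sqlite3

# Hypothetical toy rows (userId, page); these are NOT taken from the Sparkify log.
rows = [
    ("", "Home"), ("", "Login"), ("", "Help"),
    ("42", "Home"), ("42", "NextSong"), ("42", "Logout"),
]

# In-memory SQLite database standing in for the Spark temp view (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log_table (userId TEXT, page TEXT)")
conn.executemany("INSERT INTO user_log_table VALUES (?, ?)", rows)

pages = "SELECT DISTINCT page FROM user_log_table"
empty_user_visited_pages = "SELECT DISTINCT page FROM user_log_table WHERE userId = ''"

# Anti-join: keep every page that has no match among the empty user's pages.
anti_join = f"""
    SELECT all_pages.page
    FROM ({pages}) AS all_pages
    LEFT JOIN ({empty_user_visited_pages}) AS blank_pages
        ON all_pages.page = blank_pages.page
    WHERE blank_pages.page IS NULL
    ORDER BY all_pages.page
"""

# Set difference: the same question expressed with EXCEPT.
set_difference = f"{pages} EXCEPT {empty_user_visited_pages} ORDER BY 1"

anti = [page for (page,) in conn.execute(anti_join)]
diff = [page for (page,) in conn.execute(set_difference)]
print(anti)  # ['Logout', 'NextSong']
assert anti == diff
```

The `EXCEPT` form is shorter, while the join form makes it easy to carry extra columns from `all_pages` into the result.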

## Question 2 - Reflect
### Why might you prefer to use SQL over data frames? Why might you prefer data frames over SQL?

**Why SQL over dataframes?**
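* It is easier to use when you already know SQL.
* Teammates such as data engineers, data analysts, or data scientists can read and understand the queries more easily.

**Why dataframes over SQL?**
* Queries are built from Python method calls, so they can be composed with variables, functions, and loops instead of by formatting SQL strings (as done with `.format` in Question 1).
* A long pipeline is easier to split into small, reusable, and testable steps.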

## Question 3
### How many female users do we have in the data set?

In [44]:
female_users_count = """
    SELECT COUNT(DISTINCT userId) AS distinct_females
    FROM user_log_table
    WHERE gender == 'F'
"""
spark.sql(female_users_count).show()

+----------------+
|distinct_females|
+----------------+
|             462|
+----------------+
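`COUNT(DISTINCT userId)` counts each user once, whereas `COUNT(*)` would count every log event produced by a female user. A minimal sketch, assuming Python's built-in `sqlite3` as a stand-in for Spark and a few hypothetical rows (not the real data), shows the difference:

```python
import sqlite3

# Hypothetical events: user "1" (F) has three events, user "2" (F) one, user "3" (M) one.
rows = [("1", "F"), ("1", "F"), ("1", "F"), ("2", "F"), ("3", "M")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_log_table (userId TEXT, gender TEXT)")
conn.executemany("INSERT INTO user_log_table VALUES (?, ?)", rows)

# COUNT(*) counts rows (events); COUNT(DISTINCT userId) counts users.
events, distinct_females = conn.execute(
    """
    SELECT COUNT(*), COUNT(DISTINCT userId)
    FROM user_log_table
    WHERE gender = 'F'
    """
).fetchone()
print(events, distinct_females)  # 4 2
```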



## Question 4

### How many songs were played from the most played artist?

In [49]:
top_artist = """
    SELECT artist, COUNT(page) AS played_songs
    FROM user_log_table
    WHERE page = 'NextSong'
    GROUP BY artist
    ORDER BY played_songs DESC
    LIMIT 1
"""
result = spark.sql(top_artist)
result.show()

+--------+------------+
|  artist|played_songs|
+--------+------------+
|Coldplay|          83|
+--------+------------+
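`ORDER BY played_songs DESC LIMIT 1` returns a single row even when several artists share the top count, and which of them is returned is not guaranteed. The sketch below, in plain Python with hypothetical play events (an assumption for illustration only), mirrors the `WHERE`/`GROUP BY`/`ORDER BY ... LIMIT 1` logic and also lists every artist tied for first place:

```python
from collections import Counter

# Hypothetical (artist, page) events; only 'NextSong' events count as plays.
events = [
    ("Artist A", "NextSong"), ("Artist B", "NextSong"), ("Artist A", "NextSong"),
    ("Artist B", "NextSong"), ("Artist C", "NextSong"), (None, "Home"),
]

# Equivalent of WHERE page = 'NextSong' ... GROUP BY artist with a count per artist.
plays = Counter(artist for artist, page in events if page == "NextSong")

# Equivalent of ORDER BY played_songs DESC LIMIT 1.
# Counter breaks ties by first appearance; Spark makes no such promise.
top_artist, top_count = plays.most_common(1)[0]

# Every artist sharing the maximum count.
tied = sorted(artist for artist, n in plays.items() if n == top_count)
print(top_artist, top_count, tied)  # Artist A 2 ['Artist A', 'Artist B']
```

In Spark SQL, one way to return all tied artists instead of an arbitrary one is to rank the grouped counts with a `RANK()` window function and keep rank 1.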



## Question 5 (challenge)
### How many songs do users listen to on average between visiting our home page? Please round your answer to the closest integer.

In [53]:
query = """
WITH cumsum AS (
    -- period = running count of 'Home' visits at or after each event
    -- (ordered by ts DESC), so all songs a user played between the same
    -- two 'Home' visits get the same period value
    SELECT userId, page, ts,
    SUM(CASE WHEN page = 'Home' THEN 1 ELSE 0 END) OVER (
        PARTITION BY userId ORDER BY ts DESC
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS period
    FROM user_log_table
    WHERE page IN ('Home', 'NextSong')
)

-- count songs per (user, period), then average those counts
SELECT AVG(song_count)
FROM (
    SELECT userId, COUNT(period) AS song_count
    FROM cumsum
    WHERE page = 'NextSong'
    GROUP BY userId, period
)
"""
result5 = spark.sql(query)
result5.show()

+-----------------+
|  avg(song_count)|
+-----------------+
|6.898347107438017|
+-----------------+

Rounded to the closest integer, users listen to about **7** songs on average between visits to the home page.



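The window function in the query above assigns each event a `period` equal to the number of `Home` visits at or after it (a running sum over rows ordered by `ts DESC`), so all songs played between the same two `Home` visits share one `period`. A plain-Python sketch of that same grouping can help check the logic; it assumes hypothetical events for two users, and the helper name `song_counts_per_period` is illustrative, not part of Spark.

```python
from collections import defaultdict
from statistics import mean


def song_counts_per_period(events):
    """Hypothetical helper mirroring the SQL query for Question 5.

    events: iterable of (userId, page, ts) tuples.
    Returns {(userId, period): number of 'NextSong' events in that period}.
    """
    by_user = defaultdict(list)
    for user_id, page, ts in events:
        if page in ("Home", "NextSong"):  # WHERE page IN ('Home', 'NextSong')
            by_user[user_id].append((ts, page))

    counts = defaultdict(int)
    for user_id, rows in by_user.items():  # PARTITION BY userId
        period = 0
        for ts, page in sorted(rows, reverse=True):  # ORDER BY ts DESC
            if page == "Home":
                period += 1  # running SUM(CASE WHEN page = 'Home' ...)
            else:
                counts[(user_id, period)] += 1  # COUNT(...) GROUP BY userId, period
    return counts


# Hypothetical events (userId, page, ts); not taken from the Sparkify log.
events = [
    ("1", "NextSong", 1), ("1", "Home", 2), ("1", "NextSong", 3),
    ("1", "NextSong", 4), ("1", "NextSong", 5), ("1", "Home", 6),
    ("1", "NextSong", 7), ("1", "Logout", 8),
    ("2", "Home", 10), ("2", "NextSong", 11), ("2", "NextSong", 12),
    ("2", "Home", 13),
]

counts = song_counts_per_period(events)
average = mean(list(counts.values()))  # AVG(song_count)
print(dict(counts))
print(average, round(average))  # 1.75 2
```

As in the SQL, songs played before a user's first `Home` visit and after the last one form periods of their own. Python's `round` rounds exact halves to the nearest even integer, which makes no difference for 6.898.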
