# SETUP

In [1]:
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring_index, col
import os

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get("AWS","AWS_ACCESS_KEY_ID")
os.environ['AWS_SECRET_ACCESS_KEY']=config.get("AWS","AWS_SECRET_ACCESS_KEY")


def create_spark_session():
    """
    This function creates a spark session. 
    
    NO INPUTS
    
    OUTPUT: returns a variable referencing the created session.
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark



In [3]:
spark = create_spark_session()

In [6]:
# Read the processed files in S3 output bucket
output_data = "s3a://udacityvitorsparkfy/"
songplay_df = spark.read.parquet(output_data+'songplay/')
song_df= spark.read.parquet(output_data+'songs/')

## Example query

### Query 1: What song was most played in the state with the most users?

In [9]:
#To ease the query, extract state from location column
songplay_df=songplay_df.withColumn('State', substring_index(col('location'),',',-1))

In [10]:
songplay_df.show(5)

+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+-----+
|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|State|
+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+-----+
|2018-11-11 15:00:...|     67| free|SOCHRXB12A8AE48069|ARTDQRC1187FB4EFD4|       414|Nashville-Davidso...|"Mozilla/5.0 (Mac...|2018|   11|   TN|
|2018-11-15 13:17:...|     30| paid|SOUPKAB12AB0185DF9|ARAGJTD1187B9A8646|       324|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|2018|   11|   CA|
|2018-11-15 14:46:...|     30| paid|SOTCOTZ12A8C136BCB|AR7WK5411A348EF5EA|       324|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|2018|   11|   CA|
|2018-11-15 16:19:...|     97| paid|SOBLFFE12AF72AA5BA|ARJNIUY12298900C91|       605|Lansing-East Lans...|"Mozilla/5.0 (X11...|201

In [11]:
songplay_df.createOrReplaceTempView('songplay')
song_df.createOrReplaceTempView('songs')

In [13]:
spark.sql(""" SELECT s.title, COUNT(*) as times_played, sp.State
                FROM songplay sp
                JOIN songs s ON s.song_id = sp.song_id
                WHERE sp.State= (
                                SELECT State 
                                FROM(
                                        SELECT State, COUNT(DISTINCT user_id)
                                        FROM songplay
                                        GROUP BY 1
                                        ORDER BY 2 DESC
                                    )
                                LIMIT 1
                                )
                GROUP BY 1
                ORDER BY 2 DESC
        """).show(5)

+--------------------+------------+
|               title|times_played|
+--------------------+------------+
|Let's Get It Started|           2|
|The Boy With The ...|           1|
|                Sick|           1|
|               Smile|           1|
|        Intermission|           1|
+--------------------+------------+
only showing top 5 rows

