In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from sql_queries import song_query, artist_query, filter_songplays_query, user_query, time_query, songplay_query


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [3]:
def read_song_data(spark, input_data):
    # get filepath to song data file
    song_data = input_data + "song_data/A/A/B/*.json"
    
    # read song data file
    df = spark.read.json(song_data)
    df.createOrReplaceTempView("songs")
    
    return df

In [4]:
def write_song_data(spark, df, output_data):
    df.createOrReplaceTempView("songs")

    # extract columns to create songs table
    songs_table = spark.sql(song_query)
    
    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy("year", "artist_id").parquet(path = output_data + "/songs.parquet", mode = "overwrite")

In [5]:
def read_log_data(spark, input_data):
    # get filepath to log data file
    log_data = input_data + "log_data/2018/11/*"

    # read log data file
    df = spark.read.json(log_data)
    df.createOrReplaceTempView("staging_events")
    
    # filter by actions for song plays
    df = spark.sql(filter_songplays_query)
    
    return df

In [6]:
input_data = "s3a://udacity-dend/"
df = read_song_data(spark, input_data)    

In [7]:
df.show(10)

+------------------+---------------+--------------------+----------------+---------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|    artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+---------------+---------+---------+------------------+--------------------+----+
|ARKIQCZ1187B9A7C7C|       52.23974|Northampton, Nort...|        -0.88576|        Bauhaus|248.65914|        1|SOSIJKW12A8C1330E3|A God In An Alcov...|   0|
|ARI9DQS1187B995625|       44.93746|  Chippewa Falls, WI|       -91.39251|    Judy Henske|139.78077|        1|SODVVEL12A6D4F9EA0|Good Old Wagon (L...|1964|
|AR2L9A61187B9ADDBC|       52.51607|     Berlin, Germany|        13.37698|Tangerine Dream|492.30322|        1|SOKPKMV12A8C14125E|Catwalk (Black In...|1995|
|ARQTC851187B9B03AF|       39.08166|       Rockville, MD|       

In [8]:
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
write_song_data(spark, df, "s3a://knd-udacity-data-lake-project/test-bucket/")

In [10]:
input_data = "s3a://udacity-dend/"
df_log = read_log_data(spark, "s3a://udacity-dend/")

In [11]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [12]:
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [13]:
df_log.createOrReplaceTempView("staging_events_filtered")

In [14]:
time = spark.sql("SELECT cast(ts/1000 as timestamp) as timestamp FROM staging_events_filtered")

In [15]:
time.show(5)

+--------------------+
|           timestamp|
+--------------------+
|2018-11-15 00:30:...|
|2018-11-15 00:41:...|
|2018-11-15 00:45:...|
|2018-11-15 03:44:...|
|2018-11-15 05:48:...|
+--------------------+
only showing top 5 rows



In [16]:
time.printSchema()

root
 |-- timestamp: timestamp (nullable = true)



In [17]:
time_table = spark.sql("""
            SELECT DISTINCT timestamp as start_time, 
                            hour(timestamp) as hour,
                            day(timestamp) as day,
                            weekofyear(timestamp) as week,
                            month(timestamp) as month,
                            year(timestamp) as year,
                            weekday(timestamp) as weekday
            FROM (
                SELECT CAST(ts/1000 AS Timestamp) AS timestamp
                FROM staging_events_filtered
                )
            """)

In [18]:
time_table.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 20:56:...|  20| 15|  46|   11|2018|      3|
|2018-11-21 05:29:...|   5| 21|  47|   11|2018|      2|
|2018-11-21 09:04:...|   9| 21|  47|   11|2018|      2|
|2018-11-21 20:55:...|  20| 21|  47|   11|2018|      2|
|2018-11-14 07:10:...|   7| 14|  46|   11|2018|      2|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [44]:
test_songplays = spark.sql("""
                SELECT month(CAST(events.ts/1000 AS Timestamp)) as month,
                       year(CAST(events.ts/1000 AS Timestamp)) as year,
                       CAST(events.ts/1000 AS Timestamp) AS timestamp,
                       events.userId, 
                       events.level, 
                       events.sessionId, 
                       events.location, 
                       events.userAgent,
                       events.song,
                       songs.song_id, 
                       songs.artist_id
                FROM staging_events_filtered as events 
                FULL JOIN songs ON events.song = songs.title
                """)

In [45]:
test_songplays.show(100)

+-----+----+--------------------+------+-----+---------+--------------------+--------------------+--------------------+-------+---------+
|month|year|           timestamp|userId|level|sessionId|            location|           userAgent|                song|song_id|artist_id|
+-----+----+--------------------+------+-----+---------+--------------------+--------------------+--------------------+-------+---------+
|   11|2018|2018-11-16 18:45:...|    36| paid|      461|Janesville-Beloit...|"Mozilla/5.0 (Win...|Can't Smile Witho...|   null|     null|
|   11|2018|2018-11-30 05:48:...|    49| paid|     1079|San Francisco-Oak...|Mozilla/5.0 (Wind...|Chloe Dancer/Crow...|   null|     null|
|   11|2018|2018-11-26 19:01:...|    80| paid|      933|Portland-South Po...|"Mozilla/5.0 (Mac...|       Crossing Over|   null|     null|
|   11|2018|2018-11-15 20:55:...|    44| paid|      619|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|Evolution (the Gr...|   null|     null|
|   11|2018|2018-11-20 02:34:...| 

In [19]:
songplays = spark.sql(songplay_query)

In [20]:
songplays.show(20)

+-----+----+---------+------+-----+---------+--------+---------+-------+---------+
|month|year|timestamp|userId|level|sessionId|location|userAgent|song_id|artist_id|
+-----+----+---------+------+-----+---------+--------+---------+-------+---------+
+-----+----+---------+------+-----+---------+--------+---------+-------+---------+



In [22]:
staging_events_filtered = spark.sql("SELECT * FROM staging_events_filtered")

In [24]:
staging_events_filtered.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [31]:
staging_titles = staging_events_filtered.select("song")
staging_titles.createOrReplaceTempView("temp_staging_titles")

In [32]:
song_titles = df.select("title")
song_titles.createOrReplaceTempView("temp_song_titles")

In [33]:
print(staging_titles.printSchema())
print(song_titles.printSchema())

root
 |-- song: string (nullable = true)

None
root
 |-- title: string (nullable = true)

None


In [38]:
staging_titles.show(50)

+--------------------+
|                song|
+--------------------+
|       Sehr kosmisch|
|     The Big Gundown|
|            Marry Me|
|           Blackbird|
|Best Of Both Worl...|
|Call Me If You Ne...|
|                Home|
|                 OMG|
| Candle On The Water|
|            Our Song|
|Baby Boy [feat. B...|
|      Black Hole Sun|
|               Human|
|            Addicted|
|                 Air|
|          Superbeast|
|          I Remember|
|Don't Be Stupid (...|
|We throw parties_...|
|How Can I Live (S...|
|       Thugs Like Me|
| Killing In The Name|
|            Epilogue|
|           Endlessly|
|             Torches|
|The Good Times Ar...|
|     Te Conozco Bien|
|The Calculation (...|
| Goodbye (Interlude)|
|              Canada|
|                 Ven|
|Do You Remember T...|
|I Want Your (Hand...|
|           Chill Out|
|Something (Radio ...|
|What's A Broken H...|
|             Society|
| Christians In Black|
|            Uprising|
|Might Like You Be...|
|   Sheila 

In [39]:
song_titles.show(50)

+--------------------+
|               title|
+--------------------+
|A God In An Alcov...|
|Good Old Wagon (L...|
|Catwalk (Black In...|
|One Shot (Album V...|
|            Floating|
|      Excelsior Lady|
|         Laser Light|
|     A Grande Cacada|
|The Urgency (LP V...|
|What Child Is Thi...|
|           High Tide|
|     Arms and Armour|
|    Auburn and Ivory|
|Sex Is On Everyon...|
|          Our Father|
|       Walk the Walk|
|         Without You|
|       Blood In Face|
|         Siempre Ana|
|               Brain|
|          I Know You|
|        En Que Mujer|
|           China Boy|
|   Caught In A Dream|
|             Monóico|
|            La Culpa|
+--------------------+



In [35]:
spark.sql("""
            SELECT staging.song, song.title
            FROM temp_staging_titles as staging
            JOIN temp_song_titles as song
                ON staging.song = song.title
            """).show(10)

+----+-----+
|song|title|
+----+-----+
+----+-----+

