In [1]:
import configparser
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear
import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
from datetime import datetime

In [2]:
# Load settings from dl.cfg (path is relative to the working directory).
# ConfigParser.read() returns the list of files it actually parsed; leaving the
# call as the cell's last expression displays that list, so an empty list here
# means the file was not found.
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
# Copy both AWS credentials from the [KEYS] section of the config into
# environment variables of the same name.
for aws_var in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[aws_var] = config['KEYS'][aws_var]

In [4]:
# Configure the session: pull in the hadoop-aws package and point the S3A
# filesystem at the us-west-2 endpoint, then create the session (or reuse an
# already-running one).
spark_builder = (
    SparkSession.builder
    .appName("Data Lake with Spark Project")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .config("spark.hadoop.fs.s3a.endpoint", "s3-us-west-2.amazonaws.com")
)
spark = spark_builder.getOrCreate()

In [5]:
# Echo the session object so the notebook renders its summary.
spark

In [6]:
# Glob for the song JSON files; only the A/A prefix is matched, not the whole dataset.
# NOTE(review): presumably a subset chosen for faster iteration; confirm before a full run.
song_data = 's3a://udacity-dend/song_data/A/A/*/*.json'

In [7]:
# Read every file matched by the song glob into one DataFrame; no schema is passed to the reader.
song_df = spark.read.json(song_data)

In [8]:
# Peek at a single parsed song record.
song_df.take(1)

[Row(artist_id='ARSUVLW12454A4C8B8', artist_latitude=35.83073, artist_location='Tennessee', artist_longitude=-85.97874, artist_name='Royal Philharmonic Orchestra/Sir Thomas Beecham', duration=94.56281, num_songs=1, song_id='SOBTCUI12A8AE48B70', title='Faust: Ballet Music (1959 Digital Remaster): VI.     Variations du miroir (Allegretto)', year=0)]

In [9]:
# Print the column names and types of the song DataFrame.
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [10]:
# Songs dimension: the distinct combinations of the song-level columns.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = song_df.select(*song_columns).distinct()

In [11]:
# Preview five rows of the songs table.
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOHQZIB12A6D4F9FAF|N****_ What's Up ...|ARWAFY51187FB5C4EF|2006|196.85832|
|SOCOHAX12A8C13B6B2|Walking With The ...|ARE5F2F1187B9AB7E9|1966|152.16281|
|SOKNGDE12AB017CA4D| Step Into Your Skin|ARE4SDM1187FB4D7E4|   0|139.72853|
|SOQBZDP12AB0180E28|   Depths Of Bavaria|ARWRO6T1187B98C5D6|2008| 257.4624|
|SODZYPO12A8C13A91E|Burn My Body (Alb...|AR1C2IX1187B99BF74|   0|177.99791|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [12]:
# Write the songs table as parquet, partitioned by year and then artist_id,
# replacing any existing output at the target path.
songs_writer = songs_table.write.mode("overwrite").partitionBy("year", "artist_id")
songs_writer.parquet("s3a://foustawsbucket/output/songs_table.parquet")

In [13]:
# Artists dimension: the distinct combinations of the artist-level columns.
artist_columns = [
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
]
artists_table = song_df.select(*artist_columns).distinct()

In [14]:
# Preview five rows of the artists table.
artists_table.show(5)

+------------------+-----------------+--------------------+---------------+----------------+
|         artist_id|      artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+-----------------+--------------------+---------------+----------------+
|AR1S3NH1187B98C2BC|        Anthony B|Clarks Town, Jamaica|           null|            null|
|ARPIKA31187FB4C233|       The Action|            New York|       40.71455|       -74.00712|
|ARYL56G11C8A41634E|    Mick Flannery|                    |           null|            null|
|AR1XL241187FB3F4AB|Nortec Collective|                    |           null|            null|
|ARMI4NV1187B99D55D|          Man Man|    Philadelphia, PA|       39.95227|       -75.16237|
+------------------+-----------------+--------------------+---------------+----------------+
only showing top 5 rows



In [15]:
# Write the artists table as unpartitioned parquet, replacing any existing output.
artists_output_path = "s3a://foustawsbucket/output/artists_table.parquet"
artists_table.write.mode("overwrite").parquet(artists_output_path)

In [16]:
# Glob for the event-log JSON files under the 2018/11 prefix.
log_data = 's3a://udacity-dend/log_data/2018/11/*.json'

In [17]:
# Read every file matched by the log glob into one DataFrame; no schema is passed to the reader.
log_df = spark.read.json(log_data)

In [18]:
# Peek at a single parsed log event.
log_df.take(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [19]:
# Print the column names and types of the log DataFrame.
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [20]:
# Keep only the events whose page is 'NextSong'; later cells build on this filtered frame.
log_df = log_df.where(col("page") == 'NextSong')

In [21]:
# Users dimension: exactly one row per userId.
# A plain distinct() over columns that include `level` yields several rows for
# the same user whenever their level differs between events. Rank each user's
# events from newest to oldest by `ts` and keep only the newest, so the table
# carries the user's most recent level.
latest_event_first = Window.partitionBy("userId").orderBy(F.col("ts").desc())
users_table = (
    log_df
    .withColumn("_recency_rank", F.row_number().over(latest_event_first))
    .filter(F.col("_recency_rank") == 1)
    .select("userId", "firstName", "lastName", "gender", "level")
)

In [22]:
# Preview five rows of the users table.
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    57|Katherine|     Gay|     F| free|
|    84|  Shakira|    Hunt|     F| free|
|    22|     Sean|  Wilson|     F| free|
|    52| Theodore|   Smith|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [23]:
# Write the users table as unpartitioned parquet, replacing any existing output.
users_output_path = "s3a://foustawsbucket/output/users_table.parquet"
users_table.write.mode("overwrite").parquet(users_output_path)

In [24]:
# Convert `ts` from epoch milliseconds to a Spark timestamp, rounded to whole seconds.
# This cell overwrites `log_df.ts` in place, so guard on the column's current type:
# without the guard, running the cell a second time would apply the /1000
# conversion to values that are already timestamps.
if dict(log_df.dtypes)["ts"] != "timestamp":
    log_df = log_df.withColumn("ts", (F.round(col('ts')/1000)).cast(TimestampType()))

In [25]:
# Time dimension: every distinct event timestamp together with the calendar
# parts derived from it.
start_times = log_df.select(F.col("ts").alias("start_time"))
time_table = start_times.select(
    "start_time",
    F.hour("start_time").alias("hour"),
    F.dayofmonth("start_time").alias("day"),
    F.weekofyear("start_time").alias("week"),
    F.month("start_time").alias("month"),
    F.year("start_time").alias("year"),
    F.dayofweek("start_time").alias("weekday"),
).distinct()

In [26]:
# Preview five rows of the time table.
time_table.show(5)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 17:19:14|  17| 15|  46|   11|2018|      5|
|2018-11-15 18:31:39|  18| 15|  46|   11|2018|      5|
|2018-11-21 17:17:48|  17| 21|  47|   11|2018|      4|
|2018-11-14 00:53:44|   0| 14|  46|   11|2018|      4|
|2018-11-14 12:00:45|  12| 14|  46|   11|2018|      4|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [27]:
# Write the time table as parquet, partitioned by year and then month,
# replacing any existing output at the target path.
time_writer = time_table.write.mode("overwrite").partitionBy("year", "month")
time_writer.parquet("s3a://foustawsbucket/output/time_table.parquet")

In [28]:
# A play event matches a song when artist name, title and duration all agree.
# No join type is given, so Spark's default (inner) applies and events without
# a matching song are dropped.
song_match = (
    (log_df.artist == song_df.artist_name)
    & (log_df.song == song_df.title)
    & (log_df.length == song_df.duration)
)
plays_with_songs = log_df.join(song_df, song_match)

# Bring in the time dimension so year/month are available for partitioning.
# `year` exists in both song_df and time_table, hence the explicit
# time_table.year reference in the select below.
plays_with_time = plays_with_songs.join(time_table, (log_df.ts == time_table.start_time))
fact_columns = ["userId", "level", "song_id", "artist_id", "sessionId", "location", "userAgent"]
songplays_table = plays_with_time.select(
    time_table.start_time, *fact_columns, time_table.year, "month"
)

# Assign consecutive songplay_id values starting at 1: first tag each row with
# a monotonically increasing id, then number the rows in that order.
songplays_table = songplays_table.withColumn("col_id", F.monotonically_increasing_id())
window = Window.orderBy(F.col('col_id'))
songplays_table = (
    songplays_table
    .withColumn("songplay_id", F.row_number().over(window))
    .select(
        "songplay_id", "start_time", "userId", "level", "song_id", "artist_id",
        "sessionId", "location", "userAgent", "year", "month",
    )
)

In [29]:
# Preview five rows of the songplays table.
songplays_table.show(5)

+-----------+-------------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+
|songplay_id|         start_time|userId|level|           song_id|         artist_id|sessionId|            location|           userAgent|year|month|
+-----------+-------------------+------+-----+------------------+------------------+---------+--------------------+--------------------+----+-----+
|          1|2018-11-21 21:56:48|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|      818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
|          2|2018-11-05 17:49:43|    73| paid|SOHDWWH12A6D4F7F6A|ARC0IOF1187FB3F6E6|      255|Tampa-St. Petersb...|"Mozilla/5.0 (Mac...|2018|   11|
|          3|2018-11-13 22:39:40|    55| free|SOXQYSC12A6310E908|AR0L04E1187B9AE90C|      415|Minneapolis-St. P...|"Mozilla/5.0 (Mac...|2018|   11|
|          4|2018-11-16 14:21:13|    85| paid|SOLRYQR12A670215BF|ARNLO5S1187B9B80CC|      436|       Red Bluff, 

In [30]:
# Write the songplays table as parquet, partitioned by year and then month,
# replacing any existing output at the target path.
songplays_writer = songplays_table.write.mode("overwrite").partitionBy("year", "month")
songplays_writer.parquet("s3a://foustawsbucket/output/songplays_table.parquet")

# SQL examples for song analysis:

In [31]:
# Print the artists table schema (column names used by the SQL examples).
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_longitude: double (nullable = true)



In [32]:
# Print the songs table schema (column names used by the SQL examples).
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [33]:
# Register the artists table as the temporary view "artists_view" so spark.sql can query it.
artists_table.createOrReplaceTempView("artists_view")

In [34]:
# Register the songs table as the temporary view "songs_view" so spark.sql can query it.
songs_table.createOrReplaceTempView("songs_view")

### List of artists and songs (up to 20 records)

In [36]:
# Join artists to their songs on artist_id. SELECT * keeps the columns of both
# views, so artist_id appears twice in the result. show() with no argument
# prints at most 20 rows.
artist_song_query = "SELECT * FROM artists_view JOIN songs_view on songs_view.artist_id = artists_view.artist_id"
spark.sql(artist_song_query).show()

+------------------+-----------------+--------------------+---------------+----------------+------------------+--------------------+------------------+----+---------+
|         artist_id|      artist_name|     artist_location|artist_latitude|artist_longitude|           song_id|               title|         artist_id|year| duration|
+------------------+-----------------+--------------------+---------------+----------------+------------------+--------------------+------------------+----+---------+
|AR1S3NH1187B98C2BC|        Anthony B|Clarks Town, Jamaica|           null|            null|SONHGLD12AB0188D47|          Our Father|AR1S3NH1187B98C2BC|1999| 202.4224|
|ARPIKA31187FB4C233|       The Action|            New York|       40.71455|       -74.00712|SOPVNTL12AB01854F9|               Brain|ARPIKA31187FB4C233|   0|180.32281|
|ARYL56G11C8A41634E|    Mick Flannery|                    |           null|            null|SOVSKKG12A81C21A77|       Grace's Waltz|ARYL56G11C8A41634E|   0|153.93914

### List of distinct titles for artist name "Jamie Cullum" 

In [37]:
# Distinct song titles whose artist name is exactly 'Jamie Cullum'.
jamie_cullum_titles_query = (
    "SELECT distinct songs_view.title FROM songs_view JOIN artists_view on songs_view.artist_id = artists_view.artist_id "
    "WHERE artists_view.artist_name = 'Jamie Cullum'"
)
spark.sql(jamie_cullum_titles_query).show()

+---------------+
|          title|
+---------------+
|         Oh God|
|It's About Time|
+---------------+

