# Data Lake on S3 with spark code execution on AWS-EMR

In [1]:
from pyspark.sql import SparkSession
import os
import configparser

# Make sure that your AWS credentials are loaded as env vars

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# To run the code in local
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell"

# Create spark session with hadoop-aws package

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Load songs data from S3

In [4]:
# Read all the json files under song-data repository
# since we know the directory structure we can use *
df = spark.read.json("data/song-data/*/*/*/*.json")

# print schema
df.printSchema()
# print 5 records
df.show(5)
# Print total number of records
df.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name|  duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|           Tennessee|       -85.97874|Royal Philharmoni..

904

In [5]:
# Create a dataframe songs_table with the required fields from existind dataframe
songs_table = df.select("song_id", "title", "artist_id", "year", "duration")
songs_table.show(5)

+------------------+--------------------+------------------+----+----------+
|           song_id|               title|         artist_id|year|  duration|
+------------------+--------------------+------------------+----+----------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|ARSUVLW12454A4C8B8|   0|  94.56281|
|SOXKFTF12A6D4FBF31|Isle Of Innisfree...|ARA04401187B991E6E|   0| 184.16281|
|SOXRPUH12AB017F769|Exodus: Part I: M...|ARXQC081187FB4AD42|   0|1047.71873|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|ARWUNH81187FB4A3E0|2001| 227.10812|
|SOPEJZP12A8C1369E6|He's Got The Whol...|ARNU0OM1187FB3F14A|   0|  90.04363|
+------------------+--------------------+------------------+----+----------+
only showing top 5 rows



In [6]:
# Partition the data frame based on year and artist_id and write it to parquet file
songs_table.write.partitionBy(["year", "artist_id"]).parquet("songs.parquet")

In [7]:
# Create a dataframe artists_table with the required fields from existind dataframe
artists_table = df.select("artist_id", "artist_name", "artist_longitude", "artist_latitude", "artist_location")
artists_table.show(5)

+------------------+--------------------+----------------+---------------+--------------------+
|         artist_id|         artist_name|artist_longitude|artist_latitude|     artist_location|
+------------------+--------------------+----------------+---------------+--------------------+
|ARSUVLW12454A4C8B8|Royal Philharmoni...|       -85.97874|       35.83073|           Tennessee|
|ARA04401187B991E6E|JOSEF LOCKE & ORC...|        -7.31923|       54.99241|Londonderry, Nort...|
|ARXQC081187FB4AD42|William Shatner_ ...|        -2.23001|       54.31407|                  UK|
|ARWUNH81187FB4A3E0|         Trick Daddy|            null|           null|     Miami , Florida|
|ARNU0OM1187FB3F14A|Larry Groce/Disne...|        -96.7954|       32.77815|          Dallas, TX|
+------------------+--------------------+----------------+---------------+--------------------+
only showing top 5 rows



In [8]:
# Write the data frame to the parquet file
artists_table.write.parquet("artists.parquet")

# Load logs data from S3

In [9]:
# Read all the json files under logs-data repository
# since we know the directory structure we can use *
df = spark.read.json("data/log-data/*/*/*.json")

In [10]:
df.printSchema()
df.show(5)
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

8056

In [11]:
# Filter records based on NextSong and rename the columns as per requirements
df = df.filter("page='NextSong'")\
        .withColumnRenamed("userId", "user_id")\
        .withColumnRenamed("firstName", "first_name")\
        .withColumnRenamed("lastName", "last_name")\
        .withColumnRenamed("sessionId", "session_id")\
        .withColumnRenamed("userAgent", "user_agent")

In [12]:
# Create a dataframe users_table with the required fields from existind dataframe
users_table = df.select("user_id", "first_name", "last_name", "gender", "level")
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [13]:
# Write the data frame to the parquet file
users_table.write.parquet("users.parquet")

# Convert the epoch milliseconds to timestamp

In [14]:
import  pyspark.sql.functions as F

# udf method to convert millisecnds to epoch unix time
get_timestamp = F.udf(lambda x: int(int(x)/1000))

# Add the required columns for time table in the existing data frame
df = df.withColumn("timestamp", get_timestamp("ts"))\
        .withColumn("datetime", F.from_unixtime("timestamp", "MM-dd-yyyy HH:mm:ss"))\
        .withColumn("start_time", F.to_timestamp("datetime", "MM-dd-yyyy HH:mm:ss"))\
        .withColumn("month", F.month("start_time"))\
        .withColumn("year", F.year("start_time"))\
        .withColumn("week", F.weekofyear("start_time"))\
        .withColumn("day", F.dayofmonth("start_time"))\
        .withColumn("weekday", F.dayofweek("start_time"))\
        .withColumn("hour", F.hour("start_time"))

In [15]:
# Create a dataframe time_table with the required fields from existind dataframe
time_table = df.select("start_time", "month", "year", "week", "day", "weekday", "hour")
time_table.show(5, truncate = False)

+-------------------+-----+----+----+---+-------+----+
|start_time         |month|year|week|day|weekday|hour|
+-------------------+-----+----+----+---+-------+----+
|2018-11-14 17:30:26|11   |2018|46  |14 |4      |17  |
|2018-11-14 17:41:21|11   |2018|46  |14 |4      |17  |
|2018-11-14 17:45:41|11   |2018|46  |14 |4      |17  |
|2018-11-14 20:44:09|11   |2018|46  |14 |4      |20  |
|2018-11-14 22:48:55|11   |2018|46  |14 |4      |22  |
+-------------------+-----+----+----+---+-------+----+
only showing top 5 rows



In [16]:
# Partition the data frame based on year and month and write it to parquet file
time_table.write.partitionBy(["year", "month"]).parquet("time.parquet")

In [17]:
songs_df = spark.read.parquet("songs.parquet")
songs_df.show(5)

+------------------+--------------------+---------+----+------------------+
|           song_id|               title| duration|year|         artist_id|
+------------------+--------------------+---------+----+------------------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...| 94.56281|   0|ARSUVLW12454A4C8B8|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|227.10812|2001|ARWUNH81187FB4A3E0|
|SOYVBGZ12A6D4F92A8|Piano Sonata No. ...|221.70077|   0|ARLRWBW1242077EB29|
|SODBHKO12A58A77F36|Fingers Of Love (...|335.93424|   0|ARKGS2Z1187FB494B5|
|SOGXFIF12A58A78CC4|Hanging On (Mediu...|204.06812|   0|AR5LZJD1187FB4C5E5|
+------------------+--------------------+---------+----+------------------+
only showing top 5 rows



# Create song plays table by joining songs_table and current dataframe

In [18]:
# Create views to perform sql query
songs_df.createOrReplaceTempView("songs_data")
df.createOrReplaceTempView("logs_data")

# SQL query to join data from different dataframes and create a new data frame from it
songplays_table = spark.sql("""
                    SELECT DISTINCT start_time, user_id, level, song_id, artist_id,
                                    session_id, location, user_agent, logs.year, month
                    FROM logs_data as logs
                    LEFT OUTER JOIN songs_data as songs
                    ON logs.song = songs.title
                    AND logs.length = songs.duration
                    """)

# Create primary key for the songplays_table

In [19]:
# Create a column songplays_id and assign it values using monotonically_increasing_id method
songplays_table = songplays_table.withColumn("songplays_id", F.monotonically_increasing_id())

songplays_table.show(5)

+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+------------+
|         start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|songplays_id|
+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+------------+
|2018-11-15 04:08:08|     80| paid|   null|     null|       611|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|           0|
|2018-11-15 05:14:26|     80| paid|   null|     null|       611|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|           1|
|2018-11-15 16:46:20|     44| paid|   null|     null|       637|Waterloo-Cedar Fa...|Mozilla/5.0 (Maci...|2018|   11|           2|
|2018-11-20 20:16:43|     80| paid|   null|     null|       774|Portland-South Po...|"Mozilla/5.0 (Mac...|2018|   11|           3|
|2018-11-21 11:29:12|     97| paid|   null|     null|       817|Lansing-East Lans..

In [20]:
# Partition the data frame based on year and month and write it to parquet file
songplays_table.write.partitionBy(["year", "month"]).parquet("songplays.parquet")