# Running a Data Lake with Spark

In this exercise, I will show you how easy it is to use Spark to read data from S3, wrangle it, and write it back to an S3 bucket as partitioned Parquet files. The great thing about this process is that the data does not live in a database; it lives in an S3 bucket.

## Step 1: Import the libraries (pyspark, configparser, and os), load the AWS credentials, and create a Spark session

In [1]:
```python
import configparser
import os
from datetime import datetime

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth as day, hour, weekofyear as week, dayofweek as weekday, date_format
from pyspark.sql.types import StructType as ST, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, LongType as Lng, TimestampType as Tst

# Read the AWS credentials from the [AWS] section of dl.cfg and expose them as environment variables
config = configparser.ConfigParser()
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    """Create (or reuse) a SparkSession with the hadoop-aws package so s3a:// paths work."""
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate()
    return spark


spark = create_spark_session()
```
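
The notebook expects a `dl.cfg` file with an `[AWS]` section that holds the two keys read above. The sketch below shows that layout and performs the same `configparser` lookup using only the standard library. The credential values are placeholders, and `load_aws_credentials` is a hypothetical helper that is not part of the original notebook.

```python
import configparser
import os

# Hypothetical dl.cfg contents: the [AWS] section and key names match what the
# notebook reads; the values are placeholders, not real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""


def load_aws_credentials(cfg_text):
    """Parse dl.cfg-style text and export the AWS keys as environment variables."""
    config = configparser.ConfigParser()
    config.read_string(cfg_text)
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        # ConfigParser stores option names lower-cased, but lookups are
        # case-insensitive, so the upper-case names used in the notebook work.
        os.environ[key] = config["AWS"][key]
    return config


config = load_aws_credentials(SAMPLE_CFG)
print(os.environ["AWS_ACCESS_KEY_ID"])  # YOUR_ACCESS_KEY_ID
```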

## Step 2: Set up the Log Data and Song Data Schemas

Here, I define the schema, including the data type of each column, for both datasets. Passing an explicit schema ensures that every column loads with the correct data type. It also means Spark does not need an extra pass over the data to infer the schema.

In [3]:
```python
logSchema = ST([
    Fld('artist', Str()),
    Fld('auth', Str()),
    Fld('firstName', Str()),
    Fld('gender', Str()),
    Fld('itemInSession', Lng()),
    Fld('lastName', Str()),
    Fld('length', Dbl()),
    Fld('level', Str()),
    Fld('location', Str()),
    Fld('method', Str()),
    Fld('page', Str()),
    Fld('registration', Dbl()),
    Fld('sessionId', Lng()),
    Fld('song', Str()),
    Fld('status', Lng()),
    Fld('ts', Lng()),
    Fld('userAgent', Str()),
    Fld('userId', Str())
])

songSchema = ST([
    Fld('num_songs', Int()),
    Fld('artist_id', Str()),
    Fld('artist_latitude', Dbl()),
    Fld('artist_longitude', Dbl()),
    Fld('artist_location', Str()),
    Fld('artist_name', Str()),
    Fld('song_id', Str()),
    Fld('title', Str()),
    Fld('duration', Dbl()),
    Fld('year', Int())
])
```

## Step 3: Start the data ingestion

Here, we load all of the JSON files. For this exercise, we read the data from the local workspace instead of from S3. Reading from S3 took quite a while to finish, and the local copy is much faster. To try another source (S3, local with the small songs dataset, or local with the full songs dataset), uncomment the corresponding line below and comment out the line that is currently active.

In [5]:
```python
# UNCOMMENT BELOW FOR LOADING FROM S3 ALL SONGS DATA
# raw_song_df = spark.read.json('s3a://udacity-dend/song-data', songSchema)

# UNCOMMENT BELOW FOR LOADING FROM LOCAL ALL SONGS DATA
# raw_song_df = spark.read.json('/home/workspace/data/Songs_Data/*.json', songSchema)

# UNCOMMENT BELOW FOR LOADING FROM LOCAL SMALL SONGS DATA
raw_song_df = spark.read.json('/home/workspace/data/song_data/*/*/*/*.json', songSchema)


# UNCOMMENT BELOW FOR LOADING FROM S3 ALL LOG DATA
# raw_log_df = spark.read.json('s3a://udacity-dend/log-data', logSchema)

# UNCOMMENT BELOW FOR LOADING FROM LOCAL SMALL LOG DATA
raw_log_df = spark.read.json('/home/workspace/data/log_data', logSchema)


# Let's verify that the data came in by checking the number of rows
count_songs = raw_song_df.count()
count_logs = raw_log_df.count()

print(count_songs, ' ', count_logs)
```

```
71   8056
```
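
The small songs path uses the glob `*/*/*/*.json`. Each `*` matches exactly one directory level, so only JSON files that sit three directories below `song_data` are read. The sketch below illustrates the same matching rule with Python's `glob` module on a throwaway directory tree. It is an analogy for the Hadoop-style glob that Spark applies to the path, and the directory and file names are hypothetical.

```python
import glob
import os
import tempfile

# Hypothetical layout: one song file nested three directories deep, plus a
# stray file directly under song_data that the pattern should not pick up.
with tempfile.TemporaryDirectory() as root:
    song_root = os.path.join(root, "song_data")
    nested_dir = os.path.join(song_root, "A", "B", "C")
    os.makedirs(nested_dir)
    for path in (os.path.join(nested_dir, "song1.json"),
                 os.path.join(song_root, "stray.json")):
        with open(path, "w") as fh:
            fh.write("{}")

    # Each '*' matches a single path segment (it never crosses a separator),
    # so '*/*/*/*.json' only reaches files exactly three levels down.
    pattern = os.path.join(song_root, "*", "*", "*", "*.json")
    matches = [os.path.relpath(p, song_root) for p in glob.glob(pattern)]

print(matches)
```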


## Step 4a: Using raw_song_df data to create songs_table and artists_table

Below, we create songs_table and artists_table from raw_song_df, which we loaded in Step 3.

In [6]:
```python
songs_table = raw_song_df.select(raw_song_df.song_id,
                                 raw_song_df.title,
                                 raw_song_df.artist_id,
                                 raw_song_df.year.cast(Int()),
                                 raw_song_df.duration.cast(Dbl()))

artists_table = raw_song_df.select(raw_song_df.artist_id,
                                   raw_song_df.artist_latitude.alias('latitude'),
                                   raw_song_df.artist_location.alias('location'),
                                   raw_song_df.artist_longitude.alias('longitude'),
                                   raw_song_df.artist_name.alias('name')).dropDuplicates(['artist_id', 'name'])
```
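
`dropDuplicates(['artist_id', 'name'])` keeps one row for each distinct combination of those two columns. Below is a plain-Python sketch of the same idea. The sketch keeps the first row it sees, whereas Spark processes the data across partitions and does not guarantee which of the duplicate rows survives. The rows reuse values from the artists sample shown in Step 5, with the first row duplicated on purpose. `drop_duplicates` is a hypothetical helper.

```python
def drop_duplicates(rows, subset):
    """Keep the first row seen for each combination of values in the subset columns."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[column] for column in subset)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


# Rows shaped like artists_table; the second row duplicates the first on purpose.
rows = [
    {"artist_id": "ARB29H41187B98F0EF", "name": "Terry Callier", "location": "Chicago"},
    {"artist_id": "ARB29H41187B98F0EF", "name": "Terry Callier", "location": "Chicago"},
    {"artist_id": "AREBBGV1187FB523D2", "name": "Mike Jones (Featuring CJ_ Mello & Lil' Bran)",
     "location": "Houston, TX"},
]
unique = drop_duplicates(rows, ["artist_id", "name"])
print(len(unique))  # 2
```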

## Step 4b: Using raw_log_df data to create users_table, time_table, and songplays_table

Below, we create users_table, time_table, and songplays_table from raw_log_df, which we also loaded in Step 3.

In [7]:
```python
# Keep only the rows whose page is 'NextSong', i.e., the actual song plays
df_l = raw_log_df.where(raw_log_df.page == 'NextSong')

# 'ts' holds epoch time in milliseconds: divide by 1000 to get seconds and
# convert the result to a timestamp column with to_timestamp
df_l = df_l.withColumn('ts', f.to_timestamp(df_l.ts / 1000).cast(Tst()))

# Join the log events to the song metadata on song title and artist name;
# songplays_table is built from this joined result
j_tbl = df_l.join(raw_song_df, [raw_song_df.title == df_l.song, df_l.artist == raw_song_df.artist_name])

users_table = df_l.select(df_l.userId.cast(Int()).alias('user_id'),
                          df_l.firstName.alias('first_name'),
                          df_l.lastName.alias('last_name'),
                          df_l.gender,
                          df_l.level).dropDuplicates(['user_id'])

time_table = df_l.select(df_l.ts.alias('start_time'),
                         hour('ts').alias('hour'),
                         day('ts').alias('day'),
                         week('ts').alias('week'),
                         month('ts').alias('month'),
                         year('ts').alias('year'),
                         weekday('ts').alias('weekday')).distinct()

songplays_table = j_tbl.select(f.monotonically_increasing_id().alias('songplay_id'),
                               j_tbl.ts.alias('start_time'),
                               year(j_tbl.ts).alias('year'),
                               month(j_tbl.ts).alias('month'),
                               j_tbl.userId.cast(Int()).alias('user_id'),
                               j_tbl.level,
                               j_tbl.song_id,
                               j_tbl.artist_id,
                               j_tbl.sessionId.cast(Int()).alias('session_id'),
                               j_tbl.artist_location.alias('location'),
                               j_tbl.userAgent.alias('user_agent'))
```
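
The join matches each `NextSong` event to a song only when both the title (`song` vs `title`) and the artist (`artist` vs `artist_name`) agree. Because it is an inner join, events without a matching song are dropped, which is why only one songplay row appears in the outputs below. The sketch below mimics that logic as a dictionary-based hash join in plain Python. Spark chooses its own physical join strategy. The song and artist names here are hypothetical, and `inner_join_songs` is not part of the original notebook.

```python
def inner_join_songs(log_rows, song_rows):
    """Pair each log event with every song whose (title, artist_name) equals the
    event's (song, artist). Events with no match are dropped, as in an inner join."""
    index = {}
    for song in song_rows:
        index.setdefault((song["title"], song["artist_name"]), []).append(song)

    joined = []
    for event in log_rows:
        for song in index.get((event["song"], event["artist"]), []):
            # The log and song schemas share no column names, so merging loses nothing.
            joined.append({**event, **song})
    return joined


# Hypothetical rows for illustration only.
songs = [
    {"song_id": "SOHYPOTHETICAL1", "title": "Song A", "artist_name": "Artist A",
     "artist_id": "ARHYPOTHETICAL1"},
]
events = [
    {"userId": "15", "song": "Song A", "artist": "Artist A"},
    {"userId": "15", "song": "Song B", "artist": "Artist B"},  # no match, dropped
]
joined = inner_join_songs(events, songs)
print(len(joined))            # 1
print(joined[0]["song_id"])   # SOHYPOTHETICAL1
```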

## Step 5: Print the schema and a sample of the data for each table

In [8]:
```python
songplays_table.printSchema()
songplays_table.limit(2).toPandas()
```

```
root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
```

|   | songplay_id | start_time | year | month | user_id | level | song_id | artist_id | session_id | location | user_agent |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 2018-11-21 21:56:47.796 | 2018 | 11 | 15 | paid | SOZCTXZ12AB0182364 | AR5KOSW1187FB35FF4 | 818 | Dubai UAE | "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... |
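
`songplay_id` comes from `monotonically_increasing_id()`. Spark's documentation describes these IDs as unique and increasing but not consecutive: the partition ID sits in the upper 31 bits and the record number within the partition sits in the lower 33 bits. The sketch below reproduces that arithmetic in plain Python to show why IDs can jump by billions between partitions. `monotonic_id` is a hypothetical helper, not a Spark API.

```python
PARTITION_SHIFT = 33  # the lower 33 bits hold the record number within a partition


def monotonic_id(partition_id, record_number):
    """Compose an ID the way Spark documents monotonically_increasing_id:
    partition ID in the upper 31 bits, per-partition record number in the lower 33."""
    return (partition_id << PARTITION_SHIFT) + record_number


print(monotonic_id(0, 0))  # 0
print(monotonic_id(0, 1))  # 1
print(monotonic_id(1, 0))  # 8589934592
```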


In [9]:
```python
users_table.printSchema()
users_table.limit(2).toPandas()
```

```
root
 |-- user_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)
```

|   | user_id | first_name | last_name | gender | level |
|---|---|---|---|---|---|
| 0 | 85 | Kinsley | Young | F | paid |
| 1 | 65 | Amiya | Davidson | F | paid |
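
`dropDuplicates(['user_id'])` keeps an arbitrary row per user, so a user who switched from `free` to `paid` may end up with either level. If you want the most recent level instead, one option is to keep each user's latest event by `ts`. In Spark, this is typically done with a window function, such as `row_number()` over `Window.partitionBy('userId').orderBy(col('ts').desc())`. The plain-Python sketch below shows only the selection logic. The events are hypothetical, and `latest_user_rows` is not part of the original notebook.

```python
def latest_user_rows(events):
    """Keep, for each userId, the event with the largest ts."""
    latest = {}
    for event in events:
        uid = event["userId"]
        if uid not in latest or event["ts"] > latest[uid]["ts"]:
            latest[uid] = event
    return latest


# Hypothetical events: user 15 upgrades from free to paid.
events = [
    {"userId": "15", "ts": 1000, "level": "free"},
    {"userId": "15", "ts": 2000, "level": "paid"},
    {"userId": "85", "ts": 1500, "level": "paid"},
]
latest = latest_user_rows(events)
print(latest["15"]["level"])  # paid
```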


In [10]:
```python
artists_table.printSchema()
artists_table.limit(2).toPandas()
```

```
root
 |-- artist_id: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- location: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- name: string (nullable = true)
```

|   | artist_id | latitude | location | longitude | name |
|---|---|---|---|---|---|
| 0 | ARB29H41187B98F0EF | 41.88415 | Chicago | -87.63241 | Terry Callier |
| 1 | AREBBGV1187FB523D2 |  | Houston, TX |  | Mike Jones (Featuring CJ_ Mello & Lil' Bran) |


In [11]:
```python
songs_table.printSchema()
songs_table.limit(2).toPandas()
```

```
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)
```

|   | song_id | title | artist_id | year | duration |
|---|---|---|---|---|---|
| 0 | SOBAYLL12A8C138AF9 | Sono andati? Fingevo di dormire | ARDR4AC1187FB371A1 | 0 | 511.16363 |
| 1 | SOOLYAZ12A6701F4A6 | Laws Patrolling (Album Version) | AREBBGV1187FB523D2 | 0 | 173.66159 |


In [12]:
```python
time_table.printSchema()
time_table.limit(2).toPandas()
```

```
root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)
```

|   | start_time | hour | day | week | month | year | weekday |
|---|---|---|---|---|---|---|---|
| 0 | 2018-11-15 14:09:23.796 | 14 | 15 | 46 | 11 | 2018 | 5 |
| 1 | 2018-11-15 15:24:07.796 | 15 | 15 | 46 | 11 | 2018 | 5 |
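
Each `time_table` column is derived from the millisecond epoch value in `ts`. The sketch below repeats the conversion for the first sample row in plain Python. It assumes the timestamps are interpreted in UTC, whereas Spark actually uses the session time zone. It mirrors `weekofyear` with the ISO-8601 week number, and `dayofweek` with 1 = Sunday through 7 = Saturday. `time_columns` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def time_columns(ts_ms):
    """Derive the time_table columns from an epoch-milliseconds value, assuming UTC."""
    start_time = EPOCH + timedelta(milliseconds=ts_ms)
    return {
        "start_time": start_time,
        "hour": start_time.hour,
        "day": start_time.day,
        "week": start_time.isocalendar()[1],          # ISO-8601 week, like weekofyear
        "month": start_time.month,
        "year": start_time.year,
        "weekday": start_time.isoweekday() % 7 + 1,   # 1 = Sunday ... 7 = Saturday, like dayofweek
    }


# Epoch milliseconds for 2018-11-15 14:09:23.796 UTC, computed exactly with integer arithmetic
ts_ms = (datetime(2018, 11, 15, 14, 9, 23, 796000, tzinfo=timezone.utc) - EPOCH) // timedelta(milliseconds=1)
row = time_columns(ts_ms)
print(ts_ms)  # 1542290963796
print({k: v for k, v in row.items() if k != "start_time"})
# {'hour': 14, 'day': 15, 'week': 46, 'month': 11, 'year': 2018, 'weekday': 5}
```

The derived values match the first sample row in the output above.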


## Step 6: Saving the files to Parquet

Cells 13 to 17 below show how to save each table as Parquet, partition it where it makes sense (songplays and time by year and month, songs by year and artist_id), and write it back to S3.

In [13]:
```python
songplays_table.write.mode('overwrite').partitionBy('year', 'month').parquet('s3a://result-dan-udacity-bucket/Data/songplays')
```

In [14]:
```python
users_table.write.mode('overwrite').parquet('s3a://result-dan-udacity-bucket/Data/users')
```

In [15]:
```python
artists_table.write.mode('overwrite').parquet('s3a://result-dan-udacity-bucket/Data/artist')
```

In [16]:
```python
songs_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet('s3a://result-dan-udacity-bucket/Data/songs')
```

In [17]:
```python
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet('s3a://result-dan-udacity-bucket/Data/time')
```
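
With `partitionBy('year', 'month')`, Spark writes each row into a Hive-style sub-directory such as `year=2018/month=11/`. It leaves the partition columns out of the Parquet files themselves, because their values are already encoded in the path. The sketch below models that layout in plain Python. `partition_path` and `split_for_write` are hypothetical helpers. Spark also escapes special characters in partition values, which this sketch skips.

```python
def partition_path(row, partition_cols):
    """Build the Hive-style sub-directory a row is written to, e.g. 'year=2018/month=11'."""
    return "/".join(f"{col}={row[col]}" for col in partition_cols)


def split_for_write(rows, partition_cols):
    """Group rows by partition directory and drop the partition columns,
    since their values live in the directory names instead of the files."""
    groups = {}
    for row in rows:
        stored = {k: v for k, v in row.items() if k not in partition_cols}
        groups.setdefault(partition_path(row, partition_cols), []).append(stored)
    return groups


groups = split_for_write(
    [{"songplay_id": 0, "user_id": 15, "year": 2018, "month": 11}],
    ["year", "month"],
)
print(groups)  # {'year=2018/month=11': [{'songplay_id': 0, 'user_id': 15}]}
```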

## Verifying the data

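Finally, we confirm that the files were written and can be read back with Spark from this notebook. Notice that the partition columns (year and month for songplays; year and artist_id for songs) now appear at the end of each row. Spark rebuilds them from the directory names and places them after the columns stored in the Parquet files.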

In [22]:
```python
songplays_p = spark.read.parquet('s3a://result-dan-udacity-bucket/Data/songplays')
```

In [23]:
```python
songplays_p.toPandas()
```

|   | songplay_id | start_time | user_id | level | song_id | artist_id | session_id | location | user_agent | year | month |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 2018-11-21 21:56:47.796 | 15 | paid | SOZCTXZ12AB0182364 | AR5KOSW1187FB35FF4 | 818 | Dubai UAE | "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... | 2018 | 11 |
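
When the table is read back, Spark's partition discovery parses each `key=value` directory name, infers a type for the value (integers here), and adds the value as a column after the stored ones. The plain-Python sketch below imitates that for a single row. The file path is hypothetical, and the digit check is a crude stand-in for Spark's type inference.

```python
def read_back(path, stored_row):
    """Rebuild a row from a file path relative to the table root: values parsed
    from key=value directory segments are appended after the stored columns."""
    row = dict(stored_row)
    for segment in path.split("/"):
        if "=" in segment:
            key, value = segment.split("=", 1)
            # Crude stand-in for Spark's partition type inference
            row[key] = int(value) if value.isdigit() else value
    return row


row = read_back("year=2018/month=11/part-00000.parquet", {"songplay_id": 0, "user_id": 15})
print(list(row))  # ['songplay_id', 'user_id', 'year', 'month']
```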


In [24]:
```python
songs_p = spark.read.parquet('s3a://result-dan-udacity-bucket/Data/songs')
```

In [25]:
```python
songs_p.toPandas()
```

|   | song_id | title | duration | year | artist_id |
|---|---|---|---|---|---|
| 0 | SOAOIBZ12AB01815BE | I Hold Your Hand In Mine [Live At Royal Albert... | 43.36281 | 2000 | ARPBNLO1187FB3D52F |
| 1 | SONYPOM12A8C13B2D7 | I Think My Wife Is Running Around On Me (Taco ... | 186.48771 | 2005 | ARDNS031187B9924F0 |
| 2 | SODREIN12A58A7F2E5 | A Whiter Shade Of Pale (Live @ Fillmore West) | 326.00771 | 0 | ARLTWXK1187FB5A3F8 |
| 3 | SOYMRWW12A6D4FAB14 | The Moon And I (Ordinary Day Album Version) | 267.70240 | 0 | ARKFYS91187B98E58F |
| 4 | SOWQTQZ12A58A7B63E | Streets On Fire (Explicit Album Version) | 279.97995 | 0 | ARPFHN61187FB575F6 |
| 5 | SOUDSGM12AC9618304 | Insatiable (Instrumental Version) | 266.39628 | 0 | ARNTLGG11E2835DDB9 |
| 6 | SOPEGZN12AB0181B3D | Get Your Head Stuck On Your Neck | 45.66159 | 0 | AREDL271187FB40F44 |
| 7 | SOBAYLL12A8C138AF9 | Sono andati? Fingevo di dormire | 511.16363 | 0 | ARDR4AC1187FB371A1 |
| 8 | SOBBUGU12A8C13E95D | Setting Fire to Sleeping Giants | 207.77751 | 2004 | ARMAC4T1187FB3FA4C |
| 9 | SOOLYAZ12A6701F4A6 | Laws Patrolling (Album Version) | 173.66159 | 0 | AREBBGV1187FB523D2 |
