# Create Tables: Data Lake with Spark
## Import Packages

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import TimestampType, DateType, IntegerType
from pyspark.sql.functions import (year, month, dayofmonth, hour,
                                   weekofyear, dayofweek, date_format,
                                   monotonically_increasing_id)

In [2]:
os.environ['AWS_ACCESS_KEY_ID'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''

# uncomment the next two lines if you would like to use config files
#os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']

## Creates a Spark Session

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [4]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://lake-bucket-sparkify/"

## Process Song Data

In [5]:
#song_data = os.path.join(input_data, 'song_data', '*', '*', '*', '*')
song_data = os.path.join(input_data, 'song_data/A/A/*/*.json')

In [6]:
song_df = spark.read.json(song_data)

In [7]:
song_df.count()

604

In [8]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
song_df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARSUVLW12454A4C8B8,35.83073,Tennessee,-85.97874,Royal Philharmonic Orchestra/Sir Thomas Beecham,94.56281,1,SOBTCUI12A8AE48B70,Faust: Ballet Music (1959 Digital Remaster): V...,0
1,ARXQC081187FB4AD42,54.31407,UK,-2.23001,William Shatner_ David Itkin_ The Arkansas Sym...,1047.71873,1,SOXRPUH12AB017F769,Exodus: Part I: Moses and Pharaoh,0
2,ARWUNH81187FB4A3E0,,"Miami , Florida",,Trick Daddy,227.10812,1,SOVNKJI12A8C13CB0D,Take It To Da House (Featuring The Slip N' Sli...,2001
3,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972
4,ARA23XO1187B9AF18F,40.57885,"Carteret, New Jersey",-74.21956,The Smithereens,192.522,1,SOKTJDS12AF72A25E5,Drown In My Own Tears (24-Bit Digitally Remast...,0


### Create Song Table

In [10]:
songs_table = song_df.select("song_id", "title", "artist_id", "year", "duration").dropDuplicates()
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOHQZIB12A6D4F9FAF,N****_ What's Up [Featuring 50 Cent] (Album Ve...,ARWAFY51187FB5C4EF,2006,196.85832
1,SOCOHAX12A8C13B6B2,Walking With The Duke (LP Version),ARE5F2F1187B9AB7E9,1966,152.16281
2,SOKNGDE12AB017CA4D,Step Into Your Skin,ARE4SDM1187FB4D7E4,0,139.72853
3,SOQBZDP12AB0180E28,Depths Of Bavaria,ARWRO6T1187B98C5D6,2008,257.4624
4,SODZYPO12A8C13A91E,Burn My Body (Album Version),AR1C2IX1187B99BF74,0,177.99791


In [11]:
songs_table.count()

604

In [20]:
songs_table.toPandas().duplicated().sum()

0

### Create Artist Table

In [21]:
artists_table = song_df.select('artist_id', 'artist_name', 'artist_location',
                               'artist_latitude', 'artist_longitude') \
                            .withColumnRenamed("artist_name", "name") \
                            .withColumnRenamed("artist_location", "location") \
                            .withColumnRenamed("artist_latitude", "latitude") \
                            .withColumnRenamed("artist_longitude", "longitude").dropDuplicates()

In [22]:
artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARSVTNL1187B992A91,Jonathan King,"London, England",51.50632,-0.12714
1,ARY0RQP1187FB48B93,Mickey 3D,,,
2,ARPIKA31187FB4C233,The Action,New York,40.71455,-74.00712
3,ARX16TQ1187B9899C9,Oysterhead,"New Orleans, LA",29.95369,-90.07771
4,ARE50SC1187B98C04A,The 69 Eyes,"Helsinki, Finland",60.17116,24.93258


In [23]:
artists_table.toPandas().duplicated().sum()

0

## Process Log Data

In [24]:
log_data = os.path.join(input_data, "log_data/*/*/*events.json")

In [25]:
df = spark.read.json(log_data)

In [26]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [27]:
df = df.where('page == "NextSong"')

In [28]:
df.count()

6820

In [29]:
df.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [32]:
df.toPandas().duplicated().sum()

0

### Create Users Table

In [33]:
users_table = df.selectExpr('userId as user_id', 'firstName as first_name', 
                                'lastName as last_name', 'gender', 'level').dropDuplicates()
users_table.limit(10).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,7,Adelyn,Jordan,F,free
2,71,Ayleen,Wise,F,free
3,81,Sienna,Colon,F,free
4,87,Dustin,Lee,M,free
5,23,Morris,Gilmore,M,free
6,75,Joseph,Gutierrez,M,free
7,16,Rylan,George,M,paid
8,2,Jizelle,Benjamin,F,free
9,3,Isaac,Valdez,M,free


In [34]:
users_table.toPandas().duplicated().sum()

0

### Create Time Table

In [35]:
get_timestamp = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
df = df.withColumn("timestamp", get_timestamp(df.ts))

get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000), TimestampType())
df = df.withColumn("datetime", get_timestamp(df.ts))

In [36]:
time_table = df.select('datetime') \
                   .withColumn('start_time', df.datetime) \
                   .withColumn('hour', hour('datetime')) \
                   .withColumn('day', dayofmonth('datetime')) \
                   .withColumn('week', weekofyear('datetime')) \
                   .withColumn('month', month('datetime')) \
                   .withColumn('year', year('datetime')) \
                   .withColumn('weekday', dayofweek('datetime')) \
                   .dropDuplicates()

In [52]:
time_table.limit(5).toPandas()

Unnamed: 0,datetime,start_time,hour,day,week,month,year,weekday
0,2018-11-15 10:45:04.796,2018-11-15 10:45:04.796,10,15,46,11,2018,5
1,2018-11-21 11:07:38.796,2018-11-21 11:07:38.796,11,21,47,11,2018,4
2,2018-11-21 11:34:42.796,2018-11-21 11:34:42.796,11,21,47,11,2018,4
3,2018-11-14 08:23:12.796,2018-11-14 08:23:12.796,8,14,46,11,2018,4
4,2018-11-14 08:47:08.796,2018-11-14 08:47:08.796,8,14,46,11,2018,4


In [37]:
time_table.toPandas().duplicated().sum()

0

### Create Songplays Table

In [38]:
song_df = spark.read.json(song_data)

In [41]:
songplays_table = df.join(
    song_df, 
    song_df.artist_name == df.artist, 
    'inner'
) \
.distinct() \
.select(
    col('timestamp'), 
    col('userId'), 
    col('level'), 
    col('sessionId'),
    col('location'), 
    col('userAgent'), 
    col('song_id'), 
    col('artist_id')) \
.withColumn('songplay_id', monotonically_increasing_id()) \
.withColumn('month', month('timestamp')) \
.withColumn('year', year('timestamp')).dropDuplicates()

In [42]:
songplays_table.limit(5).toPandas()

Unnamed: 0,timestamp,userId,level,sessionId,location,userAgent,song_id,artist_id,songplay_id,month,year
0,2018-11-16 00:15:12.796,44,paid,637,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,SOLGHDZ12AB0183B11,ARF2EHS1187B994F4E,0,11,2018
1,2018-11-13 10:22:14.796,44,paid,474,"Waterloo-Cedar Falls, IA",Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r...,SOLGHDZ12AB0183B11,ARF2EHS1187B994F4E,1125281431553,11,2018
2,2018-11-21 19:08:21.796,97,paid,817,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",SOLGHDZ12AB0183B11,ARF2EHS1187B994F4E,292057776128,11,2018
3,2018-11-21 04:57:02.796,97,paid,797,"Lansing-East Lansing, MI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",SOLGHDZ12AB0183B11,ARF2EHS1187B994F4E,652835028992,11,2018
4,2018-11-15 17:33:22.796,42,paid,404,"New York-Newark-Jersey City, NY-NJ-PA","""Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebK...",SOJREJG12AB0180438,ARO81SS1187B996629,1623497637888,11,2018


In [43]:
songplays_table.count()

430