# Data Lake on S3

In [1]:
from pyspark.sql import SparkSession
import databricks.koalas as ks

import os
import configparser



# Make sure that your AWS credentials are loaded as env vars

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read('dl.cfg')

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create spark session with hadoop-aws package

In [3]:
spark = SparkSession.builder\
                    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

# Load data from S3

In [4]:
input_data = "s3a://udacity-dend/"
output_data = "s3a://project4dend/"
    

In [4]:
kdf = ks.read_json("data/song_data/A/B/C/*.json")# s3a://udacity-dend/song_data/*/*/*/*.json to read the whole data

In [5]:
kdf.dtypes

artist_id            object
artist_latitude     float64
artist_location      object
artist_longitude    float64
artist_name          object
duration            float64
num_songs             int64
song_id              object
title                object
year                  int64
dtype: object

In [6]:
kdf.to_spark().printSchema()
kdf.head()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARNF6401187FB57032,40.79086,"New York, NY [Manhattan]",-73.96644,Sophie B. Hawkins,305.162,1,SONWXQJ12A8C134D94,The Ballad Of Sleeping Beauty,1994
1,ARLTWXK1187FB5A3F8,32.74863,"Fort Worth, TX",-97.32925,King Curtis,326.00771,1,SODREIN12A58A7F2E5,A Whiter Shade Of Pale (Live @ Fillmore West),0
2,ARPFHN61187FB575F6,41.88415,"Chicago, IL",-87.63241,Lupe Fiasco,279.97995,1,SOWQTQZ12A58A7B63E,Streets On Fire (Explicit Album Version),0
3,AR0IAWL1187B9A96D0,8.4177,Panama,-80.11278,Danilo Perez,197.19791,1,SONSKXP12A8C13A2C9,Native Soul,2003
4,AREVWGE1187B9B890A,-13.442,Noci (BA),-41.9952,Bitter End,282.43546,1,SOFCHDR12AB01866EF,Living Hell,0


# saving the data

In [7]:
songs_table = (ks.sql('''
               SELECT 
               DISTINCT
               row_number() over (ORDER BY year,title,artist_id) id,
               title,
               artist_id,
               year,
               duration
               FROM 
                   {kdf}''')
              )

#songs_table.to_spark().withColumn("id", monotonicallyIncreasingId())
songs_table

Unnamed: 0,id,title,artist_id,year,duration
0,1,A Whiter Shade Of Pale (Live @ Fillmore West),ARLTWXK1187FB5A3F8,0,326.00771
1,2,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036
2,3,Living Hell,AREVWGE1187B9B890A,0,282.43546
3,4,Midnight Star,ARULZCI1241B9C8611,0,335.51628
4,5,Music is what we love,AR051KA1187B98B2FF,0,261.51138
5,6,Streets On Fire (Explicit Album Version),ARPFHN61187FB575F6,0,279.97995
6,7,The Ballad Of Sleeping Beauty,ARNF6401187FB57032,1994,305.162
7,8,Prognosis,ARWB3G61187FB49404,2000,363.85914
8,9,Intro,AR558FS1187FB45658,2003,75.67628
9,10,Native Soul,AR0IAWL1187B9A96D0,2003,197.19791


In [8]:
songs_table.shape

(12, 5)

### Apache Parquet Introduction

Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON, supported by many data processing systems.




### Spark Write DataFrame to Parquet file format

Using spark.write.parquet() function we can write Spark DataFrame to Parquet file.




### Spark parquet partition â€“ Improving performance

> Partitioning is a feature of many databases and data processing frameworks and it is key to make jobs work at scale. We can do a parquet file partition using spark partitionBy function.

> Parquet Partition creates a folder hierarchy for each spark partition; we have mentioned the first partition as year followed by artist_id hence, it creates a artist_id folder inside the year folder.

In [9]:
    
(songs_table
 .to_spark()
 .write
 .partitionBy("year", "artist_id")
 .parquet('songs/')
)

AnalysisException: 'path file:/home/workspace/songs already exists.;'

## Artists

In [9]:
artists_table = (ks.sql('''
               SELECT 
               DISTINCT
               artist_id,
               artist_name,
               artist_location,
               artist_latitude,
               artist_longitude
               FROM 
                   {kdf}''')
              )

artists_table

Unnamed: 0,artist_id,artist_name,artist_location,artist_latitude,artist_longitude
0,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
1,ARJIE2Y1187B994AB7,Line Renaud,,,
2,ARNF6401187FB57032,Sophie B. Hawkins,"New York, NY [Manhattan]",40.79086,-73.96644
3,ARPFHN61187FB575F6,Lupe Fiasco,"Chicago, IL",41.88415,-87.63241
4,AREVWGE1187B9B890A,Bitter End,Noci (BA),-13.442,-41.9952
5,AR051KA1187B98B2FF,Wilks,,,
6,ARULZCI1241B9C8611,Luna Orbit Project,,,
7,AR8IEZO1187B99055E,Marc Shaiman,,,
8,ARWB3G61187FB49404,Steve Morse,"Hamilton, Ohio",,
9,ARLTWXK1187FB5A3F8,King Curtis,"Fort Worth, TX",32.74863,-97.32925


In [10]:
artists_table.shape

(12, 5)

In [15]:
# Write Artists:

(artists_table
     .to_spark()
     .write
     .parquet('artists/')
    )

# Log Data

In [11]:
kdfLog = ks.read_json("data/2018-11-*.json")

In [12]:
kdfLog.dtypes

artist            object
auth              object
firstName         object
gender            object
itemInSession      int64
lastName          object
length           float64
level             object
location          object
method            object
page              object
registration     float64
sessionId          int64
song              object
status             int64
ts                 int64
userAgent         object
userId            object
dtype: object

In [13]:
kdfLog.to_spark().printSchema()
kdfLog.head()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


### Users Table

In [14]:
users_table = (ks.sql('''
                SELECT
                DISTINCT
                userId,
                firstName,
                lastName,
                gender,
                level
                FROM
                    {kdfLog}
                WHERE page = 'NextSong'
                ORDER BY userId ASC
                ''')
              )
users_table.head()

Unnamed: 0,userId,firstName,lastName,gender,level
0,10,Sylvie,Cruz,F,free
1,100,Adler,Barrera,M,free
2,101,Jayden,Fox,M,free
3,11,Christian,Porter,F,free
4,12,Austin,Rosales,M,free


In [15]:
# write users table
#users_table.write.parquet(output_data + "users/")
users_table.to_spark().write.parquet('users/')

In [15]:
ts = ks.to_datetime(kdfLog.ts, unit='ms')
ts.head()

0   2018-11-15 00:30:26.796
1   2018-11-15 00:41:21.796
2   2018-11-15 00:45:41.796
3   2018-11-15 01:57:51.796
4   2018-11-15 03:29:37.796
Name: ts, dtype: datetime64[ns]

In [16]:
ts.dt.hour.head()

0    0
1    0
2    0
3    1
4    3
Name: ts, dtype: int64

In [17]:
#start_time, hour, day, week, month, year, weekday

time_table = (ks.sql('''
                SELECT
                ts as start_time,
                HOUR(ts) as hour,
                DAY(ts) as day,
                EXTRACT(week FROM ts) as week,
                MONTH(ts) as month,
                YEAR(ts) as year,
                WEEKDAY(ts) as weekday
                FROM
                    {ts}
                ''')
              )
time_table.head()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-15 00:30:26.796,0,15,46,11,2018,3
1,2018-11-15 00:41:21.796,0,15,46,11,2018,3
2,2018-11-15 00:45:41.796,0,15,46,11,2018,3
3,2018-11-15 01:57:51.796,1,15,46,11,2018,3
4,2018-11-15 03:29:37.796,3,15,46,11,2018,3


In [65]:
# write time table:
time_table.to_spark().write.partitionBy('year', 'month').parquet("time/")

In [18]:
# read in song data to use for songplays table
song_data = 'data/song_data/A/A/A/*.json'
song_df = ks.read_json(song_data)

In [59]:
song_df.head()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARKFYS91187B98E58F,,,,Jeff And Sheri Easter,267.7024,1,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),0
1,AR10USD1187B99F3F1,,"Burlington, Ontario, Canada",,Tweeterfriendly Music,189.57016,1,SOHKNRJ12A6701D1F8,Drop of Rain,0
2,ARGSJW91187B9B1D6B,35.21962,North Carolina,-80.01955,JennyAnyKind,218.77506,1,SOQHXMF12AB0182363,Young Boy Blues,0
3,ARMJAGH1187FB546F3,35.14968,"Memphis, TN",-90.04892,The Box Tops,148.03546,1,SOCIWDW12A8C13D406,Soul Deep,1969
4,AR7G5I41187FB4CE6C,,"London, England",,Adam Ant,233.40363,1,SONHOTT12A8C13493C,Something Girls,1982


In [24]:
import pandas as pd

In [102]:
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent

songplays_table = (ks.sql('''
                SELECT
                row_number() over (ORDER BY userId) AS songplay_id,
                to_timestamp(ts / 1000) AS start_time,
                YEAR(to_timestamp(ts / 1000)) AS year,
                MONTH(to_timestamp(ts / 1000)) AS month,
                userId AS user_id,
                dfl.level,
                sdf.song_id,
                sdf.artist_id,
                sessionId AS session_id,
                location,
                userAgent AS user_agent
                FROM {kdfLog} dfl JOIN {song_df} sdf
                ON dfl.artist = sdf.artist_name
                WHERE page = 'NextSong' 
                ''')
                )
songplays_table.head()


Unnamed: 0,songplay_id,start_time,year,month,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,2018-11-19 15:36:04.796,2018,11,49,paid,SOFSOCN12A8C143F5D,ARXR32B1187FB57099,724,"San Francisco-Oakland-Hayward, CA",Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20...


In [103]:
# write songplays table:
songplays_table.to_spark().write.partitionBy('year', 'month').parquet("songplays/")

## Deployment

1. Install `awscli`

2. run `aws configure` 
    * AWS Access Key ID : 
    * AWS Secret Access Key : 
    * Default region name: `us-west-2`
    * Default output format : `json`
    
3. **copy all the necessary files to an s3 bucket**

    * Ex: `aws s3 cp <filename> s3://<bucket_name>`


4. **Run EMR create script with the etl job**

```
aws emr create-cluster --name "Spark cluster with step" \
    --release-label emr-5.30.1 \
    --applications Name=Spark \
    --log-uri s3://dendsparktutorial/logs/ \
    --ec2-attributes KeyName=emr-key \
    --instance-type m5.xlarge \
    --instance-count 3 \
    --bootstrap-actions Path=s3://dendsparktutorial/emr_bootstrap.sh \
    --steps Type=Spark,Name="Spark program",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--master,yarn,s3://dendsparktutorial/src/koalas_etl.py] \
    --use-default-roles \
    --auto-terminate
```