# Data Lake
##Assignment 4 - Data Engineer Nanodegree


### Project Datasets
taken from http://millionsongdataset.com/ \
Song data: `s3://udacity-dend/song_data`\
Log data: `s3://udacity-dend/log_data`\
Log data json path: `s3://udacity-dend/log_json_path.json`\
Sample Song File:
`{
    "num_songs": 1,
    "artist_id": "ARJIE2Y1187B994AB7",
    "artist_latitude": null,
    "artist_longitude": null, 
    "artist_location": "", 
    "artist_name": "Line Renaud",
    "song_id": "SOUPIRU12A6D4FA1E1", 
    "title": "Der Kleine Dompfaff",
    "duration": 152.92036,
    "year": 0
}`

For testing reasons we have a folder with some sample data.

### Step 0 - Run Spark locally

In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
# make sure openJDK is installed in your conda env
spark = SparkSession \
    .builder \
    .appName("local data lake test") \
    .getOrCreate()

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1613749961417'),
 ('spark.driver.host', 'mbio-mbpro-417.fritz.box'),
 ('spark.driver.port', '60150'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'local data lake test'),
 ('spark.ui.showConsoleProgress', 'true')]

In [4]:
spark

 ### Step 1 data exploration

In [None]:
# unzip if not done
# import zipfile
# with zipfile.ZipFile("data/song_data.zip","r") as zip_ref:
#    zip_ref.extractall("data/song_data")
# with zipfile.ZipFile("data/log_data.zip","r") as zip_ref:
#    zip_ref.extractall("data/log_data")

In [31]:
path = "data/song_data/*/*/*/*.json"
song_data = spark.read.json(path)
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [32]:
song_data.describe()

DataFrame[summary: string, artist_id: string, artist_latitude: string, artist_location: string, artist_longitude: string, artist_name: string, duration: string, num_songs: string, song_id: string, title: string, year: string]

In [33]:
song_data.count()

71

In [34]:
path = "data/log-data/*.json"
log_data = spark.read.json(path)
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [35]:
log_data.describe()

DataFrame[summary: string, artist: string, auth: string, firstName: string, gender: string, itemInSession: string, lastName: string, length: string, level: string, location: string, method: string, page: string, registration: string, sessionId: string, song: string, status: string, ts: string, userAgent: string, userId: string]

In [36]:
log_data.count()

8056

### Step 2 - Create Tables
#### Fact Table
*songplays* - records in log data associated with song plays i.e. records with page NextSong
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#### Dimension Tables
*users* - users in the app
user_id, first_name, last_name, gender, level

*songs* - songs in music database
song_id, title, artist_id, year, duration

*artists* - artists in music database
artist_id, name, location, lattitude, longitude

*time* - timestamps of records in songplays broken down into specific units
start_time, hour, day, week, month, year, weekday

In [37]:
from pyspark.sql import functions as F

# create time table
time_data = log_data.withColumn('start_time', F.from_unixtime(F.col('ts')/1000))
time_data = time.select('ts', 'start_time') \
        .withColumn('year', F.year('start_time')) \
        .withColumn('month', F.month('start_time')) \
        .withColumn('week', F.weekofyear('start_time')) \
        .withColumn('weekday', F.dayofweek('start_time')) \
        .withColumn('day', F.dayofyear('start_time')) \
        .withColumn('hour', F.hour('start_time')) \

time_data.printSchema()

root
 |-- ts: long (nullable = true)
 |-- start_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



In [38]:
# create temporary views of the table on memory - not persistent

song_data.createOrReplaceTempView('song_data')
log_data.createOrReplaceTempView('log_data')
time_data.createOrReplaceTempView('time_data')

In [41]:
# songplays table
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays = spark.sql("""
SELECT DISTINCT
    l.ts as songplay_id,
    l.ts as start_time,
    l.userId as user_id,
    l.level as level,
    s.song_id as song_id,
    s.artist_id as artist_id,
    l.sessionId as session_id,
    l.location as location,
    l.userAgent as user_agent
FROM song_data s
JOIN log_data l
    ON s.artist_name = l.artist
    AND s.title = l.song
    ANd s.duration = l.length
JOIN time_data t
    ON t.ts = l.ts    
""").dropDuplicates()

In [42]:
songplays.count()

1

In [43]:
songplays.show(1, vertical = True)

-RECORD 0---------------------------
 songplay_id | 1542837407796        
 start_time  | 1542837407796        
 user_id     | 15                   
 level       | paid                 
 song_id     | SOZCTXZ12AB0182364   
 artist_id   | AR5KOSW1187FB35FF4   
 session_id  | 818                  
 location    | Chicago-Napervill... 
 user_agent  | "Mozilla/5.0 (X11... 



In [60]:
# users: table - user_id, first_name, last_name, gender, level

# if renaming desired
#users = spark.sql("""
#SELECT DISTINCT
#    l.userId as user_id,
#    l.firstName as first_name,
#    l.lastName as last_name,
#    l.gender as gender,
#    l.level as level
#FROM log_data l
#""")

users = log_data.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()

In [61]:
users.count()

107

In [62]:
users.show(1, vertical = True)

-RECORD 0--------------
 userId    | 57        
 firstName | Katherine 
 lastName  | Gay       
 gender    | F         
 level     | free      
only showing top 1 row



In [63]:
#songs: song_id, title, artist_id, year, duration
songs = song_data.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates()

In [64]:
songs.count()

71

In [65]:
songs.show(1, vertical = True)

-RECORD 0-----------------------
 song_id   | SOGOSOV12AF72A285E 
 title     | ¿Dónde va Chichi?  
 artist_id | ARGUVEV1187B98BA17 
 year      | 1997               
 duration  | 313.12934          
only showing top 1 row



In [66]:
# artists: artist_id, name, location, lattitude, longitude
artists = song_data.select('artist_id', 'artist_name', 'title', 'artist_location', 'artist_latitude', 'artist_longitude').dropDuplicates()

In [67]:
artists.count()

71

In [68]:
artists.show(1, vertical=True)

-RECORD 0------------------------------
 artist_id        | ARWB3G61187FB49404 
 artist_name      | Steve Morse        
 title            | Prognosis          
 artist_location  | Hamilton, Ohio     
 artist_latitude  | null               
 artist_longitude | null               
only showing top 1 row



In [None]:
### Step 3 - Export Data to S3
artists.write.parquet('f{output_data}/artists_table', mode='overwrite')