### Step 0: Run Spark locally

In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [3]:
# make sure openJDK is installed in your conda env
spark = SparkSession \
    .builder \
    .appName("local data lake") \
    .getOrCreate()

In [4]:
spark

In [5]:
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1641812157336'),
 ('spark.app.name', 'local data lake'),
 ('spark.driver.host', 'b72eb30748d6'),
 ('spark.driver.port', '36931'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.startTime', '1641812156624'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.sql.warehouse.dir', 'file:/home/jovyan/work/spark-warehouse'),
 ('spark.executor.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true'),
 ('spark.ui.showConsoleProgress', 'true')]

Step 1: Data exploration

In [6]:
import os
from dotenv import load_dotenv
import boto3

load_dotenv()
aws_key = os.getenv('aws_access_key_id')
aws_secret = os.getenv('aws_secret_access_key')

In [7]:
import boto3

# TODO find out whats wrong here
#s3_client = boto3.client(
#    's3',
#    aws_access_key_id=aws_key ,
#    aws_secret_access_key=aws_secret,
#)

#ok = s3_client.download_file('udacity-dend-barbara', 'log-data', '2018-11-01-events.json')
#print(ok)

### 1-A. Log Data

In [8]:
# Todo read directly from s3
df_log = spark.read.json("log-data/*.json")
print('reading')
df_log.printSchema()

reading
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



                                                                                

In [9]:
df_log.describe()

                                                                                

DataFrame[summary: string, artist: string, auth: string, firstName: string, gender: string, itemInSession: string, lastName: string, length: string, level: string, location: string, method: string, page: string, registration: string, sessionId: string, song: string, status: string, ts: string, userAgent: string, userId: string]

In [10]:
df_log.count()

8056

In [11]:
df_log.show(n=3)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [12]:
df_log.take(3)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [13]:
import pandas

In [14]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


### 1-B. Song Data

In [15]:
df_song = spark.read.json("song_data/*/*/*/*.json")
print('reading')
df_song.printSchema()

reading
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [16]:
df_song.describe()

DataFrame[summary: string, artist_id: string, artist_latitude: string, artist_location: string, artist_longitude: string, artist_name: string, duration: string, num_songs: string, song_id: string, title: string, year: string]

In [17]:
df_song.count()

71

In [18]:
df_song.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


### Step2 - Create Tables

Create Tables according to project instructions: 

#### Fact Table
*songplays* - records in log data associated with song plays i.e. records with page NextSong
songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#### Dimension Tables
*users* - users in the app
user_id, first_name, last_name, gender, level

*songs* - songs in music database
song_id, title, artist_id, year, duration

*artists* - artists in music database
artist_id, name, location, lattitude, longitude

*time* - timestamps of records in songplays broken down into specific units
start_time, hour, day, week, month, year, weekday

#### 2b - create a time table

In [22]:
from pyspark.sql import functions as F
# https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/

time = df_log.withColumn('start time', F.from_unixtime(F.col('ts')/1000))
time = time.select('ts', 'start time') \
        .withColumn('year', F.year('start time')) \
        .withColumn('month', F.month('start time')) \
        .withColumn('week', F.weekofyear('start time')) \
        .withColumn('weekday', F.dayofweek('start time')) \
        .withColumn('day', F.dayofyear('start time')) \
        .withColumn('hour', F.hour('start time')) \

time.printSchema()

root
 |-- ts: long (nullable = true)
 |-- start time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



2c - create temporary views of the table on memory - not persistent

In [21]:
df_log.createOrReplaceTempView('df_log')
df_song.createOrReplaceTempView('df_song')
time.createOrReplaceTempView('time')

2d - create tables

In [23]:
# songplays table
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays = spark.sql("""
                        SELECT DISTINCT
                            l.ts as songplay_id,
                            l.ts as start_time,
                            l.userId as user_id,
                            l.level as level,
                            s.song_id as song_id,
                            s.artist_id as artist_id,
                            l.sessionId as session_id,
                            l.location as location,
                            l.userAgent as user_agent
                        FROM df_song s
                        JOIN df_log l
                            ON s.artist_name = l.artist
                            AND s.title = l.song
                            ANd s.duration = l.length
                        JOIN time t
                            ON t.ts = l.ts    
                        """).dropDuplicates()

In [24]:
songplays.count()

1

In [25]:
#check if everything is in place
songplays.show(1, vertical = True)

-RECORD 0---------------------------
 songplay_id | 1542837407796        
 start_time  | 1542837407796        
 user_id     | 15                   
 level       | paid                 
 song_id     | SOZCTXZ12AB0182364   
 artist_id   | AR5KOSW1187FB35FF4   
 session_id  | 818                  
 location    | Chicago-Napervill... 
 user_agent  | "Mozilla/5.0 (X11... 



In [34]:
# users: table - user_id, first_name, last_name, gender, level
# drop duplicates needed, as we are creating the users table based on the log data
users = df_log.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()

In [35]:
users.count()

107

In [36]:
users.show(1, vertical = True)

-RECORD 0--------------
 userId    | 57        
 firstName | Katherine 
 lastName  | Gay       
 gender    | F         
 level     | free      
only showing top 1 row



In [41]:
#songs: song_id, title, artist_id, year, duration
songs = df_song.select('song_id', 'title', 'artist_id', 'year', 'duration')

In [42]:
songs.count()

71

In [43]:
songs.show(1, vertical = True)

-RECORD 0-------------------------
 song_id   | SOBAYLL12A8C138AF9   
 title     | Sono andati? Fing... 
 artist_id | ARDR4AC1187FB371A1   
 year      | 0                    
 duration  | 511.16363            
only showing top 1 row



In [48]:
# artists: artist_id, name, location, lattitude, longitude
artists = df_song.select('artist_id', 'artist_name', 'title', 'artist_location', 'artist_latitude', 'artist_longitude').dropDuplicates()

In [49]:
artists.count()

71

In [44]:
artists.show(1, vertical=True)

NameError: name 'artists' is not defined

### Step3 - export tables

In [None]:
# Export data as parquet files to S3
output_path = os.getenv('output_path')

sonplays.write.parquet('f{output_data}/songplays', mode='overwrite')

time.write.parquet('f{output_data}/time', mode='overwrite')
artists.write.parquet('f{output_data}/artists', mode='overwrite')
songs.write.parquet('f{output_data}/songs', mode='overwrite')
users.write.parquet('f{output_data}/users', mode='overwrite')