In [1]:
# 1. Create three directories, song_data, log_data and output_files, by typing: mkdir song_data log_data output_files
# 2. Unzip the song data and log data into the directories above:
#      - unzip /home/workspace/data/song-data.zip -d /home/workspace/song_data    
#      - unzip /home/workspace/data/log-data.zip -d /home/workspace/log_data
# 3. Run the etl script: spark-submit etl.py

In [16]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format 
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import to_date

# Load AWS credentials from dl.cfg and export them as environment variables
# (presumably consumed by the hadoop-aws S3 connector requested in
# create_spark_session; the paths used below are local, so they may be unused).
config = configparser.ConfigParser()
if not config.read('dl.cfg'):
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it did read; fail here with a clear message instead of an
    # opaque KeyError: 'KEYS' on the lookups below.
    raise FileNotFoundError("dl.cfg not found in " + os.getcwd())

os.environ['AWS_ACCESS_KEY_ID'] = config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['KEYS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    """Return the active SparkSession, creating one if none exists.

    The session is configured to pull the hadoop-aws 2.7.0 package through
    ``spark.jars.packages`` (presumably for S3 access).
    """
    hadoop_aws_package = "org.apache.hadoop:hadoop-aws:2.7.0"
    builder = SparkSession.builder.config("spark.jars.packages", hadoop_aws_package)
    return builder.getOrCreate()

In [11]:
# Local input/output locations, relative to the working directory.
# The song reader appends 'song_data/*/*/*/*.json' to input_song, and the log
# reader appends 'log_data/*.json' to input_log.
input_song, input_log, output_data = "./song_data", "./", "./output_files"

In [5]:
# SparkSession shared by every read, SQL query and write in the cells below.
spark = create_spark_session()

In [6]:
# Read all song JSON files three directory levels below input_song/song_data;
# no schema is supplied, so Spark infers it from the files.
df_song = spark.read.json(os.path.join(input_song, 'song_data/*/*/*/*.json'))

In [7]:
# Expose the raw song data to the SQL queries below as the view staging_songs.
df_song.createOrReplaceTempView("staging_songs")

In [8]:
 songs_df= spark.sql ("""
        SELECT DISTINCT
            song_id     ,
            title       ,
            artist_id   ,
            duration    ,
            year
        FROM staging_songs
    """).collect()
df1 = spark.createDataFrame(songs_df)
df1.write.partitionBy("year", "artist_id").parquet(os.path.join(output_data, 'songs'))

In [9]:
# artists dimension, written as unpartitioned parquet. The result stays a
# distributed DataFrame instead of being collected to the driver and rebuilt.
# NOTE(review): DISTINCT is over all five columns, so an artist whose
# name/location/coordinates differ between songs yields several rows for the
# same artist_id.
artists_df = spark.sql("""
        SELECT DISTINCT
            artist_id          AS artist_id ,
            artist_name        AS name      ,
            artist_location    AS location  ,
            artist_latitude    AS latitude  ,
            artist_longitude   AS longitude
        FROM staging_songs
    """)
# 'overwrite' so re-running the cell does not fail on an existing output path.
artists_df.write.mode('overwrite').parquet(os.path.join(output_data, 'artists'))

In [13]:
# Read all event-log JSON files directly under input_log/log_data; the schema
# is inferred. The ts column is treated below as epoch milliseconds.
df_log = spark.read.json(os.path.join(input_log, "log_data/*.json"))

In [17]:
# Derive time columns from the event timestamp ts (epoch milliseconds).
# A native cast replaces the former Python UDF: it avoids shipping every row to
# Python workers and yields null for a null ts instead of raising.
df_log = df_log.withColumn('timestamp', (col('ts') / 1000).cast(TimestampType()))
df_log = df_log.withColumn('date', to_date('timestamp'))
# start_time keeps the full timestamp. Formatting it as 'HH:mm:ss' dropped the
# date, so the same value recurred every day and could neither key the time
# table nor join songplays back to it unambiguously.
df_log = df_log.withColumn('start_time', col('timestamp'))
df_log = df_log.withColumn('hour', hour('timestamp'))
df_log = df_log.withColumn('day', dayofmonth('date'))
df_log = df_log.withColumn('week', weekofyear('date'))
df_log = df_log.withColumn('month', month('date'))
df_log = df_log.withColumn('year', year('date'))
# 'E' renders the abbreviated day-of-week name.
df_log = df_log.withColumn('weekday', date_format('date', 'E'))

In [18]:
# Expose the enriched event log (including the derived time columns) to the
# SQL queries below as the view staging_events.
df_log.createOrReplaceTempView("staging_events")

In [19]:
# users dimension: one row per user who played a song (page = 'NextSong').
# A plain DISTINCT over all columns emits several rows for the same user_id
# whenever that user's level changes over time. Instead, keep only each user's
# most recent NextSong event (highest ts). The result stays a distributed
# DataFrame rather than being collected to the driver and rebuilt.
user_df = spark.sql("""
        SELECT user_id, first_name, last_name, gender, level
        FROM (
            SELECT
                userId       AS user_id    ,
                firstName    AS first_name ,
                lastName     AS last_name  ,
                gender       AS gender     ,
                level        AS level      ,
                ROW_NUMBER() OVER (PARTITION BY userId ORDER BY ts DESC) AS row_num
            FROM staging_events
            WHERE page = 'NextSong'
        ) ranked
        WHERE row_num = 1
    """)
user_df.write.mode('overwrite').parquet(os.path.join(output_data, 'users'))

In [20]:
# time dimension built from the derived time columns of every event, written
# as parquet partitioned by year and month. The result stays a distributed
# DataFrame; collecting it to the driver and rebuilding it with createDataFrame
# would pull the whole table into driver memory and re-infer its schema.
time_df = spark.sql("""
        SELECT DISTINCT
            start_time,
            hour      ,
            day       ,
            week      ,
            month     ,
            year      ,
            weekday
        FROM staging_events
    """)
time_df.write.partitionBy("year", "month").mode('overwrite').parquet(os.path.join(output_data, 'time'))

In [22]:
# songplays fact table: NextSong events matched to a song by artist name,
# title and duration. This is an inner JOIN, so events with no matching song
# are dropped. Written as parquet partitioned by year and month. The result
# stays a distributed DataFrame instead of being collected to the driver.
# The start_time column name matches the time table (it was misspelled
# 'startime').
songplays_df = spark.sql("""
        SELECT DISTINCT
            staging_events.userId       AS user_id   ,
            staging_songs.song_id       AS song_id   ,
            staging_songs.artist_id     AS artist_id ,
            staging_events.start_time   AS start_time,
            staging_events.level        AS level     ,
            staging_events.sessionId    AS session_id,
            staging_events.location     AS location  ,
            staging_events.userAgent    AS user_agent,
            staging_events.year         AS year      ,
            staging_events.month        AS month

        FROM staging_events          staging_events

        JOIN staging_songs           staging_songs
        ON   staging_events.artist = staging_songs.artist_name
        AND  staging_events.song   = staging_songs.title
        AND  staging_events.length = staging_songs.duration

        WHERE staging_events.page  = 'NextSong'
    """)
songplays_df.write.partitionBy("year", "month").mode('overwrite').parquet(os.path.join(output_data, 'songplays'))