In [22]:
import configparser
from datetime import datetime
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions  import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types  import TimestampType 


In [23]:
# Read the AWS credentials from the local dl.cfg file ([AWS] section) and
# export them as environment variables for the rest of the notebook.
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID'] = config.get("AWS", "AWS_ACCESS_KEY_ID")
# The value must come from config.get(...); a stray opening quote here used to
# leave an unterminated string literal and broke the whole cell.
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get("AWS", "AWS_SECRET_ACCESS_KEY")


In [24]:
# Create (or reuse) the Spark session, requesting the hadoop-aws package
# through spark.jars.packages.
session_builder = SparkSession.builder.config(
    "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
)
spark = session_builder.getOrCreate()


In [25]:
# Root of the source bucket and the glob selecting the song JSON files to load.
input_data = "s3a://udacity-dend/"
# The prefix is "song_data" (underscore), matching the path the later reload
# cell reads successfully; "song-data" does not match that prefix.
song_data = os.path.join(input_data, "song_data/A/B/A/*.json")

In [26]:
# Load the song JSON files selected by song_data into a DataFrame.
df_song_data = spark.read.format("json").load(song_data)

In [27]:
# Re-read the song files from the bucket prefix, then report the row count
# and the inferred schema.
song_reader = spark.read.format("json")
df_song_data = song_reader.load("s3a://udacity-dend/song_data/A/B/A")

song_row_count = df_song_data.count()
print(song_row_count)

df_song_data.printSchema()

23
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [28]:
# Expose the song DataFrame to Spark SQL as the temp view "song_data";
# the spark.sql queries in the following cells select from this view.
df_song_data.createOrReplaceTempView("song_data")

In [29]:
# Songs dimension: one distinct row per (song_id, title, artist_id, year,
# duration) combination found in the song_data view.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = spark.table("song_data").select(*song_columns).distinct()

In [11]:
# Destination prefix for the generated tables, followed by the songs export:
# parquet files partitioned by year and artist_id, replacing any previous run.
output_data = "s3a://ywaranass3bucket/DataLake-Project/"
songs_output_path = os.path.join(output_data, 'songs')
songs_writer = songs_table.write.mode('overwrite').partitionBy("year", "artist_id")
songs_writer.parquet(songs_output_path)

In [30]:
# Artists dimension: distinct artist attributes taken from the song_data view.
artist_columns = [
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
]
artists_table = spark.table("song_data").select(*artist_columns).distinct()

In [31]:
# Path of the event-log JSON file (2018-11-07) that the next cell reads.
log_data = 's3a://udacity-dend/log_data/2018/11/2018-11-07-events.json'

In [59]:
# Load the raw event log, keep only the rows whose page is "NextSong", and
# publish the filtered rows to Spark SQL as the temp view "log_data_ftr".
df_log_data = spark.read.json(log_data)
is_next_song = df_log_data.page == "NextSong"
df_log_data_filtered = df_log_data[is_next_song]
df_log_data_filtered.createOrReplaceTempView("log_data_ftr")

In [60]:
# Users dimension: distinct user attributes taken from the filtered log view.
user_columns = ["userId", "firstName", "lastName", "gender", "level"]
users_table = spark.table("log_data_ftr").select(*user_columns).distinct()

In [78]:
def _epoch_ms_to_datetime(epoch_ms):
    """Convert an epoch value in milliseconds to a datetime object."""
    return datetime.fromtimestamp(epoch_ms / 1000.0)


# Wrap the converter as a Spark UDF returning TimestampType and store the
# converted ts values in a new "newts" column.
get_timestamp = udf(_epoch_ms_to_datetime, TimestampType())
newts_column = get_timestamp(df_log_data_filtered.ts)
df_log_data_filtered = df_log_data_filtered.withColumn("newts", newts_column)

In [79]:
# UDF that formats the epoch-millisecond ts value as 'YYYY-MM-DD HH:MM:SS'.
# No return type is passed, so the UDF falls back to PySpark's default
# (string) return type.
get_datetime = udf(lambda x: datetime.fromtimestamp((x/1000.0)).strftime('%Y-%m-%d %H:%M:%S'))
# Read ts from df_log_data_filtered itself: it is the only filtered log frame
# defined in this notebook, so the cell also runs on a fresh kernel.
df_log_data_filtered = df_log_data_filtered.withColumn("datetime", get_datetime(df_log_data_filtered.ts))

In [67]:
# Time dimension built from the filtered (NextSong) events.
# The view is registered from df_log_data_filtered, the filtered log frame
# defined by the earlier cells, so this cell runs on a fresh kernel.
df_log_data_filtered.createOrReplaceTempView("time_data")
# start_time is derived from the epoch-millisecond ts column with the same
# to_timestamp(ts/1000) expression the songplays query uses, so the two tables
# share one start_time type. DISTINCT keeps a single row per start_time, and
# every calendar field is computed from that same timestamp.
time_table = spark.sql("""
    SELECT start_time,
           hour(start_time) AS hour,
           dayofmonth(start_time) AS day,
           weekofyear(start_time) AS week,
           month(start_time) AS month,
           year(start_time) AS year,
           weekday(start_time) AS weekday
      FROM (SELECT DISTINCT to_timestamp(ts/1000) AS start_time
              FROM time_data) AS event_times
""")

In [23]:
# Write the time table as parquet partitioned by year and month, replacing any
# previous run. The target lives under output_data, next to the songs table,
# instead of a relative "time" path resolved against the default filesystem.
time_output_path = os.path.join(output_data, 'time')
time_table.write.mode('overwrite').partitionBy("year", "month").parquet(time_output_path)

In [90]:
# Songplays fact table, built with Spark SQL from two temp views registered by
# earlier cells: log_data_ftr (NextSong events) and song_data (song metadata).
#
# Inner query: LEFT JOIN of log rows to songs on song title and on
# length = duration, so log rows without a matching song are kept with NULL
# song_id / artist_id. start_time, month and year are derived from
# to_timestamp(ts/1000), and SELECT DISTINCT removes duplicate joined rows.
#
# Outer query: assigns songplay_id with ROW_NUMBER() over an ordering of the
# output columns (all except month and year).
# NOTE(review): the window has no PARTITION BY; presumably Spark evaluates it
# on a single partition, so confirm this is acceptable before running on the
# full dataset.
songplays_table = spark.sql("""
    SELECT 
         ROW_NUMBER() OVER (ORDER BY start_time,user_id,level,song_id,artist_id,session_id,location,user_agent) as songplay_id
        ,start_time
        ,month
        ,year
        ,user_id
        ,level
        ,song_id
        ,artist_id
        ,session_id
        ,location
        ,user_agent
      from 
            (select distinct
                    to_timestamp(log.ts/1000) as start_time
                   ,month(to_timestamp(log.ts/1000)) as month
                   ,year(to_timestamp(log.ts/1000)) as year
                   ,log.userid as user_id
                   ,log.level as level
                   ,song.song_id as song_id
                   ,song.artist_id as artist_id
                   ,log.sessionid as session_id
                   ,log.location as location
                   ,log.useragent as user_agent

                  FROM        log_data_ftr log 
                    left join song_data song
                         on    log.song = song.title
                           and log.length = song.duration
                            ) log_join_song
                   """)