In [1]:
import configparser
from datetime import datetime
import os
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format,to_timestamp,dayofweek,from_unixtime

In [2]:
# Read AWS credentials from dl.cfg and export them as environment variables for
# the AWS/S3 client. ConfigParser section names are case-sensitive: dl.cfg must
# contain a section named exactly [AWS_CREDS] with AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY keys.
config = configparser.ConfigParser()
# config.read() silently returns an empty list when the file is missing, which
# would otherwise surface later as a confusing KeyError('AWS_CREDS').
if not config.read('dl.cfg'):
    raise FileNotFoundError("dl.cfg not found; it must provide an [AWS_CREDS] section")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS_CREDS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS_CREDS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Get (or reuse) the Spark session. The hadoop-aws package is pulled in so the
# s3a:// paths read below can be resolved.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [4]:
# Glob over every song JSON file in the input bucket. Built with plain string
# concatenation (as log_data is): os.path.join is for local filesystem paths and
# would insert backslashes into the URI on Windows.
# NOTE(review): the full song dataset is large; narrow the glob
# (e.g. "song_data/A/A/*/*.json") for faster development runs.
song_data = "s3a://udacity-dend/" + "song_data/*/*/*/*.json"

In [None]:
# Load every song JSON file matched by `song_data` into one DataFrame
# (Spark infers the schema from the JSON records).
song_df = spark.read.format("json").load(song_data)

In [None]:
# Songs table: distinct rows of song id, title, artist, duration and year.
songs_table = (
    song_df
    .select('artist_id', 'duration', 'song_id', 'title', 'year')
    .distinct()
)

In [None]:
# Write the songs table as parquet, partitioned by year then artist_id, replacing
# any previous output. DataFrameWriter.parquet returns None, so the call result
# is not bound to a variable.
# NOTE(review): 's3/buckets/...' is a relative filesystem path, not an s3a:// URI;
# confirm this is the intended output location.
songs_table.write.partitionBy("year", "artist_id").parquet(
    's3/buckets/output-fact-table-data/' + "output-songs", mode="overwrite"
)


In [None]:
# Artists table: distinct artist rows (id, name, location, coordinates)
# taken from the song metadata.
artists_table = (
    song_df
    .select(
        'artist_id',
        'artist_name',
        'artist_location',
        'artist_latitude',
        'artist_longitude',
    )
    .distinct()
)

In [None]:
# Write the artists table as parquet, replacing any previous output.
# DataFrameWriter.parquet returns None, so the call result is not bound to a variable.
# NOTE(review): 's3/buckets/...' is a relative filesystem path, not an s3a:// URI;
# confirm this is the intended output location.
artists_table.write.parquet(
    's3/buckets/output-fact-table-data/' + "output-artists", mode="overwrite"
)

In [None]:
# Glob over every event-log JSON file in the input bucket.
log_data = "s3a://udacity-dend/log_data/*.json"

In [None]:
# Load all event-log JSON files matched by `log_data` into one DataFrame.
log_df = spark.read.format("json").load(log_data)

In [None]:
# Keep only song-play events: rows whose `page` is 'NextSong'.
log_df = log_df.where(col('page') == 'NextSong')

In [None]:
# Users table: distinct (userId, firstName, lastName, gender, level) rows.
# NOTE(review): `level` is part of the distinct key, so a user seen with two
# different levels appears twice; confirm that is intended.
users_table = (
    log_df
    .select('userId', 'firstName', 'lastName', 'gender', 'level')
    .distinct()
)


In [None]:
# Write the users table as parquet, replacing any previous output.
# DataFrameWriter.parquet returns None, so the call result is not bound to a variable.
# NOTE(review): this folder is "users" while its siblings use an "output-" prefix,
# and 's3/buckets/...' is a relative path rather than an s3a:// URI; confirm both.
users_table.write.parquet(
    's3/buckets/output-fact-table-data/' + "users", mode="overwrite"
)


In [None]:
def _epoch_millis_to_str(time_stamp):
    """Format an epoch-milliseconds value as a 'YYYY-MM-DD HH:MM:SS' string.

    Uses datetime.fromtimestamp, i.e. the local timezone of the Python worker.

    Args:
        time_stamp: milliseconds since the Unix epoch, or None.

    Returns:
        The formatted string, or None when ``time_stamp`` is None. A null ``ts``
        therefore yields a null value instead of a TypeError that fails the task.
    """
    if time_stamp is None:
        return None
    return datetime.fromtimestamp(time_stamp / 1000.0).strftime('%Y-%m-%d %H:%M:%S')


# No explicit returnType is given, so the UDF's result column is a string.
get_datetime = udf(_epoch_millis_to_str)

In [None]:
# Add a human-readable `date_timestamp` string column derived from `ts`.
time_table = log_df.withColumn("date_timestamp", get_datetime(log_df["ts"]))
# Print a sample of the converted values as a sanity check.
time_table.select(time_table["date_timestamp"]).show()

In [None]:
# Break each play timestamp into calendar fields. `date_timestamp` is the string
# produced by get_datetime; these date functions cast it to a timestamp.
# `weekday` must be produced here because the next cell selects it.
time_table = time_table.select(
    'date_timestamp',
    hour('date_timestamp').alias('hour'),
    dayofmonth('date_timestamp').alias('day'),
    weekofyear('date_timestamp').alias('week'),
    month('date_timestamp').alias('month'),
    year('date_timestamp').alias('year'),
    dayofweek('date_timestamp').alias('weekday'),  # 1 = Sunday ... 7 = Saturday
)


In [None]:
# Keep one row per distinct timestamp / calendar-field combination.
time_table = (
    time_table
    .select("date_timestamp", "hour", "day", "week", "month", "year", "weekday")
    .distinct()
)

In [None]:
# Write the time table as parquet, replacing any previous output.
# DataFrameWriter.parquet returns None; the original assigned that result back to
# `time_table`, destroying the DataFrame, so the result is no longer bound.
# NOTE(review): 's3/buckets/...' is a relative filesystem path, not an s3a:// URI;
# confirm this is the intended output location.
time_table.write.parquet(
    's3/buckets/output-fact-table-data/' + "output-times", mode="overwrite"
)


In [None]:
# Match each log event to a song on title, duration and artist name (inner join).
# NOTE(review): duration is compared with exact equality; confirm the song and
# log values are stored identically.
match_condition = (
    (song_df.title == log_df.song)
    & (song_df.duration == log_df.length)
    & (song_df.artist_name == log_df.artist)
)
songplays_table = song_df.join(log_df, on=match_condition, how="inner")


In [None]:
# Attach a surrogate key. Spark guarantees these ids are unique and increasing,
# but not consecutive.
surrogate_key = monotonically_increasing_id()
songplays_table = songplays_table.withColumn("songplay_id", surrogate_key)


In [None]:
# Project the joined rows into the songplays fact table.
# `ts` is divided by 1000 (milliseconds -> seconds) and cast straight to a
# timestamp. This replaces a string round-trip that referenced an unimported
# TimestampType and used `MM` (month) where minutes (`mm`) were meant.
songplays_table = (
    songplays_table
    .withColumn('start_time', (col("ts") / 1000).cast("timestamp"))
    .select(
        "songplay_id",
        "start_time",
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        # NOTE(review): this is the artist's location from song_df; confirm it is
        # not meant to be the listener's location from the log data.
        col("artist_location").alias("location"),
        "userAgent",
        month(col("start_time")).alias("month"),
        year(col("start_time")).alias("year"),
    )
)

In [None]:
# Write the songplays fact table as parquet, partitioned by year and month,
# replacing any previous output. The original line had unbalanced parentheses
# (SyntaxError) and passed the folder name as a separate argument. The result
# (None) is not assigned back to songplays_table.
# NOTE(review): 's3/buckets/...' is a relative filesystem path, not an s3a:// URI;
# confirm this is the intended output location.
songplays_table.write.partitionBy("year", "month").parquet(
    's3/buckets/output-fact-table-data/' + "output-songplays", mode="overwrite"
)
