In [1]:
#importing all the packages that I will need.
import configparser
import datetime
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import monotonically_increasing_id
#import pyarrow.parquet as pq


In [2]:
#set up the configuration and provide the AWS credential to start the project
config = configparser.ConfigParser()
config.read('dwh.cfg')

config.sections()
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

KeyError: 'AWS'

In [1]:
#CREATE SPARK SEASSION
#Create a spark session with hadoop-aws package
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

print('Creating spark session on AWS')
spark = create_spark_session()

#DATA
input_data = "s3a://udacity-dend/"
song_input_data = "data/song-data/song_data/A/A/A/*.json"
log_input_data = "data/log-data/*.json"
output_data = "s3a://ashley-dend-udacity-p4"

Creating spark session on AWS


NameError: name 'SparkSession' is not defined

In [None]:
#PROCESS SONG DATA
# get filepath to song data file
print('Read song data from json file')
song_data = spark.read.json(song_input_data)
    
# read song data file
print('Print song data schema')
df = song_data
print(df.count())
df.printSchema()

In [None]:
# extract columns to create songs table
print('Extract columns to create song table')
artist_id = "artist_id"
artist_latitude = "artist_latitude"
artist_location = "artist_location"
artist_longitude = "artist_longitude"
artist_name = "artist_name"
duration = "duration"
num_songs = "num_songs"
song_id = "song_id"
title = "title"
year = "year"
    
#print('Songs table: ')
songs_table = df.select(song_id, title, artist_id, year, duration)
print(songs_table.limit(5).toPandas())

In [None]:
#check the 'year' type so it can be used as the partition key
df_songs_table = songs_table.toPandas()
year_list = list(set(df_songs_table['year'].tolist()))
type(year_list[1])

In [None]:
#check the 'arist_id' type so it can be used as the partition key
artist_id_list = list(set(df_songs_table['artist_id'].tolist()))
type(arist_id_list[1])

In [None]:
# combine 
df_songs_table.loc[(df_songs_table['year']==int(1982)) & (df_songs_table['artist_id']==str('AR7G5I41187FB4CE6C'))]

In [None]:
# write songs table to parquet files partitioned by year and artist
print('Writing to parquet')
#songs_table.write.parquet("{}songs_table.parquet".format(output_data))
for year in year_list:
    for artist_id in artist_id_list:
        df_to_parquet = df_songs_table.loc[(df_songs_table['year']==int(year)) & (df_songs_table['artist_id']==str(artist_id))]
        df_to_parquet.to_parquet("{}/songs_table/{}/{}/songs_table.parquet".format(output_data,year,artist_id))

In [None]:
# extract columns to create artists table
print('Artist table: ')
artists_table = df.select(artist_id, artist_name, artist_location, artist_latitude, artist_longitude)
print(artists_table.limit(5).toPandas())

In [None]:
# write artists table to parquet files
#print('Writing to parquet')
#artist_table.write.parquet("artist_table.parquet")

In [None]:
# get filepath to log data file
log_data = spark.read.json(log_input_data)

# read log data file
print('Print song data schema')
log_df = log_data
print(df.count())
log_df.printSchema()
print(log_df.limit(5).toPandas())

In [None]:
# filter by actions for song plays
# using df = 

# extract columns for users table  
#print('Extract columns to create log table')
artist= 'artist'
auth= 'auth'
firstName= 'firstName'
gender= 'gender'
itemInSession= 'itemInSession'
lastName= 'lastName'
length= 'length'
level= 'level'
location= 'location'
method= 'method'
page= 'page'
registration= 'registration'
sessionId= 'sessionId'
song= 'song'
status= 'status'
ts= 'ts'
userAgent= 'userAgent'
userId= 'userId'
timestamp='timestamp'
start_time='start_time'
hour = 'hour'
day='day'
week='week'
month='month'
year='year'
weekday='weekday'

In [None]:
print('Users table: ')
users_table = log_df.select(firstName, lastName, gender, level, userId)
print(users_table.limit(5).toPandas())

#write users table to parquet files
#print('Writing to parquet')
#users_table.write.parquet("users_table.parquet")

In [None]:
#create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000), TimestampType())
log_df = log_df.withColumn("timestamp", get_timestamp(log_df.ts))
log_df.printSchema()

In [None]:
log_df.head(1)

In [None]:

# create datetime column from original timestamp column
get_datetime = udf(lambda x: F.to_date(x), TimestampType())
log_df = log_df.withColumn("start_time", get_timestamp(log_df.ts))
log_df.printSchema()
log_df.head(1)

In [None]:
# extract columns to create time table
log_df = log_df.withColumn("hour", F.hour("timestamp"))
log_df = log_df.withColumn("day", F.dayofweek("timestamp"))
log_df = log_df.withColumn("week", F.weekofyear("timestamp"))
log_df = log_df.withColumn("month", F.month("timestamp"))
log_df = log_df.withColumn("year", F.year("timestamp"))
log_df = log_df.withColumn("weekday", F.dayofweek("timestamp"))
#log_df.printSchema()
print(log_df.limit(5).toPandas())

In [None]:
time_table = log_df.select(start_time, hour, day, week, month, year, weekday)
# write time table to parquet files partitioned by year and month
#print('Writing to parquet')
#time_table.write.parquet("time_table.parquet")


In [None]:
# read in song data to use for songplays table
song_df = spark.read.json(song_input_data)

In [None]:
song_df.printSchema()

In [None]:
log_df.printSchema()

In [None]:
log_df.createOrReplaceTempView("log_df_table")
song_df.createOrReplaceTempView("song_df_table")


In [None]:
# extract columns from joined song and log datasets to create songplays table 
# songplay_id, 
# start_time, 
# user_id, 
# level, 
# song_id, 
# artist_id, 
# session_id, 
# location, 
# user_agent

songplays_table = spark.sql(
    """SELECT log_df_table.start_time, log_df_table.userId, log_df_table.level, log_df_table.sessionId, log_df_table.location, log_df_table.userAgent, song_df_table.song_id, song_df_table.artist_id 
    FROM log_df_table 
    INNER JOIN song_df_table 
    ON song_df_table.artist_name = log_df_table.artist 
    """)
#ON song_df_table.title = log_df_table.song
#ON song_df_table.artist_name = log_df_table.artist

In [None]:
songplays_table = songplays_table.withColumn("songplay_id", monotonically_increasing_id())

In [None]:
#check the data to make sure it is valid

songplays_table.show(5)

In [None]:
# write songplays table to parquet files partitioned by year and month
print('Writing to parquet')
songplays_table.write.parquet("songplays_table.parquet")

In [None]:
#below is for my personal use

#Unzip folder for jupyter lab try
# import zipfile
# path_to_zip_file = 'data/log-data.zip'
# directory_to_extract_to = 'data/log-data'
# zip_ref = zipfile.ZipFile(path_to_zip_file, 'r')
# zip_ref.extractall(directory_to_extract_to)
# zip_ref.close()