In [4]:
import configparser
from datetime import datetime
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql import Window
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, weekofyear, date_format

In [5]:
# Load AWS credentials from dl.cfg and start (or reuse) the Spark session.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of files
# it actually parsed; fail loudly here rather than with an opaque KeyError on config['AWS'].
if not config.read('dl.cfg'):
    raise FileNotFoundError("dl.cfg not found; it must provide an [AWS] section with the AWS credentials")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    # Pin the SQL session time zone so hour/day/month values derived from event
    # timestamps are the same on every machine, not tied to the host's local zone.
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate()
)

In [9]:
# Read the raw song metadata JSON files.
song_data_path = 'data/songs/*.json'
# Kept under the name `df`: the artists cell selects its columns from this frame.
df = spark.read.json(song_data_path)

# songs dimension: distinct (song_id, title, artist_id, year, duration) rows.
# (A full-row distinct() on the raw frame first is unnecessary; this one dedups the projection.)
songs_table = df.select("song_id", "title", "artist_id", "year", "duration").distinct()

# mode("overwrite") keeps the cell re-runnable: the default save mode ("errorifexists")
# fails as soon as Songs.parquet already exists. save() returns None, so nothing is assigned.
(songs_table.write
    .partitionBy("year", "artist_id")
    .mode("overwrite")
    .format("parquet")
    .save("Songs.parquet"))

In [10]:
# Optional reset of the songs output directory. Disabled by default: the songplays cell
# reads Songs.parquet back, so deleting it unconditionally breaks Restart & Run All.
import shutil

RESET_SONGS_OUTPUT = False  # set to True only when you deliberately want to wipe Songs.parquet

if RESET_SONGS_OUTPUT:
    # ignore_errors: rmtree raises FileNotFoundError when the directory is already absent.
    shutil.rmtree('Songs.parquet', ignore_errors=True)

In [14]:
# artists dimension, taken from the song metadata frame `df` loaded in the songs cell.
# NOTE(review): distinct() is over all five columns, so an artist_id whose name/location
# differs between song files yields more than one row — confirm that is acceptable.
artists_table = (
    df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")
    .distinct()
)

# Written to artists_table.parquet, the path the songplays cell reads it back from.
# mode("overwrite") keeps the cell re-runnable; save() returns None, so nothing is assigned.
artists_table.write.mode("overwrite").format("parquet").save("artists_table.parquet")

In [4]:
# Spark type aliases; the time-table cells build their schemas with these names.
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Double, StringType as String, IntegerType as Int, DateType as date, TimestampType as Timestamp

# Raw event log files.
log_data = 'data/Log_data/*.json'

# Read the event logs with schema inference. `ts` therefore stays a raw numeric epoch
# value (presumably milliseconds, as in the Sparkify logs — verify); downstream cells
# convert it explicitly. The name `log_dataSchema` is kept because later cells use it
# for this DataFrame.
log_dataSchema = spark.read.json(log_data)

# Only song-play events feed the users and songplays tables.
df = log_dataSchema.filter(log_dataSchema.page == 'NextSong')

# users dimension.
# NOTE(review): dropDuplicates() over all five columns keeps one row per (user, level)
# pair, so a user whose level changed appears more than once — confirm intent.
users = df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates().sort("userId")

In [123]:
# Build the time dimension for NextSong events directly in Spark (no driver-side pandas
# collect). Columns and types: Time timestamp, hour/week_of_year/month/year/weekday int,
# day string.
#
# `ts` is assumed to be epoch *milliseconds* (Sparkify log format — verify). Casting a
# number to timestamp interprets it as seconds, hence the division by 1000.
next_song_events = log_dataSchema.filter(col("page") == "NextSong")
event_times = next_song_events.select((col("ts") / 1000).cast("timestamp").alias("Time"))

df = event_times.select(
    "Time",
    hour("Time").alias("hour"),
    date_format("Time", "EEEE").alias("day"),          # full weekday name, e.g. "Monday"
    weekofyear("Time").alias("week_of_year"),           # ISO-8601 week number
    month("Time").alias("month"),
    year("Time").alias("year"),
    # dayofweek() is Sunday=1..Saturday=7; shift to Monday=0..Sunday=6.
    ((dayofweek("Time") + 5) % 7).alias("weekday"),
)

In [8]:
# Build the time dimension for NextSong events in Spark and write it partitioned by year/month.
# `ts` is assumed to be epoch *milliseconds* (Sparkify log format — verify); casting a
# number to timestamp interprets it as seconds, hence the division by 1000.
df_filter = log_dataSchema.filter(col("page") == "NextSong")
event_times = df_filter.select((col("ts") / 1000).cast("timestamp").alias("Time"))

df = event_times.select(
    "Time",
    hour("Time").alias("hour"),
    date_format("Time", "EEEE").alias("day"),          # full weekday name, e.g. "Monday"
    weekofyear("Time").alias("week_of_year"),           # ISO-8601 week number
    month("Time").alias("month"),
    year("Time").alias("year"),
    # dayofweek() is Sunday=1..Saturday=7; shift to Monday=0..Sunday=6.
    ((dayofweek("Time") + 5) % 7).alias("weekday"),
)

# mode("overwrite") keeps the cell re-runnable; save() returns None, so nothing is assigned.
# NOTE(review): the output path is spelled "tiem_table.parquet"; kept as-is in case other
# consumers read it — rename to time_table.parquet if none do.
(df.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .format("parquet")
    .save("tiem_table.parquet"))


In [7]:
# Register the songs/artists dimensions and the raw event logs as temp views for the join.
# createOrReplaceTempView() returns None, so its result is not assigned.
s_df = spark.read.parquet("Songs.parquet")
s_df.createOrReplaceTempView("Songs_table")

a_df = spark.read.parquet("artists_table.parquet")
a_df.createOrReplaceTempView("artists_table")

path = 'data/Log_data/*.json'
songs_data = spark.read.json(path)
songs_data.createOrReplaceTempView("stg_songs")

# songplays fact: one row per distinct NextSong event, enriched with song_id/artist_id.
# - The log row is matched on BOTH song title and artist name; matching on artist name
#   alone attached every song by that artist to each play.
# - songplay_id is numbered by real columns; ordering by the quoted "sg.userId" ordered by
#   a constant string literal, which made the ids arbitrary.
# - GROUP BY collapses exact duplicate events; row_number() is evaluated after grouping.
df = spark.sql('''
    SELECT row_number() OVER (ORDER BY sg.ts, sg.userId, sg.sessionId) AS songplay_id,
           sg.ts AS ts, sg.userId, sg.level, b.song_id, b.artist_id,
           sg.sessionId, sg.location, sg.userAgent
    FROM stg_songs AS sg
    LEFT JOIN (
        SELECT s.song_id, s.title, a.artist_id, a.artist_name
        FROM Songs_table AS s
        LEFT JOIN artists_table AS a ON s.artist_id = a.artist_id
    ) AS b
      ON sg.song = b.title AND sg.artist = b.artist_name
    WHERE sg.page = 'NextSong'
    GROUP BY sg.ts, sg.userId, sg.level, b.song_id, b.artist_id,
             sg.sessionId, sg.location, sg.userAgent
''')

# Convert ts in Spark instead of round-tripping through pandas: `ts` is assumed to be epoch
# milliseconds (verify), and a numeric->timestamp cast expects seconds. Staying in Spark also
# avoids createDataFrame type-inference failures when song_id/artist_id are entirely null.
df = df.withColumn("ts", (col("ts") / 1000).cast("timestamp"))

prat_year_and_month = (df
    .withColumn("year", year(col("ts")))
    .withColumn("month", month(col("ts"))))

# mode("overwrite") keeps the cell re-runnable.
(prat_year_and_month.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .format("parquet")
    .save("songplays_table.parquet"))