# Data Lake for Data Song Analysis

In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

song_data = config['LOCAL']['INPUT_LOCAL_SONG_DATA']
log_data = config['LOCAL']['INPUT_LOCAL_LOG_DATA']
output_data = config['LOCAL']['OUTPUT_LOCAL_DATA']

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
spark

## Process song data

In [5]:
# get filepath to log data file
input_data = "s3a://udacity-dend/"
log_data = 'log_data/*/*/*.json'

# read log data file
df = spark.read.json(f"{input_data}{log_data}")

In [8]:
df.count()

8056

In [6]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [9]:
# extract columns to create songs table
songs_table = df.select(['song_id','title','artist_id','year','duration'])

AnalysisException: "cannot resolve '`song_id`' given input columns: [itemInSession, lastName, auth, sessionId, firstName, userId, location, registration, gender, status, level, artist, ts, userAgent, page, length, song, method];;\n'Project ['song_id, 'title, 'artist_id, 'year, 'duration]\n+- Relation[artist#6,auth#7,firstName#8,gender#9,itemInSession#10L,lastName#11,length#12,level#13,location#14,method#15,page#16,registration#17,sessionId#18L,song#19,status#20L,ts#21L,userAgent#22,userId#23] json\n"

In [7]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.mode('overwrite').partitionBy('year','artist_id').parquet(output_data + 'songs')

In [8]:
# extract columns to create artists table
artists_table = df.select(['artist_id','artist_name','artist_location',\
                           'artist_latitude','artist_longitude'])

In [9]:
# write artists table to parquet files
artists_table.write.mode("overwrite").parquet(output_data + 'artists')

## Process log data

In [None]:
# get filepath to log data file
#log_data = 

In [10]:
# read log data file
df2 = spark.read.json(log_data)

In [11]:
df2.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [12]:
# filter by actions for song plays
df2 = df2.where('page = "NextSong"')

In [68]:
df2.createOrReplaceTempView('log_schema')

In [13]:
# extract columns for users table  
users_table = df2.select(['userId','firstName','lastName','gender','level'])

In [14]:
# write users table to parquet files
users_table.write.mode('overwrite').parquet(output_data + 'users')

In [15]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda ts: str(int(ts/1000.0)))
df2 = df2.withColumn('timestamp',get_timestamp(df2.ts))

In [16]:
# create datetime column from original timestamp column
get_datetime = udf(lambda ts: str(datetime.fromtimestamp(int(ts)/1000.0)))
df2 = df2.withColumn("datetime", get_datetime(df2.ts))

In [17]:
# extract columns to create time table
time_table = df2.select(
col('datetime').alias('start_time'),
hour('datetime').alias('hour'),
dayofmonth('datetime').alias('day'),
weekofyear('datetime').alias('week'),
month('datetime').alias('month'),
year('datetime').alias('year')
).dropDuplicates(['start_time'])

In [18]:
 # write time table to parquet files partitioned by year and month
time_table.write.mode('overwrite').partitionBy('year','month').parquet(output_data + 'time')

In [22]:
 # read in song data to use for songplays table
song_df = spark.read.json(song_data)
songplays_df = df2.join(song_df, (song_df.title == df2.song))

In [27]:
songplays_df.columns

['artist',
 'auth',
 'firstName',
 'gender',
 'itemInSession',
 'lastName',
 'length',
 'level',
 'location',
 'method',
 'page',
 'registration',
 'sessionId',
 'song',
 'status',
 'ts',
 'userAgent',
 'userId',
 'timestamp',
 'datetime',
 'artist_id',
 'artist_latitude',
 'artist_location',
 'artist_longitude',
 'artist_name',
 'duration',
 'num_songs',
 'song_id',
 'title',
 'year']

In [32]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = songplays_df.select(
col('ts').alias('start_time'),
col('userId').alias('user_id'),
col('level').alias('level'),
col('song_id').alias('song_id'),
col('artist_id').alias('artist_id'),
col('sessionId').alias('session_id'),
col('artist_location').alias('location'),
col('userAgent').alias('user_agent'),
year('datetime').alias('year'),
month('datetime').alias('month')
)

In [37]:
songplays_table = songplays_table.withColumn('songplay_id', monotonically_increasing_id())

In [38]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.mode('overwrite').partitionBy('year','month').parquet(output_data + 'songplays')