In [1]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
config = configparser.ConfigParser()
# ConfigParser.read silently skips files it cannot open and returns the list of
# files it did read; fail fast here rather than hitting a confusing KeyError
# when the 'AWS CREDS' section is looked up later.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS config file 'dl.cfg'")

['dl.cfg']

In [3]:

# Export the AWS key pair from the [AWS CREDS] section into this process's environment.
aws_section = config['AWS CREDS']
os.environ['AWS_ACCESS_KEY_ID'] = aws_section['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = aws_section['AWS_SECRET_ACCESS_KEY']


In [4]:
# Spark session with the hadoop-aws package on the classpath (used for the s3a:// paths below).
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [5]:
# Hand the same AWS key pair to the s3a filesystem via the Hadoop configuration.
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", config['AWS CREDS']['AWS_ACCESS_KEY_ID'])
hadoop_conf.set("fs.s3a.secret.key", config['AWS CREDS']['AWS_SECRET_ACCESS_KEY'])

In [9]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists dimension tables from the raw song files.

    Reads every JSON file matching ``<input_data>song_data/*/*/*/*.json`` and
    writes two parquet datasets (overwriting any previous run):

    * ``<output_data>songs_table/``   - partitioned by ``year`` and ``artist_id``
    * ``<output_data>artists_table/``

    Rows with a null key (``song_id`` / ``artist_id``) are dropped and exact
    duplicate rows are removed so each table holds one row per distinct record.

    Args:
        spark: active ``SparkSession``.
        input_data: prefix containing ``song_data/`` (e.g. ``"s3a://udacity-dend/"``).
        output_data: prefix under which the parquet tables are written.
    """
    # get filepath to song data file
    song_data = input_data + 'song_data/*/*/*/*.json'

    # read song data file
    df = spark.read.json(song_data)
    df.printSchema()

    # extract columns to create songs table
    songs_table = (
        df.selectExpr(["song_id", "title", "artist_id", "cast(year as int) year", "duration"])
          .where("song_id IS NOT NULL")
          .dropDuplicates()
          .orderBy("song_id")
    )

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(output_data + "songs_table/")

    # extract columns to create artists table; artist fields are repeated on
    # every song row of that artist, so deduplicate to one row per record
    artists_table = (
        df.selectExpr(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"])
          .where("artist_id IS NOT NULL")
          .dropDuplicates()
          .orderBy("artist_id")
    )

    # write artists table to parquet files
    artists_table.write.mode("overwrite").parquet(output_data + "artists_table/")


In [5]:
# Root of the source bucket and the glob matching every raw song JSON file under it.
input_data = "s3a://udacity-dend/"
song_data = f"{input_data}song_data/*/*/*/*.json"

In [30]:
# Read the raw song files here: the `df` inside process_song_data is local to
# that function, so relying on a top-level `df` only works with hidden kernel state.
df = spark.read.json(song_data)
songs_table = (
    df.selectExpr(["song_id", "title", "artist_id", "cast(year as int) year", "duration"])
      .where("song_id IS NOT NULL")
      .dropDuplicates()
      .orderBy("song_id")
)

In [None]:
# `root` is only the header line printed by printSchema(), not a table. Register
# the raw song data under the view name songs_staging_table (the name the SQL
# queries in this notebook read from) and preview a few rows of it.
spark.read.json(song_data).createOrReplaceTempView("songs_staging_table")
spark.sql("SELECT * FROM songs_staging_table LIMIT 5")

In [None]:
# Distinct songs with a non-null song_id, read from the songs_staging_table temp view
# (the view must have been registered in this Spark session beforehand).
songs_table = spark.sql(
    " SELECT DISTINCT s.song_id,s.title,s.artist_id,s.year,s.duration\n"
    "FROM songs_staging_table AS s\n"
    "WHERE s.song_id is NOT NULL "
)

In [None]:
# Distinct artists with a non-null artist_id, read from the songs_staging_table temp view
# (the view must have been registered in this Spark session beforehand).
artists_table = spark.sql(
    "SELECT DISTINCT s.artist_id,s.artist_name,s.artist_location,s.artist_latitude,s.artist_longitude \n"
    "FROM songs_staging_table AS s WHERE s.artist_id is NOT NULL"
)

In [37]:
# An empty prefix writes to a relative path (resolved against Spark's default
# filesystem / working directory).
output_data = ''
# the DataFrame built above is `songs_table`; `song_table` was never defined
songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(output_data + "songs_table/")

In [39]:
# Artist fields repeat on every song row of that artist; keep one row per
# distinct record and drop rows without an artist_id.
artists_table = (
    df.selectExpr(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"])
      .where("artist_id IS NOT NULL")
      .dropDuplicates()
      .orderBy("artist_id")
)
artists_table.write.mode("overwrite").parquet(output_data + "artists_table/")

In [11]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the event logs.

    Reads ``<input_data>log_data/*/*/*.json``, keeps only ``page == "NextSong"``
    events, reads the raw song files from ``<input_data>song_data/`` and writes
    three parquet datasets under ``output_data`` (overwriting any previous run):

    * ``users_table/``     - distinct user records
    * ``time_table/``      - distinct timestamps, partitioned by year and month
    * ``songplays_table/`` - events matched to songs, partitioned by year and month

    Args:
        spark: active ``SparkSession``.
        input_data: prefix containing ``log_data/`` and ``song_data/``.
        output_data: prefix under which the parquet tables are written.
    """
    # get filepath to log data file and read it
    log_data = input_data + "log_data/*/*/*.json"
    df = spark.read.json(log_data)
    df.printSchema()

    # filter by actions for song plays
    df = df.filter(df.page == "NextSong")

    # users table: every event row carries the user's fields, so keep one row
    # per distinct user record
    user_table = (
        df.selectExpr(["userId", "firstName", "lastName", "gender", "level"])
          .dropDuplicates()
          .orderBy("userId")
    )
    user_table.write.mode("overwrite").parquet(output_data + "users_table/")

    # ts is epoch milliseconds; convert it to local-time strings. A null ts is
    # passed through as null instead of failing the task with a TypeError.
    get_timestamp = udf(
        lambda ts: None if ts is None
        else datetime.fromtimestamp(ts / 1000).strftime('%Y-%m-%d %H:%M:%S')
    )
    get_datetime = udf(
        lambda ts: None if ts is None
        else datetime.fromtimestamp(ts / 1000).strftime('%Y-%m-%d')
    )
    df = (
        df.withColumn("start_time", get_timestamp(col("ts")))
          .withColumn("datetime", get_datetime(col("ts")))
    )

    # time table, partitioned by year and month
    time_table = df.select(
        'ts', 'datetime', 'start_time',
        year(df.datetime).alias('year'),
        month(df.datetime).alias('month'),
    ).dropDuplicates()
    time_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + "time_table/")

    # raw song metadata used to resolve each play to a song_id / artist_id
    song_data = input_data + 'song_data/*/*/*/*.json'
    song_df = spark.read.json(song_data).select("song_id", "title", "artist_id", "artist_name")

    # Match on title AND artist: title alone links a play to every artist that
    # has a song of the same name, which duplicates plays. year/month are taken
    # from the event's own date rather than by joining time_table on start_time,
    # because time_table can hold several rows per start_time (distinct ts
    # values within the same second) and that join would duplicate plays too.
    songplays_table = (
        df.join(song_df,
                (df.song == song_df.title) & (df.artist == song_df.artist_name),
                how="inner")
          .select(
              monotonically_increasing_id().alias("songplay_id"),
              col("start_time"),
              col("userId").alias("user_id"),
              col("level"),
              col("song_id"),
              col("artist_id"),
              col("sessionId").alias("session_id"),
              col("location"),
              col("userAgent").alias("user_agent"),
              year(col("datetime")).alias("year"),
              month(col("datetime")).alias("month"),
          )
    )

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + "songplays_table/")



In [52]:
log_data = input_data + "log_data/*/*/*.json"

# read log data file
df2 = spark.read.json(log_data)
df2.printSchema()

# filter by actions for song plays
df2 = df2.filter(df2.page == 'NextSong')

# extract columns for users table; every event row repeats the user's fields,
# so keep one row per distinct user record
user_table = (
    df2.selectExpr(["userId", "firstName", "lastName", "gender", "level"])
       .dropDuplicates()
       .orderBy("userId")
)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [74]:
# ts is epoch milliseconds; convert to local-time strings. A null ts becomes a
# null string instead of raising TypeError inside the UDF.
get_timestamp = udf(lambda x: None if x is None else datetime.fromtimestamp(x / 1000).strftime('%Y-%m-%d %H:%M:%S'))
df2 = df2.withColumn("start_time", get_timestamp(df2.ts))
get_datetime = udf(lambda x: None if x is None else datetime.fromtimestamp(x / 1000).strftime('%Y-%m-%d'))
df2 = df2.withColumn("datetime", get_datetime(df2.ts))

# extract columns to create time table
time_table = df2.select('ts', 'datetime', 'start_time',
                        year(df2.datetime).alias('year'),
                        month(df2.datetime).alias('month')
                        ).dropDuplicates()

In [75]:
# Persist the time table, one directory per year/month.
(time_table.write
    .partitionBy("year", "month")
    .mode('overwrite')
    .parquet(output_data + 'time_table/'))

In [None]:
# Raw song metadata (song_id, title, artist_id, artist_name, ...) used to
# resolve log events to songs for the songplays table. Read once from the
# source JSON; no "songs.parquet" output is produced anywhere in this notebook.
song_data = input_data + 'song_data/*/*/*/*.json'
song_df = spark.read.json(song_data)

In [None]:
# extract columns from joined song and log datasets to create songplays table.
# df2 is the filtered NextSong log frame (with start_time/datetime columns).
# Match on title AND artist name: title alone links a play to every artist with
# a same-titled song. year/month come from the event's own date; joining
# time_table on start_time could duplicate plays, since several ts values can
# share one start_time second.
song_keys = song_df.select("song_id", "title", "artist_id", "artist_name")
songplays_table = (
    df2.join(song_keys,
             (df2.song == song_keys.title) & (df2.artist == song_keys.artist_name),
             how="inner")
       .select(
           monotonically_increasing_id().alias("songplay_id"),
           col("start_time"),
           col("userId").alias("user_id"),
           col("level"),
           col("song_id"),
           col("artist_id"),
           col("sessionId").alias("session_id"),
           col("location"),
           col("userAgent").alias("user_agent"),
           year(col("datetime")).alias("year"),
           month(col("datetime")).alias("month"),
       )
)

# write songplays table to parquet files partitioned by year and month
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(output_data + "songplays_table/")


In [None]:
# Users come from the NextSong log events (df2); keep one row per distinct record.
user_table = (
    df2.selectExpr(["userId", "firstName", "lastName", "gender", "level"])
       .dropDuplicates()
       .orderBy("userId")
)
user_table.write.mode("overwrite").parquet(output_data + "users_table/")

In [15]:
def main(input_data="s3a://udacity-dend/", output_data=""):
    """Run the full ETL: song-derived tables first, then log-derived tables.

    Args:
        input_data: prefix containing ``song_data/`` and ``log_data/``.
        output_data: prefix under which the parquet tables are written; ``""``
            writes to relative paths.
    """
    # getOrCreate() returns the already-running session when there is one
    # (e.g. inside this notebook) and otherwise builds one with hadoop-aws.
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
        .getOrCreate()
    )

    process_song_data(spark, input_data, output_data)
    process_log_data(spark, input_data, output_data)

In [7]:
# Source bucket and output prefix ("" = relative paths) for the ETL run below.
input_data, output_data = "s3a://udacity-dend/", ""

In [None]:
# Build and write the songs and artists tables.
process_song_data(spark=spark, input_data=input_data, output_data=output_data)

In [None]:
# Build and write the users, time and songplays tables.
process_log_data(spark=spark, input_data=input_data, output_data=output_data)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

