In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import dayofweek

In [2]:
# Temporarily disabled while testing locally (no AWS configuration is needed)

# config = configparser.ConfigParser()
# config.read('dl.cfg')

In [3]:
# Temporarily disabled while testing locally (no AWS credentials are needed)

# ARN_IAM_ROLE = config.get('IAM_ROLE', 'ARN')

# os.environ['AWS_ACCESS_KEY_ID']=config['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS_SECRET_ACCESS_KEY']

# os.environ['AWS_ACCESS_KEY_ID']=config.get('IAM_ACCESS_KEY', 'AWS_ACCESS_KEY_ID')
# os.environ['AWS_SECRET_ACCESS_KEY']=config.get('IAM_ACCESS_KEY', 'AWS_SECRET_ACCESS_KEY')



In [4]:
def create_spark_session():
    """Return a SparkSession obtained through ``getOrCreate``.

    The builder sets ``spark.jars.packages`` to
    ``org.apache.hadoop:hadoop-aws:2.7.0`` before the session is requested.

    Returns:
        The SparkSession handed back by the builder.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [5]:
def process_song_data(spark, input_data, output_data):
    """Build the ``songs`` and ``artists`` dimension tables from song JSON.

    Reads the song files, projects the two tables, writes each one as
    Parquet below ``output_data`` (overwriting any previous run) and then
    reads every written table back to print a five-row preview.

    Args:
        spark: Active SparkSession.
        input_data: Path or glob of the song JSON files.
        output_data: Directory or URI prefix that receives ``songs.parquet``
            and ``artists.parquet``. An empty string writes relative to the
            current working directory.
    """
    # Derive the output locations from ``output_data`` instead of ignoring it.
    # An empty prefix yields the plain relative names "songs.parquet" and
    # "artists.parquet".
    prefix = (output_data.rstrip("/") + "/") if output_data else ""
    songs_path = prefix + "songs.parquet"
    artists_path = prefix + "artists.parquet"

    # The JSON reader infers the schema on its own.
    df = spark.read.json(input_data)

    print("to see the original df read from json song data \n")
    df.printSchema()
    df.show(5)

    # ---- songs dimension ---------------------------------------------------
    print("extract from root schema to be  dimensional table songs_table \n")
    songs_table = df.select(
        'song_id',
        'title',
        'artist_id',
        'year',
        'duration',
    ).dropDuplicates()  # never write the same song row twice
    songs_table.printSchema()
    songs_table.show(5)

    # Partitioned by year and artist, keeping the schema information.
    songs_table.write \
               .partitionBy("year", "artist_id") \
               .mode('overwrite') \
               .parquet(songs_path)

    # Review aid: read the files back and show a few rows through a temp view.
    parquetSongsFile = spark.read.parquet(songs_path)
    parquetSongsFile.createOrReplaceTempView("parquetSongsFile")
    print("to see songs parque view, partitioned by artist_id and year \n")
    spark.sql("""
            SELECT *
            FROM parquetSongsFile
            ORDER by year
            limit 5
            """).show()

    # ---- artists dimension -------------------------------------------------
    print("<###################################################################### \n")
    print("extract from root schema to be dimensional table artists_table \n")
    # The raw data has one row per song, so an artist with several songs would
    # otherwise be written several times; exact duplicate rows are removed.
    artists_table = df.select(
        'artist_id',
        col('artist_name').alias('name'),
        col('artist_location').alias('location'),
        col('artist_latitude').alias('latitude'),
        col('artist_longitude').alias('longitude'),
    ).dropDuplicates()
    artists_table.printSchema()
    artists_table.show(5)

    # NOTE(review): partitioning by name and artist_id creates one directory
    # per artist; kept as-is so the on-disk layout does not change.
    artists_table.write \
                 .partitionBy("name", "artist_id") \
                 .mode('overwrite') \
                 .parquet(artists_path)

    # Review aid: read the files back and show a few rows through a temp view.
    parquetArtistFile = spark.read.parquet(artists_path)
    print("######################################################################### \n ")
    parquetArtistFile.createOrReplaceTempView("parquetArtistFile")

    print("to see artists parque view, partitioned by name and artist_id \n")
    spark.sql("""
              SELECT *
              FROM parquetArtistFile
              ORDER BY artist_id
              LIMIT 5""").show()

In [6]:
def process_log_data(spark, input_data, output_data,
                     song_data='data/song_data/A/A/A/*.json'):
    """Build the users, time and songplays tables from event-log JSON.

    Reads the log files, keeps ``NextSong`` events, derives the ``users`` and
    ``time`` dimension tables and the ``songplays`` fact table (log events
    left-joined to the song data), and writes each one as Parquet below
    ``output_data`` (overwriting any previous run). The users and songplays
    outputs are read back to print a five-row preview.

    Args:
        spark: Active SparkSession.
        input_data: Path or glob of the log JSON files.
        output_data: Directory or URI prefix that receives ``users.parquet``,
            ``time_table.parquet`` and ``songplays.parquet``. An empty string
            writes relative to the current working directory.
        song_data: Path or glob of the song JSON files joined against the log
            events. Defaults to the local sample subset that used to be
            hard-coded inside this function.
    """
    # Derive the output locations from ``output_data`` instead of ignoring it.
    prefix = (output_data.rstrip("/") + "/") if output_data else ""
    users_path = prefix + "users.parquet"
    time_path = prefix + "time_table.parquet"
    songplays_path = prefix + "songplays.parquet"

    events = spark.read.json(input_data)

    # Keep song-play events only, then drop every row that still contains a
    # null in any column.
    plays = events.filter(events['page'] == 'NextSong').dropna()

    print("+++++####+++++####+++++####+++++####+++++####+++++####+++++####+++++####+++++####+++++####\n")
    print("to see the original df schema of log data \n")
    plays.printSchema()
    plays.show(5)

    # ---- users dimension ---------------------------------------------------
    print("extract from root schema to be  dimensional table users_table \n")
    # The log has one row per event; remove exact duplicate user rows so a
    # user is not written once for every song played.
    users_table = plays.select(
        col('userId').alias('user_id'),
        col('firstName').alias('first_name'),
        col('lastName').alias('last_name'),
        'gender',
        'level',
    ).dropDuplicates()
    users_table.printSchema()
    users_table.show(5)

    users_table.write \
               .partitionBy("user_id") \
               .mode('overwrite') \
               .parquet(users_path)

    # Review aid: read the files back and show a few rows through a temp view.
    parquetUsersFile = spark.read.parquet(users_path)
    print("<###################################################################### \n")
    parquetUsersFile.createOrReplaceTempView("parquetUsersFile")

    print("to see users parque view, partitioned by user_id \n")
    spark.sql("""
              SELECT *
              FROM parquetUsersFile
              ORDER BY user_id
              LIMIT 5""").show()

    # ---- time dimension ----------------------------------------------------
    # ``ts`` is divided by 1000 to get epoch seconds (the original UDF did the
    # same). Convert it with a native cast rather than a Python UDF: whole
    # seconds are kept, as before, and the column becomes a real timestamp
    # instead of a formatted string.
    plays = plays.withColumn(
        "start_time", (col("ts") / 1000).cast("long").cast("timestamp")
    )

    time_table = plays.select('start_time').dropDuplicates().select(
        'start_time',
        hour('start_time').alias('hour'),
        dayofmonth('start_time').alias('day'),
        weekofyear('start_time').alias('week'),
        month('start_time').alias('month'),
        year('start_time').alias('year'),
        # Previously computed with weekofyear(), which duplicated ``week``.
        # dayofweek() numbers days 1 (Sunday) through 7 (Saturday).
        dayofweek('start_time').alias('weekday'),
    )

    # Partitioned by year and month.
    time_table.write \
              .mode('overwrite') \
              .partitionBy('year', 'month') \
              .parquet(time_path)

    # ---- songplays fact table ----------------------------------------------
    song_df = spark.read.json(song_data)

    print("see song_data gain inside log_data process \n")
    song_df.printSchema()
    song_df.show(3)

    # Left outer join: plays without a matching song are kept, with null
    # song_id / artist_id.
    joined = plays.join(
        song_df,
        (plays.song == song_df.title)
        & (plays.artist == song_df.artist_name)
        & (plays.length == song_df.duration),
        'left_outer',
    )

    # monotonically_increasing_id() yields unique, increasing ids that are not
    # guaranteed to be consecutive.
    joined = joined.withColumn('songplay_id', monotonically_increasing_id())

    # ``start_time`` is the full timestamp (not the calendar date only), so it
    # holds the same values as ``time_table.start_time``.
    songplays_table = joined.select(
        'songplay_id',
        col("start_time"),
        col("userId").alias('user_id'),
        col("level"),
        song_df.song_id,
        song_df.artist_id,
        col("sessionId").alias('session_id'),
        col("location"),
        col("userAgent").alias('user_agent'),
    )

    # Partition columns have to exist before the write.
    songplays_table = songplays_table.withColumn('year', year('start_time')) \
                                     .withColumn('month', month('start_time'))

    songplays_table.write \
                   .mode('overwrite') \
                   .partitionBy('year', 'month') \
                   .parquet(songplays_path)

    # Review aid: read the files back and show a few rows through a temp view.
    parquetSongPlaysFile = spark.read.parquet(songplays_path)

    print("<###################################################################### \n")
    parquetSongPlaysFile.createOrReplaceTempView("parquetSongPlaysFile")

    print("to see songsplay parque view, partitioned by `year` and `month` \n")
    spark.sql("""
              SELECT *
              FROM parquetSongPlaysFile
              ORDER BY month
              LIMIT 5""").show()

In [7]:
def main():
    """Run the song step and then the log step on the local sample data.

    A Spark session is obtained from ``create_spark_session`` and handed,
    together with the input globs and the output location, to
    ``process_song_data`` followed by ``process_log_data``.
    """
    # Small dataset available in the workspace. The original author noted
    # "s3a://udacity-dend/" as an alternative input location.
    song_glob = 'data/song_data/A/A/A/*.json'
    log_glob = 'data/log_data/*.json'

    # Output location passed through to both processing steps.
    destination = ""

    session = create_spark_session()
    process_song_data(session, song_glob, destination)
    process_log_data(session, log_glob, destination)


In [8]:
# Entry point: call main() when this code runs as the top-level program
# (__name__ is "__main__"), but not when it is imported as a module.
if __name__ == "__main__":
    main()

to see the original df read from json song data 

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARKFYS91187B98E58F|           null|          