In [5]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import types 

# Load AWS credentials from the local 'dl.cfg' file. The lookups below read
# the 'KEYS' section, so the file has to start with a "[KEYS]" section header;
# configparser raises MissingSectionHeaderError for a file without one.
config = configparser.ConfigParser()
config.read('dl.cfg')

# Export both credentials through the process environment.
for _aws_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[_aws_key] = config['KEYS'][_aws_key]


def create_spark_session():
    """Return a SparkSession configured with the hadoop-aws package.

    ``getOrCreate()`` is used, so an already existing session is returned
    instead of building a second one.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


def process_song_data(spark, input_data, output_data):
    """Create the songs and artists tables from the song JSON files.

    Reads the JSON files matched by ``input_data + 'song_data/A/A/A/*.json'``,
    registers them as the temp view ``songdata`` and writes two parquet
    datasets (overwriting existing output) under ``output_data``:
    ``songs_table/`` partitioned by year and artist_id, and ``artists_table/``.

    Args:
        spark: SparkSession used for reading and for running the SQL.
        input_data: Path prefix under which ``song_data`` lives.
        output_data: Path prefix the parquet output is written to.
    """
    # Note: this glob only matches the A/A/A sub-directory of song_data.
    source_glob = input_data + 'song_data/A/A/A/*.json'

    raw_songs = spark.read.json(source_glob)
    raw_songs.createOrReplaceTempView("songdata")

    # Songs: one row per distinct combination of the selected columns.
    songs_query = """SELECT DISTINCT song_id,
                title,
                artist_id,
                year,
                duration
                FROM   songdata """
    songs = spark.sql(songs_query)
    songs_writer = songs.write.mode("overwrite").partitionBy("year", "artist_id")
    songs_writer.parquet(output_data + "songs_table/")

    # Artists: same source view, columns renamed to drop the "artist_" prefix.
    artists_query = """
    SELECT DISTINCT artist_id        AS artist_id,
                artist_name      AS name,
                artist_location  AS location,
                artist_latitude  AS latitude,
                artist_longitude AS longitude
    FROM   songdata
    """
    artists = spark.sql(artists_query)
    artists_writer = artists.write.mode("overwrite")
    artists_writer.parquet(output_data + "artists_table/")


def process_log_data(spark, input_data, output_data):
    """Create the users, time and songplays tables from the event logs.

    Reads the JSON logs under ``input_data + 'log_data'``, keeps only the
    ``NextSong`` events and writes three parquet datasets (overwriting
    existing output) under ``output_data``:

    * ``user_table/``      - distinct users with a non-null user id
    * ``time_table/``      - distinct start times broken into date parts,
      partitioned by year and month
    * ``songplays_table/`` - log events joined to the song data on artist
      name and song title, partitioned by year and month

    Args:
        spark: SparkSession used for reading and for running the SQL.
        input_data: Path prefix under which ``log_data`` and ``song_data`` live.
        output_data: Path prefix the parquet output is written to.
    """
    # get filepath to log data file
    log_data = input_data + 'log_data'

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.filter(df.page == 'NextSong')
    df.createOrReplaceTempView("events")

    # extract columns for users table
    user_table = spark.sql("""
        SELECT DISTINCT userid,
               firstname,
               lastname,
               gender,
               level
        FROM   events
        WHERE  userid IS NOT NULL
    """)

    # write users table to parquet files
    user_table.write.mode("overwrite").parquet(output_data + "user_table/")

    # `ts` is in epoch milliseconds; reduce it to whole seconds once and
    # derive both the string 'timestamp' and the real 'start_time' from it.
    # Built-in column functions are used instead of Python UDFs: an untyped
    # udf() returns strings, so the date-part extraction on its result failed.
    epoch_seconds = (col('ts') / 1000).cast('long')
    df = df.withColumn('timestamp', epoch_seconds.cast('string'))
    df = df.withColumn('start_time', epoch_seconds.cast('timestamp'))

    # 'week' is the week of the year and 'weekday' the day name (the original
    # helpers had these two swapped and relied on an un-imported `calendar`).
    df = (
        df.withColumn('hour', hour('start_time'))
        .withColumn('day', dayofmonth('start_time'))
        .withColumn('week', weekofyear('start_time'))
        .withColumn('month', month('start_time'))
        .withColumn('year', year('start_time'))
        .withColumn('weekday', date_format('start_time', 'EEEE'))
    )
    df.createOrReplaceTempView("timetemp")

    # extract columns to create time table; DISTINCT keeps one row per
    # start_time, consistent with the other dimension tables in this file
    time_table = spark.sql("""
        SELECT DISTINCT start_time,
               hour,
               day,
               week,
               month,
               year,
               weekday
        FROM   timetemp
    """)

    # write time table to parquet files partitioned by year and month
    time_table.write.mode("overwrite") \
        .partitionBy("year", "month") \
        .parquet(output_data + "time_table/")

    # read in song data to use for songplays table
    song_data = input_data + 'song_data/A/A/A/*.json'
    song_df = spark.read.json(song_data)
    song_df.createOrReplaceTempView("songstemp")

    # extract columns from joined song and log datasets to create songplays
    # table; year/month are qualified with timetemp because the song data
    # has its own `year` column
    songplays_table = spark.sql("""
        SELECT monotonically_increasing_id() AS songplay_id,
               timetemp.start_time AS start_time,
               timetemp.month      AS month,
               timetemp.year       AS year,
               timetemp.userId     AS user_id,
               timetemp.level      AS level,
               songstemp.song_id   AS song_id,
               songstemp.artist_id AS artist_id,
               timetemp.sessionId  AS session_id,
               timetemp.location   AS location,
               timetemp.userAgent  AS user_agent
        FROM   timetemp
        JOIN   songstemp
          ON   timetemp.artist = songstemp.artist_name
         AND   timetemp.song = songstemp.title
    """)

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.mode("overwrite") \
        .partitionBy("year", "month") \
        .parquet(output_data + "songplays_table/")


def main():
    """Run the ETL: song data first, then log data.

    Both steps share one Spark session and read from the
    ``s3a://udacity-dend/`` prefix, writing to ``s3a://mybucketusingaws/``.
    """
    session = create_spark_session()
    source_root, target_root = "s3a://udacity-dend/", "s3a://mybucketusingaws/"

    for etl_step in (process_song_data, process_log_data):
        etl_step(session, source_root, target_root)


# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


MissingSectionHeaderError: File contains no section headers.
file: 'dl.cfg', line: 1
"AWS_ACCESS_KEY_ID='ASIAUHRNVDAJ76GDWB5V'\n"