In [1]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    date_format,
    dayofmonth,
    dayofweek,
    hour,
    month,
    monotonically_increasing_id,
    udf,
    weekofyear,
    year,
)

In [2]:
# Parse the project settings from dl.cfg (looked up relative to the working
# directory). The [AWS] section of this file is read further down.
config = configparser.ConfigParser()
# read() returns the list of files it parsed successfully; that list is the
# value displayed as this cell's output.
config.read('dl.cfg')

['dl.cfg']

In [3]:
# Copy the AWS credentials from the [AWS] section of the parsed config into
# the process environment, one entry per key name.
os.environ.update({
    key_name: config['AWS'][key_name]
    for key_name in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
})

In [4]:
def createSparkSession():
    """Create (or reuse) a SparkSession that can read and write ``s3a://`` paths.

    The S3A credentials are taken from the ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables instead of being embedded
    in the notebook. When either variable is missing, no explicit key is
    configured and credential resolution is left to the S3A connector.

    Returns:
        SparkSession: the active session, created on first call.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.6,com.amazonaws:aws-java-sdk:1.11.874",
    )

    # Never hard-code secrets in source: read them from the environment.
    access_key = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if access_key and secret_key:
        builder = (
            builder
            .config("spark.hadoop.fs.s3a.access.key", access_key)
            .config("spark.hadoop.fs.s3a.secret.key", secret_key)
        )

    return builder.getOrCreate()


In [5]:
# Create the session in this cell so the notebook also runs on a fresh kernel
# (no earlier cell assigns `spark`), then display it as the cell output.
spark = createSparkSession()
spark

23/10/11 11:33:26 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [None]:
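# Earlier variant of createSparkSession (hadoop-aws 2.7.0, no explicit S3A keys).
# Kept for reference only; the definition in cell In [4] is the one main() calls.
# def createSparkSession():
#     spark = SparkSession.builder.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0").getOrCreate()
#     return spark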

#### DESCRIPTION:

        Load the song_data dataset, extract the columns for the songs
        and artists tables, and write each table as Parquet files
        to the output location (e.g. S3).
    
#### INPUT:
        spark (Object):       SparkSession object used to work with Spark
        input_data (String):  Input data path
        output_data (String): Output data path

In [6]:
def process_song_data(spark, input_data, output_data):
    """Build the songs and artists tables from the song dataset.

    Reads the song JSON files found below ``input_data``, derives the two
    tables and writes each of them as Parquet below ``output_data``.

    Args:
        spark: SparkSession used for reading and writing.
        input_data (str): Root path of the input data; song files are read
            from ``song_data/*/*/*/*.json`` below it.
        output_data (str): Root path of the output; the tables are written to
            its ``songs`` and ``artists`` sub-directories.
    """
    # Resolve the song files relative to input_data instead of a fixed local
    # sample directory, so the caller decides which dataset is processed.
    song_data = os.path.join(input_data, 'song_data/*/*/*/*.json')
    df = spark.read.json(song_data)

    # Extract columns from data frame to create songs table
    songs_table = df.selectExpr(
        'song_id',
        'title',
        'artist_id',
        'year',
        'duration'
    ).orderBy('song_id').drop_duplicates()

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs'))

    # Extract columns from data frame to create artists table; the artist_*
    # source columns are renamed to the shorter artists-table names.
    artists_table = df.selectExpr(
        "artist_id",
        "artist_name as name",
        "artist_location as location",
        "artist_latitude as latitude",
        "artist_longitude as longitude"
    ).orderBy("artist_id").drop_duplicates()

    # write artists table to parquet files
    artists_table.write.parquet(os.path.join(output_data, 'artists'))


#### DESCRIPTION:
        Load the log_data dataset and extract the columns for the users
        and time tables. Then join the log_data and song_data datasets
        to build the songplays table. Every table is written as Parquet
        files to the output location (e.g. S3).
        
#### INPUT:
        spark (Object):       SparkSession object used to work with Spark
        input_data (String):  Input data path
        output_data (String): Output data path

In [7]:
def process_log_data(spark, input_data, output_data):
    """Build the users, time and songplays tables from the event logs.

    Reads the log JSON files below ``input_data``, keeps only the
    ``NextSong`` events, and writes the users and time tables as Parquet.
    It then left-joins the events with the song dataset to produce the
    songplays table, which is also written as Parquet.

    Args:
        spark: SparkSession used for reading and writing.
        input_data (str): Root path of the input data (contains the log and
            song datasets).
        output_data (str): Root path of the output; tables are written to its
            ``users``, ``time`` and ``songplays`` sub-directories.
    """
    # Read the log files below input_data rather than input_data itself.
    # NOTE(review): the glob assumes a log_data/<dir>/<dir>/<file>.json
    # layout -- confirm against the actual bucket structure.
    log_data = os.path.join(input_data, 'log_data/*/*/*.json')
    log_df = spark.read.json(log_data)

    # filter by actions for song plays
    log_df = log_df.where('page = "NextSong"')

    # extract columns for users table (rows with an empty user id are dropped)
    users_table = log_df.selectExpr(
        'userId as user_id',
        "firstName as first_name",
        "lastName as last_name",
        "gender",
        "level"
    ).filter('user_id <> ""').orderBy('user_id').drop_duplicates()

    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data, 'users'))

    # `ts` is divided by 1000 before conversion, i.e. it is treated as epoch
    # milliseconds. The result is a 'YYYY-MM-DD HH:MM:SS' string in the local
    # time zone of the Python worker.
    get_timestamp = udf(lambda ms: datetime.fromtimestamp(ms / 1000).strftime('%Y-%m-%d %H:%M:%S'))
    log_df = log_df.withColumn('timestamp', get_timestamp('ts'))

    # extract columns to create time table from timestamp column in log data files
    time_table = log_df.select(
        col('timestamp').alias('start_time'),
        hour('timestamp').alias('hour'),
        dayofmonth('timestamp').alias('day'),
        weekofyear('timestamp').alias('week'),
        month('timestamp').alias('month'),
        year('timestamp').alias('year'),
        # The week-based pattern letter 'u' is rejected by date_format() on
        # Spark 3. dayofweek() yields 1 = Sunday .. 7 = Saturday, which is
        # shifted here to the 1 = Monday .. 7 = Sunday numbering (integer).
        (((dayofweek('timestamp') + 5) % 7) + 1).alias('weekday')
    ).orderBy("start_time").drop_duplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'time'))

    # read in song data to use for songplays table
    song_df = spark.read.json(os.path.join(input_data, 'song_data/*/*/*/*.json')).selectExpr(
        "song_id",
        "title",
        "artist_id",
        "artist_name",
        "year",
        "duration"
    ).drop_duplicates()

    # NOTE(review): the last condition only matches when the play happened in
    # the same year as the song's `year` value. If `year` is the release
    # year, most plays will end up with NULL song_id/artist_id -- confirm
    # whether this condition is intended.
    join_condition = (
        (log_df['song'] == song_df['title']) &
        (log_df['artist'] == song_df['artist_name']) &
        (log_df['length'] == song_df['duration']) &
        (year(log_df['timestamp']) == song_df['year'])
    )

    # extract columns from joined song and log datasets to create songplays table;
    # the left outer join keeps plays that have no matching song.
    songplays_table = log_df.join(song_df, join_condition, 'left_outer').select(
        log_df['timestamp'].alias("start_time"),
        log_df['userId'].alias("user_id"),
        log_df['level'],
        song_df['song_id'],
        song_df['artist_id'],
        log_df['sessionId'].alias("session_id"),
        log_df['location'],
        log_df['userAgent'].alias("user_agent"),
        year(log_df['timestamp']).alias('year'),
        month(log_df['timestamp']).alias('month')
    ).orderBy("start_time", "user_id").withColumn('songplay_id', monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'songplays'))

In [8]:
def main():
    """Run the whole pipeline: the song step first, then the log step.

    Both steps share one Spark session and the same input and output roots.
    """
    source_root = "s3a://udacity-dend/"
    target_root = "s3a://outputdatalakeaws/Output/"
    session = createSparkSession()

    # The steps take identical arguments, so run them in order from a tuple.
    for step in (process_song_data, process_log_data):
        step(session, source_root, target_root)

In [None]:
# Start the pipeline only when this code is the entry point (a notebook cell
# or a script run directly), not when it is imported as a module.
if __name__ == "__main__":
    main()