In [13]:
#import configparser
#from datetime import datetime
#import os
#from pyspark.sql import SparkSession
#from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType as Ts
#from pyspark.sql.functions import udf, col
#from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [16]:
import configparser
import os
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, \
    dayofmonth, hour, weekofyear, date_format, \
    from_unixtime, dayofweek, dayofyear
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType, LongType

In [17]:
# Load the AWS credentials from dl.cfg and export them as environment
# variables for this process.
config = configparser.ConfigParser()
# ConfigParser.read() silently ignores a missing file, which would surface
# below as an unhelpful KeyError: 'AWS'. Opening the file explicitly makes a
# missing dl.cfg fail with FileNotFoundError instead.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [18]:
def create_spark_session():
    """
    Build the SparkSession used by the ETL steps.

    The session is configured with the ``hadoop-aws`` package through
    ``spark.jars.packages`` and obtained via ``getOrCreate()``.

    :return: SparkSession
    """
    session_builder = SparkSession.builder
    session_builder = session_builder.config(
        'spark.jars.packages', 'org.apache.hadoop:hadoop-aws:2.7.0'
    )
    return session_builder.getOrCreate()

In [45]:
def process_song_data(spark, input_data, output_data):
    """
    Read the song JSON files, write the songs and artists tables as parquet,
    and hand back the song DataFrame so it can be joined with the log data.

    :param spark: SparkSession
    :param input_data: Root of the input data. ``song_data/A/*/*/*.json`` is
        appended as-is, so the value must already end with ``/``.
    :param output_data: Root of the output location. ``/song_table`` and
        ``/artist_table`` are appended, so no trailing ``/`` is expected.
    :return: DataFrame with every song record read using the explicit schema
    :raises Exception: any error raised while loading the input is propagated
        unchanged (it used to be printed and then masked by an
        ``AttributeError`` on ``None``).
    """
    # get filepath to song data file (only the "A" subtree is read)
    song_data = input_data + "song_data/A/*/*/*.json"
    print(song_data)

    # Explicit schema, so Spark does not need to infer one from the files
    schema = StructType([
        StructField("num_songs", IntegerType(), True),
        StructField("artist_id", StringType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_latitude", DoubleType(), True),
        StructField("artist_longitude", DoubleType(), True),
        StructField("duration", DoubleType(), True),
        StructField("artist_name", StringType(), True),
        StructField("song_id", StringType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True)
    ])

    # read song data file; nothing below can work without it, so a failure
    # is allowed to raise instead of being swallowed
    df = spark.read.schema(schema).json(song_data)

    print('Number of data: ' + str(df.count()))

    # extract columns to create songs table (one row per song_id)
    songs_table = df.select('song_id', 'title', 'artist_id',
                            'year', 'duration').dropDuplicates(['song_id'])
    # describe() is not followed by show(), so this prints the summary
    # frame's repr rather than the statistics themselves
    print(songs_table.describe())

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.parquet(f'{output_data}/song_table',
                              mode='overwrite',
                              partitionBy=['year', 'artist_id'])

    # extract columns to create artists table (one row per artist_id)
    artists_table = df.select('artist_id', 'artist_name',
                              'artist_location', 'artist_latitude',
                              'artist_longitude').dropDuplicates(['artist_id'])
    # write artists table to parquet files
    artists_table.write.parquet(f'{output_data}/artist_table',
                                mode='overwrite')

    return df

In [37]:
def process_log_data(spark, input_data, output_data, song_dataset):
    """
    Process the log data from the log directory and write the user, time and
    songplays tables as parquet.

    :param spark: SparkSession
    :param input_data: Root of the input data. ``log_data/*/*/*.json`` is
        appended as-is, so the value must already end with ``/``.
    :param output_data: Root of the output location. ``/user_table``,
        ``/time_table`` and ``/songplays_table`` are appended.
    :param song_dataset: Song DataFrame (as returned by ``process_song_data``)
        that is joined with the log events to build the songplays table
    :return: None
    :raises Exception: any error raised while loading the input is propagated
        unchanged.
    """
    # get filepath to log data file
    log_data = input_data + 'log_data/*/*/*.json'
    print(log_data)

    schema = StructType([
        StructField("artist", StringType(), True),
        StructField("auth", StringType(), True),
        StructField("firstName", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("lastName", StringType(), True),
        StructField("length", DoubleType(), True),
        StructField("level", StringType(), True),
        StructField("location", StringType(), True),
        StructField("method", StringType(), True),
        StructField("page", StringType(), True),
        StructField("registration", DoubleType(), True),
        StructField("sessionId", IntegerType(), True),
        StructField("song", StringType(), True),
        StructField("status", IntegerType(), True),
        # ts is epoch milliseconds (13 digits), which does not fit a 32-bit
        # IntegerType; LongType is required to avoid nulls
        StructField("ts", LongType(), True),
        StructField("userAgent", StringType(), True),
        # userId is a JSON string in the log files, so an integer type
        # fails to parse it
        StructField("userId", StringType(), True),
    ])

    # read log data file; nothing below can work without it, so a failure
    # is allowed to raise instead of being swallowed
    df = spark.read.schema(schema).json(log_data)
    print("log_data", df.count())

    # filter by actions for song plays (the page value has no space)
    df = df.filter(df['page'] == 'NextSong')

    # extract columns for users table
    user_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates(['userId'])
    # write users table to parquet files
    user_table.write.parquet(f'{output_data}/user_table', mode='overwrite')

    # create timestamp column from original timestamp column
    # (milliseconds -> seconds before formatting)
    df = df.withColumn('start_time', from_unixtime(col('ts') / 1000))

    # 'datetime' is a plain copy of the raw ts value; it is not converted
    df = df.withColumn('datetime', col('ts'))

    # extract columns to create time table; 'day' is the day of the month
    time_table = df.select('ts', 'start_time') \
        .withColumn('year', year('start_time')) \
        .withColumn('month', month('start_time')) \
        .withColumn('week', weekofyear('start_time')) \
        .withColumn('weekday', dayofweek('start_time')) \
        .withColumn('day', dayofmonth('start_time')) \
        .withColumn('hour', hour('start_time')).dropDuplicates()

    # write time table to parquet files partitioned by year and month
    time_table.write.parquet(f'{output_data}/time_table', mode='overwrite', partitionBy=['year', 'month'])

    # extract columns from joined song and log datasets to create songplays table
    song_dataset.createOrReplaceTempView('song_dataset')
    time_table.createOrReplaceTempView('time_table')
    df.createOrReplaceTempView('log_dataset')

    songplays_table = spark.sql("""SELECT DISTINCT
                                       l.ts as ts,
                                       t.year as year,
                                       t.month as month,
                                       l.userId as user_id,
                                       l.level as level,
                                       s.song_id as song_id,
                                       s.artist_id as artist_id,
                                       l.sessionId as session_id,
                                       s.artist_location as artist_location,
                                       l.userAgent as user_agent
                                   FROM song_dataset s
                                   JOIN log_dataset l
                                       ON s.artist_name = l.artist
                                       AND s.title = l.song
                                       AND s.duration = l.length
                                   JOIN time_table t
                                       ON t.ts = l.ts
                                   """).dropDuplicates()

    print("songplays number : ", songplays_table.count())
    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy("year", "month").mode("overwrite").parquet(
        f"{output_data}/songplays_table"
    )

In [21]:
# Session used by every Spark call in the cells below.
spark = create_spark_session()

In [22]:
spark

In [23]:
# Input root. The ETL functions append 'song_data/...' and 'log_data/...'
# directly to this value, so the trailing slash is required.
input_data = 's3a://udacity-dend/'

In [24]:
# Output root. The ETL functions append '/<table_name>' themselves, so no
# trailing slash here.
output_data = "s3a://myawsbucket-tp-nd027"

In [46]:
# Run the song step and keep the returned DataFrame; process_log_data takes
# it as its song_dataset argument.
song_dataset = process_song_data(spark, input_data, output_data)

s3a://udacity-dend/song_data/A/*/*/*.json


KeyboardInterrupt: 

In [39]:
song_dataset.head()

Row(num_songs=1, artist_id='ARTC1LV1187B9A4858', artist_location="Goldsmith's College, Lewisham, Lo", artist_latitude=51.4536, artist_longitude=-0.01802, duration=301.40036, artist_name='The Bonzo Dog Band', song_id='SOAFBCP12A8C13CC7D', title='King Of Scurf (2007 Digital Remaster)', year=1972)

In [27]:
song_dataset.count()

24

In [38]:
# Run the log step; it needs song_dataset from the process_song_data cell.
process_log_data(spark, input_data, output_data, song_dataset)

s3a://udacity-dend/log_data/*/*/*.json
log_data 8056
songplays number :  0


In [51]:
# Same glob that process_song_data builds: every JSON file under song_data/A.
song_data = f'{input_data}song_data/A/*/*/*.json'

In [52]:
song_data

's3a://udacity-dend/song_data/A/*/*/*.json'

In [53]:
# No schema is passed here, so Spark has to infer one from the matched files.
# NOTE(review): the recorded run of this cell ended in KeyboardInterrupt.
song_df = spark.read.json(song_data)

KeyboardInterrupt: 

In [None]:
# Exploratory load restricted to the song_data/A/A/A directory.
song_data = f'{input_data}song_data/A/A/A/*.json'

# Columns that make up each output table
song_columns = ['song_id', 'title', 'artist_id', 'year', 'duration']
artist_columns = ['artist_id', 'artist_name', 'artist_location',
                  'artist_latitude', 'artist_longitude']

# read song data file (schema is inferred by Spark)
song_df = spark.read.json(song_data)

# project the songs table and the artists table out of the raw frame
songs_table = song_df.select(song_columns)
artists_table = song_df.select(artist_columns)

In [8]:
song_df.head()

Row(artist_id='ARTC1LV1187B9A4858', artist_latitude=51.4536, artist_location="Goldsmith's College, Lewisham, Lo", artist_longitude=-0.01802, artist_name='The Bonzo Dog Band', duration=301.40036, num_songs=1, song_id='SOAFBCP12A8C13CC7D', title='King Of Scurf (2007 Digital Remaster)', year=1972)

In [8]:
# Exploratory load of a single day of log events.
log_data = f'{input_data}log_data/2018/11/2018-11-12-events.json'

# read log data file (schema is inferred by Spark)
log_df = spark.read.json(log_data)

# filter by actions for song plays
log_df = log_df.where(log_df.page == 'NextSong')

# extract columns for users table
users_table = log_df.selectExpr('userId as user_id', 'firstName as first_name', 'lastName as last_name', 'gender', 'level').dropDuplicates()


# The two helpers below replace the former Python UDFs. Those referenced the
# aliases `Int` and `Ts`, which no active import defines, and declared an
# integer return type for a float result. Native column expressions need
# neither and keep the millisecond precision.
def get_timestamp(ts_col):
    """Convert an epoch-milliseconds column to fractional epoch seconds."""
    return ts_col / 1000


def get_datetime(seconds_col):
    """Cast an epoch-seconds column to a Spark timestamp column."""
    return seconds_col.cast('timestamp')


# create timestamp column from original timestamp column
log_df = log_df.withColumn('timestamp', get_timestamp(log_df.ts))

# create datetime column from original timestamp column
log_df = log_df.withColumn('datetime', get_datetime(log_df.timestamp))

# extract columns to create time table
time_table = log_df.select(
    col('datetime').alias('start_time'),
    hour('datetime').alias('hour'),
    dayofmonth('datetime').alias('day'),
    weekofyear('datetime').alias('week'), 
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    date_format('datetime', 'E').alias('weekday')
)

In [None]:
# Time table: the event timestamp plus one column per calendar part of it.
event_time = col('datetime')
time_columns = [
    event_time.alias('start_time'),
    hour(event_time).alias('hour'),
    dayofmonth(event_time).alias('day'),
    weekofyear(event_time).alias('week'),
    month(event_time).alias('month'),
    year(event_time).alias('year'),
    # 'E' is the abbreviated day-of-week name pattern
    date_format(event_time, 'E').alias('weekday'),
]
time_table = log_df.select(*time_columns)

In [9]:
songs_table.take(5)

[Row(song_id='SOAFBCP12A8C13CC7D', title='King Of Scurf (2007 Digital Remaster)', artist_id='ARTC1LV1187B9A4858', year=1972, duration=301.40036),
 Row(song_id='SOKTJDS12AF72A25E5', title='Drown In My Own Tears (24-Bit Digitally Remastered 04)', artist_id='ARA23XO1187B9AF18F', year=0, duration=192.522),
 Row(song_id='SOEKAZG12AB018837E', title="I'll Slap Your Face (Entertainment USA Theme)", artist_id='ARSVTNL1187B992A91', year=2001, duration=129.85424),
 Row(song_id='SOQPWCR12A6D4FB2A3', title='A Poor Recipe For Civic Cohesion', artist_id='AR73AIO1187B9AD57B', year=2005, duration=118.07302),
 Row(song_id='SOBRKGM12A8C139EF6', title='Welcome to the Pleasuredome', artist_id='ARXQBR11187B98A2CC', year=1985, duration=821.05424)]

In [10]:
artists_table.take(5)

[Row(artist_id='ARTC1LV1187B9A4858', artist_name='The Bonzo Dog Band', artist_location="Goldsmith's College, Lewisham, Lo", artist_latitude=51.4536, artist_longitude=-0.01802),
 Row(artist_id='ARA23XO1187B9AF18F', artist_name='The Smithereens', artist_location='Carteret, New Jersey', artist_latitude=40.57885, artist_longitude=-74.21956),
 Row(artist_id='ARSVTNL1187B992A91', artist_name='Jonathan King', artist_location='London, England', artist_latitude=51.50632, artist_longitude=-0.12714),
 Row(artist_id='AR73AIO1187B9AD57B', artist_name='Western Addiction', artist_location='San Francisco, CA', artist_latitude=37.77916, artist_longitude=-122.42005),
 Row(artist_id='ARXQBR11187B98A2CC', artist_name='Frankie Goes To Hollywood', artist_location='Liverpool, England', artist_latitude=None, artist_longitude=None)]

In [11]:
users_table.take(5)

[Row(user_id='26', first_name='Ryan', last_name='Smith', gender='M', level='free'),
 Row(user_id='80', first_name='Tegan', last_name='Levine', gender='F', level='paid'),
 Row(user_id='34', first_name='Evelin', last_name='Ayala', gender='F', level='free'),
 Row(user_id='97', first_name='Kate', last_name='Harrell', gender='F', level='paid'),
 Row(user_id='73', first_name='Jacob', last_name='Klein', gender='M', level='paid')]

In [12]:
time_table.take(5)

[Row(start_time=datetime.datetime(2018, 11, 12, 2, 37, 38, 796000), hour=2, day=12, week=46, month=11, year=2018, weekday='Mon'),
 Row(start_time=datetime.datetime(2018, 11, 12, 2, 37, 44, 796000), hour=2, day=12, week=46, month=11, year=2018, weekday='Mon'),
 Row(start_time=datetime.datetime(2018, 11, 12, 2, 42, 21, 796000), hour=2, day=12, week=46, month=11, year=2018, weekday='Mon'),
 Row(start_time=datetime.datetime(2018, 11, 12, 2, 45, 52, 796000), hour=2, day=12, week=46, month=11, year=2018, weekday='Mon'),
 Row(start_time=datetime.datetime(2018, 11, 12, 2, 47, 22, 796000), hour=2, day=12, week=46, month=11, year=2018, weekday='Mon')]

In [12]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

In [13]:
# A play is matched to a song on artist, title and duration, the same keys
# process_log_data uses. Matching on title alone would pair a play with every
# same-titled song by any artist and multiply rows.
song_match = (
    (log_df['song'] == song_df['title'])
    & (log_df['artist'] == song_df['artist_name'])
    & (log_df['length'] == song_df['duration'])
)

# Left join: plays without a matching song are kept, with null song_id and
# artist_id.
songplays_table = log_df.join(song_df, song_match, 'left')\
                    .select(
                        # Window has no partition columns, so songplay_id is
                        # one global sequence ordered by event time.
                        row_number().over(Window.partitionBy().orderBy([log_df.datetime])).alias('songplay_id'), 
                        log_df.datetime.alias('start_time'),
                        log_df.userId.alias('user_id'),
                        log_df.level,
                        song_df.song_id,
                        song_df.artist_id,
                        log_df.sessionId.alias('session_id'),
                        log_df.location,
                        log_df.userAgent.alias('user_agent')
                    )

In [14]:
songplays_table.take(5)

[Row(songplay_id=1, start_time=datetime.datetime(2018, 11, 12, 2, 37, 38, 796000), user_id='10', level='free', song_id=None, artist_id=None, session_id=345, location='Washington-Arlington-Alexandria, DC-VA-MD-WV', user_agent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"'),
 Row(songplay_id=2, start_time=datetime.datetime(2018, 11, 12, 2, 37, 44, 796000), user_id='53', level='free', song_id=None, artist_id=None, session_id=438, location='Klamath Falls, OR', user_agent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"'),
 Row(songplay_id=3, start_time=datetime.datetime(2018, 11, 12, 2, 42, 21, 796000), user_id='53', level='free', song_id=None, artist_id=None, session_id=438, location='Klamath Falls, OR', user_agent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"'),
 Row(songplay_id=4, sta