In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
input_data = "s3a://udacity-dend/"
song_data = input_data+'song_data/A/B/*/*.json'    

In [4]:
    # read song data file
df = spark.read.json(song_data)
df.printSchema()
df.head(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



[Row(artist_id='ARLTWXK1187FB5A3F8', artist_latitude=32.74863, artist_location='Fort Worth, TX', artist_longitude=-97.32925, artist_name='King Curtis', duration=326.00771, num_songs=1, song_id='SODREIN12A58A7F2E5', title='A Whiter Shade Of Pale (Live @ Fillmore West)', year=0),
 Row(artist_id='ARIOZCU1187FB3A3DC', artist_latitude=None, artist_location='Hamlet, NC', artist_longitude=None, artist_name='JOHN COLTRANE', duration=220.44689, num_songs=1, song_id='SOCEMJV12A6D4F7667', title='Giant Steps (Alternate Version_ Take 5_ Alternate)', year=0),
 Row(artist_id='ARPFHN61187FB575F6', artist_latitude=41.88415, artist_location='Chicago, IL', artist_longitude=-87.63241, artist_name='Lupe Fiasco', duration=279.97995, num_songs=1, song_id='SOWQTQZ12A58A7B63E', title='Streets On Fire (Explicit Album Version)', year=0),
 Row(artist_id='AR5S9OB1187B9931E3', artist_latitude=34.05349, artist_location='Los Angeles, CA', artist_longitude=-118.24532, artist_name='Bullet Boys', duration=156.62975, num

In [5]:
songs_table = df.select("song_id","title","artist_id","year","duration")
songs_table.head()

Row(song_id='SODREIN12A58A7F2E5', title='A Whiter Shade Of Pale (Live @ Fillmore West)', artist_id='ARLTWXK1187FB5A3F8', year=0, duration=326.00771)

In [6]:
output_data = '/'
songs_table.write.mode('overwrite').partitionBy("artist_id","year").parquet(output_data+'songs')

In [7]:
    # extract columns to create artists table
    artists_table = df.select("artist_id", col("artist_name").alias('name'), "artist_location",col("artist_latitude").alias('latitude'),col("artist_longitude").alias('longitude'))
    
    # write artists table to parquet files
    artists_table.write.mode('overwrite').parquet(output_data+'artists')

In [17]:
    log_data = input_data + 'log_data/*/*/*.json'

    # read log data file
    df = spark.read.json(log_data)
    df.head()


Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')

In [18]:
df = df.filter("page == 'NextSong'")
df.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')

In [19]:
users_table = df.select(col("userId").alias('user_id'), col("firstName").alias('first_name'), col("lastName").alias('last_name'), "gender", "level").dropDuplicates()
users_table.head(5)

[Row(user_id='26', first_name='Ryan', last_name='Smith', gender='M', level='free'),
 Row(user_id='7', first_name='Adelyn', last_name='Jordan', gender='F', level='free'),
 Row(user_id='71', first_name='Ayleen', last_name='Wise', gender='F', level='free'),
 Row(user_id='81', first_name='Sienna', last_name='Colon', gender='F', level='free'),
 Row(user_id='87', first_name='Dustin', last_name='Lee', gender='M', level='free')]

In [20]:
    get_timestamp = udf(lambda x: str(int(int(x/1000))))
    df = df.withColumn('timestamp', get_timestamp(df.ts))
    df.head(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp='1542241826')]

In [21]:
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000)))
df = df.withColumn('start_time', get_datetime(df.timestamp))
df.head(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp='1542241826', start_time='1970-01-18 20:24:01.826000')]

In [13]:
time_table =df.select("start_time") \
    .withColumn('year',year(df.start_time)) \
    .withColumn('month',month(df.start_time)) \
    .withColumn('weekofyear',weekofyear(df.start_time)) \
    .withColumn('dayofmonth',dayofmonth(df.start_time)) \
    .withColumn('hour',hour(df.start_time)) \
    .dropDuplicates()
time_table.head(10)

[Row(start_time='1970-01-18 20:24:23.158000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:32:57.255000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:33:36.994000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:22:51.840000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:22:56.156000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:10:33.727000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:10:41.637000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:21:44.735000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:22:06.915000', year=1970, month=1, weekofyear=3, dayofmonth=18, hour=20),
 Row(start_time='1970-01-18 20:22:12.590000', year=1970

In [14]:
song_table = output_data + 'songs'
artist_table = output_data + 'artists'
song_df = spark.read.parquet(song_table)
artist_df = spark.read.parquet(artist_table)
temp2_df = song_df.join(artist_df, song_df['artist_id'] == artist_df['artist_id'], "left").drop(artist_df['artist_id'])
temp2_df.head(1)


[Row(song_id='SOCEMJV12A6D4F7667', title='Giant Steps (Alternate Version_ Take 5_ Alternate)', duration=220.44689, artist_id='ARIOZCU1187FB3A3DC', year=0, name='JOHN COLTRANE', artist_location='Hamlet, NC', latitude=None, longitude=None)]

In [22]:
df = df.join(temp2_df, [df['song'] == temp2_df['title'], df['artist'] == temp2_df['name']], 'left')


In [23]:
df.head()

Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', timestamp='1542241826', start_time='1970-01-18 20:24:01.826000', song_id=None, title=None, duration=None, artist_id=None, year=None, name=None, artist_location=None, latitude=None, longitude=None)

In [24]:
songplay_table = df.select(monotonically_increasing_id().alias('songplay_id'), 
                           col('start_time'), 
                           col('userId').alias('user_id'),
                           'level',
                           'song_id',
                           'artist_id', 
                           col('sessionId').alias('session_id'),
                           'location', 
                           col('userAgent').alias('user_agent'),
                           year('start_time').alias('year'),
                           month('start_time').alias('month'))
songplay_table.head(5)


[Row(songplay_id=0, start_time='1970-01-18 20:12:29.869000', user_id='97', level='paid', song_id=None, artist_id=None, session_id=293, location='Lansing-East Lansing, MI', user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', year=1970, month=1),
 Row(songplay_id=1, start_time='1970-01-18 20:40:29.554000', user_id='15', level='paid', song_id=None, artist_id=None, session_id=834, location='Chicago-Naperville-Elgin, IL-IN-WI', user_agent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', year=1970, month=1),
 Row(songplay_id=2, start_time='1970-01-18 20:24:39.379000', user_id='80', level='paid', song_id=None, artist_id=None, session_id=611, location='Portland-South Portland, ME', user_agent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', year=1970, month=1)

In [25]:
songplay_table.write.mode('overwrite').partitionBy("year", "month").parquet(output_data + 'songplay')

In [4]:
    print('Welcome to Sparkify Etl from Data Lake. Please, indicate where do you want to store the results: ')
    output = input()
    print(output+'artists')

Welcome to Sparkify Etl from Data Lake. Please, indicate where do you want to store the results: 


 /


/artists
