In [1]:
import configparser
import os

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, from_unixtime, to_timestamp, date_format
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

In [3]:
# Load AWS credentials from dl.cfg and export them as environment variables
# (presumably picked up by the s3a connector used for the s3a:// paths below).
config = configparser.ConfigParser()
# ConfigParser.read() returns the list of files it actually parsed and silently
# skips missing ones, so check it here instead of failing later with a bare
# KeyError: 'AWS'.
if not config.read('dl.cfg'):
    raise FileNotFoundError("dl.cfg not found; expected an [AWS] section with "
                            "AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
def create_spark_session(hadoop_aws_version="2.7.0"):
    """Return a SparkSession with the hadoop-aws package on the classpath.

    Args:
        hadoop_aws_version: Version of org.apache.hadoop:hadoop-aws to load via
            spark.jars.packages. It normally has to match the Hadoop version the
            Spark installation was built against; the default keeps the value
            that used to be hard-coded here.

    Returns:
        The SparkSession from builder.getOrCreate(), which reuses an existing
        session if one is already running.

    Note:
        spark.jars.packages is resolved when the JVM/SparkContext starts, so it
        has no effect on a session that already exists.
    """
    package = "org.apache.hadoop:hadoop-aws:" + hadoop_aws_version
    return (
        SparkSession.builder
        .config("spark.jars.packages", package)
        .getOrCreate()
    )

In [5]:
# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
spark = create_spark_session()

In [6]:
# S3 locations: source JSON datasets are read under input_data,
# the output parquet tables are written under output_data.
input_data, output_data = 's3a://udacity-dend/', 's3a://udacity-sparkify-output/'

In [7]:
# Read the song metadata JSON files into df.
# Only the song_data/A/A/A/ prefix is globbed here, presumably a small slice of
# the dataset. TODO confirm whether a full run should use 'song_data/*/*/*/*.json'.
song_data = input_data + 'song_data/A/A/A/*.json'
df = spark.read.json(song_data)

In [8]:
# Quick look at the raw song records: row count, inferred schema, first rows.
song_record_count = df.count()
print(song_record_count)
df.printSchema()
df.show(n=5)

24
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Ban

In [9]:
# Songs table: project the song records onto the songs-table columns.
songs_columns = ('song_id', 'title', 'artist_id', 'year', 'duration')
songs_table = df.select(*songs_columns)

songs_table.show(n=1)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
+------------------+--------------------+------------------+----+---------+
only showing top 1 row



In [10]:
# Write the songs table to parquet, partitioned by year and artist_id.
# mode('overwrite') makes the cell re-runnable: the default save mode
# ('errorifexists') fails as soon as the target path exists.
# Note that it replaces any previous output at this path.
songs_table.write.mode('overwrite').partitionBy('year', 'artist_id').parquet(output_data + 'songs')

In [11]:
# Artists table: rename the artist_* fields of the song records and drop
# rows that are exact duplicates across all five columns.
artists_table = (
    df.selectExpr('artist_id',
                  'artist_name AS name',
                  'artist_location AS location',
                  'artist_latitude AS latitude',
                  'artist_longitude AS longitude')
      .dropDuplicates()
)


In [12]:
# Preview the first five artists.
artists_table.show(n=5)

+------------------+-------------+---------------+--------+----------+
|         artist_id|         name|       location|latitude| longitude|
+------------------+-------------+---------------+--------+----------+
|ARSVTNL1187B992A91|Jonathan King|London, England|51.50632|  -0.12714|
|ARXR32B1187FB57099|          Gob|               |    null|      null|
|ARZKCQM1257509D107|   Dataphiles|               |    null|      null|
|ARC1IHZ1187FB4E920| Jamie Cullum|               |    null|      null|
|AR1KTV21187B9ACD72|     Cristina|California - LA|34.05349|-118.24532|
+------------------+-------------+---------------+--------+----------+
only showing top 5 rows



In [13]:
# Row count after dropDuplicates(); compare with the song record count printed earlier.
artists_table.count()

24

In [14]:
# Write the artists table to parquet (unpartitioned).
# mode('overwrite') makes the cell re-runnable: the default save mode
# ('errorifexists') fails as soon as the target path exists.
# Note that it replaces any previous output at this path.
artists_table.write.mode('overwrite').parquet(output_data + 'artists')

In [15]:
# Read the log (user activity) JSON files, which sit two directory levels
# below log_data/, into df. This replaces the song records held in df so far.
log_data = input_data + 'log_data/*/*/*.json'
df = spark.read.json(log_data)

df.head(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [16]:
# Total number of log events read, before filtering to song plays.
df.count()

8056

In [17]:
# Inspect the schema Spark inferred for the log events.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [18]:
# Keep only song-play events (page == 'NextSong'); df is reassigned to the filtered frame.
df = df.where(col('page') == 'NextSong')

df.count()

6820

In [19]:
# Sanity check: after the filter, 'NextSong' should be the only remaining page value.
df.select('page').distinct().show()

+--------+
|    page|
+--------+
|NextSong|
+--------+



In [20]:
# Extract the users table with exactly one row per user_id.
# `level` is an attribute of each log event, so the same user can appear with
# different levels over time. A dropDuplicates() over all five columns would then
# keep one row per (user, level) and user_id would not be unique.
# Instead, keep the attributes from each user's most recent event (highest ts).
latest_event_first = Window.partitionBy('userId').orderBy(col('ts').desc())

users_table = df.withColumn('event_recency', row_number().over(latest_event_first)) \
                .filter(col('event_recency') == 1) \
                .select(col('userId').alias('user_id'),
                        col('firstName').alias('first_name'),
                        col('lastName').alias('last_name'),
                        col('gender'),
                        col('level'))

users_table.take(5)

[Row(user_id='26', first_name='Ryan', last_name='Smith', gender='M', level='free'),
 Row(user_id='7', first_name='Adelyn', last_name='Jordan', gender='F', level='free'),
 Row(user_id='71', first_name='Ayleen', last_name='Wise', gender='F', level='free'),
 Row(user_id='81', first_name='Sienna', last_name='Colon', gender='F', level='free'),
 Row(user_id='87', first_name='Dustin', last_name='Lee', gender='M', level='free')]

In [21]:
# Confirm the renamed users-table columns and their types.
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [22]:
# Write the users table to parquet (unpartitioned).
# mode('overwrite') makes the cell re-runnable: the default save mode
# ('errorifexists') fails as soon as the target path exists.
# Note that it replaces any previous output at this path.
users_table.write.mode('overwrite').parquet(output_data + 'users')


In [23]:
# Derive start_time (a TIMESTAMP) and its calendar parts from `ts`, which holds
# epoch milliseconds (e.g. 1542241826796 in the sample row shown earlier).
# Casting seconds-since-epoch straight to TIMESTAMP keeps the millisecond part and
# does not depend on the session time zone. The previous
# from_unixtime(...) -> to_timestamp(...) round trip had two problems:
#   - it formatted the instant as a whole-second local-time string, dropping milliseconds;
#   - it parsed that string back, which is ambiguous during a DST fall-back hour.
# df is extended in place; later cells (time table, songplays view) read these columns.
df = df.withColumn('start_time', (col('ts') / 1000).cast('timestamp')) \
       .withColumn('hour', hour('start_time')) \
       .withColumn('day', dayofmonth('start_time')) \
       .withColumn('week', weekofyear('start_time')) \
       .withColumn('month', month('start_time')) \
       .withColumn('year', year('start_time')) \
       .withColumn('weekday', date_format('start_time', 'EEEE'))

In [24]:
# Time table: the distinct start_time values together with their derived calendar parts.
time_columns = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']
time_table = df.select(*time_columns).distinct()

In [25]:
# Write the time table to parquet, partitioned by year and month.
# mode('overwrite') makes the cell re-runnable: the default save mode
# ('errorifexists') fails as soon as the target path exists.
# Note that it replaces any previous output at this path.
time_table.write.mode('overwrite').partitionBy('year', 'month').parquet(output_data + 'time')

In [26]:
# df now holds log events, so re-read the song records into their own frame
# for the songplays join.
song_data = input_data + 'song_data/A/A/A/*.json'
song_df = spark.read.format('json').load(song_data)


In [27]:
# Register both inputs as temp views so the songplays table can be built in SQL:
# log_staging = filtered log events with start_time, song_staging = song records.
df.createOrReplaceTempView("log_staging")
song_df.createOrReplaceTempView("song_staging")

In [28]:
# Songplays table: one row per log event in log_staging. song_id/artist_id come from
# a song record matching on (artist name, song title, duration); events with no
# match keep NULL song_id/artist_id because of the LEFT JOIN.
# Column references use the log schema's exact casing (userId rather than UserId),
# so the query no longer depends on spark.sql.caseSensitive being false.
# NOTE(review): row_number() OVER (ORDER BY ...) without PARTITION BY moves every row
# into a single partition to number it. That is fine for this sample but a bottleneck at scale.
# NOTE(review): the saved show() output of this table is empty, which a LEFT JOIN from
# the non-empty log_staging view should not produce. That output is probably stale; re-run to confirm.
songplays_table = spark.sql("""
                            SELECT  row_number() OVER(ORDER BY ls.start_time, ls.userId, ls.sessionId) AS songplay_id,
                                    ls.start_time, ls.userId AS user_id, ls.level, 
                                    ss.song_id, ss.artist_id,
                                    ls.sessionId AS session_id, ls.location, ls.userAgent AS user_agent
                            FROM log_staging ls
                            LEFT JOIN song_staging ss
                              ON ls.artist = ss.artist_name AND
                                 ls.song = ss.title AND
                                 ls.length = ss.duration""")


In [29]:
# Inspect the songplays schema; the table itself is written to parquet in a later cell.
songplays_table.printSchema()

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [30]:
# Preview the first rows of the songplays table.
songplays_table.show()

+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
|songplay_id|start_time|user_id|level|song_id|artist_id|session_id|location|user_agent|
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+
+-----------+----------+-------+-----+-------+---------+----------+--------+----------+



In [31]:
# Write the songplays table to parquet, partitioned by year and month.
# Both partition columns are derived from start_time just for this write.
# mode('overwrite') makes the cell re-runnable: the default save mode
# ('errorifexists') fails as soon as the target path exists.
# Note that it replaces any previous output at this path.
songplays_table \
    .withColumn('year', year('start_time')) \
    .withColumn('month', month('start_time')) \
    .write.mode('overwrite') \
    .partitionBy('year', 'month') \
    .parquet(output_data + 'songplays')


In [8]:
# Read one of the written tables back to verify the output.
# Point at the table directory rather than a '*.parquet' glob: that is the root
# Spark's partition discovery expects. For partitioned outputs such as songplays,
# this is how the year/month partition columns are recovered.
check = spark.read.parquet(output_data + 'artists')

In [9]:
# Inspect the schema of the table read back from parquet.
check.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [12]:
# Preview five rows of the read-back table.
check.limit(5).show()

+------------------+------------------+--------------------+--------+----------+
|         artist_id|              name|            location|latitude| longitude|
+------------------+------------------+--------------------+--------+----------+
|ARTC1LV1187B9A4858|The Bonzo Dog Band|Goldsmith's Colle...| 51.4536|  -0.01802|
|ARA23XO1187B9AF18F|   The Smithereens|Carteret, New Jersey|40.57885| -74.21956|
|AR73AIO1187B9AD57B| Western Addiction|   San Francisco, CA|37.77916|-122.42005|
|ARSVTNL1187B992A91|     Jonathan King|     London, England|51.50632|  -0.12714|
|AR5LMPY1187FB573FE| Chaka Khan_ Rufus|         Chicago, IL|41.88415| -87.63241|
+------------------+------------------+--------------------+--------+----------+

