In [43]:
#!pip3 install pyspark
import findspark
findspark.init()
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType

In [2]:
# Load AWS credentials from the local dl.cfg file (section [AWS]).
config = configparser.ConfigParser()

# read_file() raises FileNotFoundError when dl.cfg is missing, whereas
# config.read() silently skips it and the failure only surfaces later as an
# opaque KeyError: 'AWS'.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

# Expose the credentials through the process environment.
os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']
# Extra package for the Spark launcher: hadoop-aws supplies the
# org.apache.hadoop.fs.s3a classes configured in create_spark_session().
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:3.1.1 pyspark-shell'

def create_spark_session():
    """Return a local SparkSession whose Hadoop configuration can read s3a:// paths.

    The AWS key pair is read from the module-level ``config`` parser
    (section ``AWS``) and written into the Hadoop configuration of the
    SparkContext.

    The context and session are obtained with ``getOrCreate`` so that
    re-running the notebook cell reuses the running context instead of
    raising "Cannot run multiple SparkContexts at once".

    Returns:
        SparkSession: session bound to the new or already running context.
    """
    conf = SparkConf().setAppName('pyspark_aws').setMaster('local[*]')

    # Reuse the active context if one exists; conf only applies to a new one.
    sc = SparkContext.getOrCreate(conf=conf)

    # S3A credentials live on the context's Hadoop configuration.
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    hadoop_conf.set('fs.s3a.access.key', config['AWS']['AWS_ACCESS_KEY_ID'])
    hadoop_conf.set('fs.s3a.secret.key', config['AWS']['AWS_SECRET_ACCESS_KEY'])

    # The builder attaches to the context obtained above.
    return SparkSession.builder.getOrCreate()


# Session object used by every read and spark.sql call in the cells that follow.
spark = create_spark_session()

In [23]:
# Root of the source JSON data, addressed through the s3a connector.
input_data = "s3a://udacity-dend/"
# Local directory under which the parquet tables are written.
output_data = "./data/output/"


In [24]:

# Exploratory load of a small slice of the song data; built from input_data
# so the bucket location is defined in exactly one place.
df = spark.read.json(input_data + "song_data/A/A/A/*.json")


In [25]:
# Preview the first five song records.
df.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
|ARA23XO1187B9AF18F|       40.57885|Carteret, New Jersey|       -74.21956|     The Smithereens|  192.522|        1|SOKTJDS12AF72A25E5|Drown In My Own T...|   0|
|ARSVTNL1187B992A91|       51.50632|     London, England|        -0.12714|       Jonathan King|129.85424|        1|SOEKAZG12AB018837E|I'll Slap Your Fa...|2001|
|AR73AIO1187B9AD57B|       37.7791

In [26]:
# Inspect the column names and types of the song data frame.
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [27]:
# Exploratory projection of the artist columns with exact duplicates removed.
# NOTE: despite its name this frame holds artist attributes; songs_table is
# reassigned with the real song columns in a later cell.
songs_table = df.select(
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
).dropDuplicates()

In [28]:
# List the columns kept by the projection above.
songs_table.columns

['artist_id',
 'artist_name',
 'artist_location',
 'artist_latitude',
 'artist_longitude']

In [30]:
# Glob for the song files to load (only the A/A/A subset).
songdata = input_data + "song_data/A/A/A/*.json"

# Load the song JSON files and expose them to Spark SQL as "songdata_view".
df = spark.read.format("json").load(songdata)
df.createOrReplaceTempView("songdata_view")


In [31]:
# Songs table: the distinct song attributes of every record that has a song_id.
songs_table = (
    spark.table("songdata_view")
    .where(col("song_id").isNotNull())
    .select("song_id", "title", "artist_id", "year", "duration")
    .distinct()
)

# Persist as parquet, partitioned by year and then by artist_id.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy("year", "artist_id")
    .parquet(output_data + "songs_table/")
)

    

In [33]:
# extract columns to create artists table
# (the latitude alias was previously misspelled "lattitude", which leaked the
# typo into the parquet schema)
artists_table = spark.sql("""SELECT DISTINCT 
                                        artist_id, artist_name as name, artist_location as location, 
                                        artist_latitude as latitude, artist_longitude as longitude
                                   FROM songdata_view
                                  WHERE artist_id IS NOT NULL 
                              """)

# write artists table to parquet files
artists_table.write.mode("overwrite").parquet(output_data+"artists_table/")

In [38]:
# Glob matching the JSON log files three directory levels below log_data/.
log_data = input_data + "log_data/*/*/*.json"

# Load all matching log files into a single DataFrame.
df = spark.read.format("json").load(log_data)
    

In [44]:

# filter by actions for song plays
# Reassign df (not only the view): a later cell re-registers df as
# "log_data_view", so filtering just the view would let non-song events back in.
df = df.filter(df.page == 'NextSong')
df.createOrReplaceTempView("log_data_view")

# extract columns for users table    
users_table = spark.sql("""SELECT DISTINCT userId as user_id, firstName as first_name, 
                                      lastName as last_name, gender, level
                                FROM log_data_view
                            WHERE userId IS NOT NULL
""")

# write users table to parquet files
users_table.write.mode("overwrite").parquet(output_data+"users_table/")


def get_datetime(ts_col):
    """Convert an epoch-milliseconds column to a timestamp column.

    The value is truncated to whole seconds before the cast, matching the
    previous ``int(int(x)/1000)`` logic. A native cast is used instead of a
    Python UDF so a null ``ts`` yields null instead of raising.
    """
    return (ts_col / 1000).cast('long').cast(TimestampType())


# create start_time column: string form of the event time, read by the time
# and songplays queries (it was never created anywhere in the notebook)
df = df.withColumn('start_time', date_format(get_datetime(df.ts), 'yyyy-MM-dd HH:mm:ss'))

# create datetime column from original timestamp column
df = df.withColumn('datetime', get_datetime(df.ts))

In [46]:
# Inspect the log frame's schema, including the derived time columns.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- datetime: timestamp (nullable = true)



In [59]:
# Re-register the log frame so the view exposes the datetime column.
df.createOrReplaceTempView("log_data_view")

# Time table: one row per distinct start_time with its calendar breakdown.
time_table = (
    spark.table("log_data_view")
    .where("datetime IS NOT NULL")
    .selectExpr(
        "start_time",
        "hour(datetime) as hour",
        "day(datetime) as day",
        "weekofyear(datetime) as week",
        "month(datetime) as month",
        "year(datetime) as year",
        "dayofweek(datetime) as weekday",
    )
    .distinct()
)

time_table.show(5)

# Persist as parquet, partitioned by year and then by month.
(
    time_table.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(output_data + "time_table/")
)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-21 11:56:46|  11| 21|  47|   11|2018|      4|
|2018-11-21 19:49:29|  19| 21|  47|   11|2018|      4|
|2018-11-14 10:26:16|  10| 14|  46|   11|2018|      4|
|2018-11-14 11:30:29|  11| 14|  46|   11|2018|      4|
|2018-11-28 02:44:16|   2| 28|  48|   11|2018|      4|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [54]:
# Load the song and artist tables back from parquet for the songplays join.
song_df = spark.read.format("parquet").load(output_data + "songs_table/")
artist_df = spark.read.format("parquet").load(output_data + "artists_table/")

# Expose both frames to Spark SQL under the names the join query uses.
song_df.createOrReplaceTempView("songs_table")
artist_df.createOrReplaceTempView("artists_table")

In [57]:
# extract columns from joined song and log datasets to create songplays table 
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
#
# Join notes:
#  * the artist is resolved first by name, from a de-duplicated
#    (artist_id, name) projection, so repeated artist rows that differ only in
#    location/coordinates do not multiply the log rows;
#  * a song only matches when its title AND its artist_id agree with the
#    resolved artist, instead of matching any song that shares the title;
#  * only 'NextSong' events are song plays, so other page events are excluded
#    here regardless of how the view was registered.
# NOTE(review): output column names (userId, sessionId, userAgent) are kept
# as they were; rename them if the snake_case list above is the target schema.
songplays_table = spark.sql("""SELECT monotonically_increasing_id() as songplay_id,
                                        l.start_time, year(l.datetime) as year, month(l.datetime) as month, 
                                        l.userId, l.level, l.song, s.song_id, l.artist, a.artist_id, 
                                        l.sessionId, l.location, l.userAgent
                                    FROM log_data_view l
                            LEFT JOIN (SELECT DISTINCT artist_id, name FROM artists_table) a
                                   ON (a.name = l.artist)
                            LEFT JOIN songs_table s
                                   ON (s.title = l.song AND s.artist_id = a.artist_id)
                                WHERE l.page = 'NextSong'
                            """) 

In [58]:
# Preview the first five rows of the joined songplays result.
songplays_table.show(5)

+-----------+-------------------+----+-----+------+-----+---------------+-------+-----------+---------+---------+--------------------+--------------------+
|songplay_id|         start_time|year|month|userId|level|           song|song_id|     artist|artist_id|sessionId|            location|           userAgent|
+-----------+-------------------+----+-----+------+-----+---------------+-------+-----------+---------+---------+--------------------+--------------------+
|          0|2018-11-15 01:30:26|2018|   11|    26| free|  Sehr kosmisch|   null|   Harmonia|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|          1|2018-11-15 01:41:21|2018|   11|    26| free|The Big Gundown|   null|The Prodigy|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|          2|2018-11-15 01:45:41|2018|   11|    26| free|       Marry Me|   null|      Train|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|          3|2018-11-15 02:57:51|2018|   11|     9| free|       

In [61]:
# Persist the songplays table as parquet, partitioned by year and then month,
# replacing any previous output.
(
    songplays_table.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(output_data + "songplays_table/")
)