In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [5]:
input_data = "s3a://udacity-dend/"

In [6]:
song_data = os.path.join(input_data, "song_data/A/A/*/*.json")

In [7]:
song_df = spark.read.json(song_data)

In [8]:
song_df.printSchema()
song_df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name|  duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|           Tennessee|       -85.97874|Royal Philharmoni..

In [9]:
songs_table = song_df.select('song_id', 'title', 'artist_id', 'year', 'duration')

In [10]:
songs_table.show(5)

+------------------+--------------------+------------------+----+----------+
|           song_id|               title|         artist_id|year|  duration|
+------------------+--------------------+------------------+----+----------+
|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|ARSUVLW12454A4C8B8|   0|  94.56281|
|SOXRPUH12AB017F769|Exodus: Part I: M...|ARXQC081187FB4AD42|   0|1047.71873|
|SOVNKJI12A8C13CB0D|Take It To Da Hou...|ARWUNH81187FB4A3E0|2001| 227.10812|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972| 301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|   192.522|
+------------------+--------------------+------------------+----+----------+
only showing top 5 rows



In [11]:
output_data = "s3a://aws-emr-resources-860223054232-us-east-1/"

In [12]:
songs_table.write.partitionBy("year","artist_id").mode('overwrite').parquet(os.path.join(output_data, "songs"))

In [13]:
artists_table = song_df.selectExpr("artist_name as name", "artist_location as location")
artists_table.show(5)

+--------------------+--------------------+
|                name|            location|
+--------------------+--------------------+
|Royal Philharmoni...|           Tennessee|
|William Shatner_ ...|                  UK|
|         Trick Daddy|     Miami , Florida|
|  The Bonzo Dog Band|Goldsmith's Colle...|
|     The Smithereens|Carteret, New Jersey|
+--------------------+--------------------+
only showing top 5 rows



In [14]:
artists_table = song_df.selectExpr("artist_id", "artist_name as name", "artist_location as location", \
                              "artist_latitude as lattitude", "artist_longitude as longitude")
artists_table.show(5)

+------------------+--------------------+--------------------+---------+---------+
|         artist_id|                name|            location|lattitude|longitude|
+------------------+--------------------+--------------------+---------+---------+
|ARSUVLW12454A4C8B8|Royal Philharmoni...|           Tennessee| 35.83073|-85.97874|
|ARXQC081187FB4AD42|William Shatner_ ...|                  UK| 54.31407| -2.23001|
|ARWUNH81187FB4A3E0|         Trick Daddy|     Miami , Florida|     null|     null|
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|  51.4536| -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey| 40.57885|-74.21956|
+------------------+--------------------+--------------------+---------+---------+
only showing top 5 rows



In [15]:
artists_table.write.mode('overwrite').parquet(os.path.join(output_data, "artists"))

## log table

In [16]:
log_data = os.path.join(input_data, "log_data/2018/*/*.json")

In [17]:
df = spark.read.json(log_data)

In [18]:
df.printSchema()
df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            lo

In [19]:
df = df.filter(df.page =="NextSong")

## expression1, create temp view

In [27]:
df.createOrReplaceTempView("dfFilter")

In [30]:
users_table = spark.sql("""
SELECT distinct userId as user_id, firstName as first_name, lastName as last_name, gender, level
FROM dfFilter
"""
)

## expresson2, use dropDuplicates()

In [23]:
users_table = df.selectExpr("userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level")\
               .dropDuplicates()

In [24]:
user_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [25]:
users_table.write.mode('overwrite').parquet(os.path.join(output_data, "users"))

In [26]:
get_timestamp = udf(lambda ts : datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d %H:%M:%S'))

In [27]:
df = df.withColumn("timestamp", get_timestamp(df.ts))
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          timestamp|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smi

In [28]:
 get_datetime = udf(lambda ts : datetime.fromtimestamp(ts/1000).strftime('%Y-%m-%d'))
 df = df.withColumn("datetime", get_datetime(df.ts))
 df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+----------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|          timestamp|  datetime|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+----------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:26|2018-11-15|
|The Prodigy|Log

In [29]:
 time_table = df.selectExpr("ts as start_time", "hour(timestamp) as hour", "dayofmonth(datetime) as day", "weekofyear(datetime) as week"\
                           , "month(datetime) as month", "year(datetime) as year", "dayofweek(datetime) as weekday")

In [30]:
time_table.show(5)

+-------------+----+---+----+-----+----+-------+
|   start_time|hour|day|week|month|year|weekday|
+-------------+----+---+----+-----+----+-------+
|1542241826796|   0| 15|  46|   11|2018|      5|
|1542242481796|   0| 15|  46|   11|2018|      5|
|1542242741796|   0| 15|  46|   11|2018|      5|
|1542253449796|   3| 15|  46|   11|2018|      5|
|1542260935796|   5| 15|  46|   11|2018|      5|
+-------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [31]:
time_table.write.partitionBy("year","month").mode('overwrite').parquet(os.path.join(output_data, "time"))

In [32]:
song_stage = song_df.select("title", "song_id", "artist_id", "artist_name").dropDuplicates()

In [33]:
song_stage.createOrReplaceTempView("songStage")

In [34]:
spark.sql("""
select * from songStage
""").show(5)

+--------------------+------------------+------------------+--------------------+
|               title|           song_id|         artist_id|         artist_name|
+--------------------+------------------+------------------+--------------------+
|I Got Over It (Al...|SOOSYMY12AB01888CD|AR7BMMV1187FB5B2D7|         Robben Ford|
|Song of Doing Lau...|SOAUIQZ12A8C13E7A2|AROEG4C1187B99DC4A|Gesangla Girls Ch...|
|               C'Mon|SOEZVWU12A8C138CE8|AR4CE6H1187FB4D5C3|                Poco|
|         Inolvidable|SOKEXTF12A8C13AE7E|ARTEGHH1187FB435C8|       Bebo & Cigala|
|Greatest of All Time|SOQBUPE12A8C13B9DC|ARYN2HS1187FB4CD67|     Archers Of Loaf|
+--------------------+------------------+------------------+--------------------+
only showing top 5 rows



In [35]:
df.createOrReplaceTempView("logStage")

In [36]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = spark.sql("""
    SELECT b.ts as start_time, b.userId as user_id, b.level, a.song_id, a.artist_id, b.sessionId as session_id, b.location, 
           b.userAgent as user_agent, month(b.datetime) as month, year(b.datetime) as year 
    FROM songStage a JOIN logStage b 
    ON a.title = b.song
    AND a.artist_name = b.artist
    AND b.page = 'NextSong'
    AND a.song_id IS NOT NULL
    AND a.artist_id IS NOT NULL
    """
)

In [37]:
songplays_table.show(5)

+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----+----+
|   start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|month|year|
+-------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----+----+
|1542837407796|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|   11|2018|
|1541440182796|     73| paid|SOHDWWH12A6D4F7F6A|ARC0IOF1187FB3F6E6|       255|Tampa-St. Petersb...|"Mozilla/5.0 (Mac...|   11|2018|
|1542148779796|     55| free|SOXQYSC12A6310E908|AR0L04E1187B9AE90C|       415|Minneapolis-St. P...|"Mozilla/5.0 (Mac...|   11|2018|
|1542378072796|     85| paid|SOLRYQR12A670215BF|ARNLO5S1187B9B80CC|       436|       Red Bluff, CA|"Mozilla/5.0 (Mac...|   11|2018|
|1542735998796|     49| paid|SOCHRXB12A8AE48069|ARTDQRC1187FB4EFD4|       75

In [38]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy("year","month").mode('overwrite').parquet(os.path.join(output_data, "songplays"))

In [3]:
print("Finish write parquet to songs_table.", datetime.now())

Finish write parquet to songs_table. 2022-12-27 06:58:17.637832
