In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
import os
import configparser

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, LongType

In [2]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [4]:
input_data = "s3a://udacity-dend/"

## Each of the five tables are written to parquet files in a separate analytics directory on S3. 
## Each table has its own folder within the directory. 
## Songs table files are partitioned by year and then artist. 
## Time table files are partitioned by year and month. 
## Songplays table files are partitioned by year and month.

In [5]:
# read song data file
song_data = os.path.join(input_data,'song-data','A','A','A')   
song_df=spark.read.json(song_data)

In [6]:
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [7]:
songs_schema = StructType([
    StructField('song_id', StringType()),
    StructField('title', StringType()),
    StructField('artist_id', StringType()),
    StructField('year', LongType()),
    StructField('duration', DoubleType())
 ])

songs_table = spark.read.json(song_data, schema=songs_schema)

In [8]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
|SOKTJDS12AF72A25E5|Drown In My Own T...|ARA23XO1187B9AF18F|   0|  192.522|
|SOEKAZG12AB018837E|I'll Slap Your Fa...|ARSVTNL1187B992A91|2001|129.85424|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For...|AR73AIO1187B9AD57B|2005|118.07302|
|SOBRKGM12A8C139EF6|Welcome to the Pl...|ARXQBR11187B98A2CC|1985|821.05424|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [15]:
# write songs_table to a parquet file

try:
    songs_table.write \
                .mode("overwrite") \
                .partitionBy("year","artist_id") \
                .parquet("output/songs/")
except Exception as err:
    print("Error {0} has occurred!".format(err))

In [16]:
artist_schema = StructType([
    StructField('artist_id', StringType()),
    StructField('artist_name', StringType()),
    StructField('artist_location', StringType()),
    StructField('artist_latitude', DoubleType()),
    StructField('artist_longitude', DoubleType())
 ])

artists_table = spark.read.json(song_data, schema=artist_schema)

In [17]:
artists_table.show(5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARTC1LV1187B9A4858|  The Bonzo Dog Band|Goldsmith's Colle...|        51.4536|        -0.01802|
|ARA23XO1187B9AF18F|     The Smithereens|Carteret, New Jersey|       40.57885|       -74.21956|
|ARSVTNL1187B992A91|       Jonathan King|     London, England|       51.50632|        -0.12714|
|AR73AIO1187B9AD57B|   Western Addiction|   San Francisco, CA|       37.77916|      -122.42005|
|ARXQBR11187B98A2CC|Frankie Goes To H...|  Liverpool, England|           null|            null|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [18]:
# write artists_table to a parquet file

try:
    artists_table.write.parquet("output/artists.parquet")
except Exception as err:
    print("Error {0} has occurred!".format(err))

Error 'path file:/home/workspace/output/artists.parquet already exists.;' has occurred!


## Read Log Data

In [19]:
log_data = os.path.join(input_data,'log-data','2018','11')
log_df = spark.read.json(log_data)

In [20]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [21]:
log_df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

In [22]:
# filter log_df with action 'NextSong'
log_df = log_df.filter(log_df.page == "NextSong")

In [23]:
log_df.show(3)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [24]:
users_table = log_df.select(["userId", "firstName", "lastName", "gender", "level"])

In [25]:
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    61|   Samuel|Gonzalez|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [26]:
# write users_table to a parquet file

try:
    users_table.write.parquet("output/users.parquet")
except Exception as err:
    print("Error {0} has occurred!".format(err))

Error 'path file:/home/workspace/output/users.parquet already exists.;' has occurred!


#### create timestamp column from original timestamp column

In [27]:
import  pyspark.sql.functions as F
log_df = log_df.withColumn("timestamp", F.to_timestamp(F.col("ts") / 1000))

In [28]:
from pyspark.sql.functions import *
log_df = log_df.withColumn('hour', hour(log_df.timestamp))
log_df = log_df.withColumn('day', dayofmonth(log_df.timestamp))
log_df = log_df.withColumn('week', weekofyear(log_df.timestamp))
log_df = log_df.withColumn('month', month(log_df.timestamp))
log_df = log_df.withColumn('year', year(log_df.timestamp))
log_df = log_df.withColumn('weekday', dayofweek(log_df.timestamp))

In [29]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [30]:
time_table = log_df.select(["timestamp", "hour", "day", "week", "month", "year", "weekday"])

In [31]:
time_table.show(5)

+--------------------+----+---+----+-----+----+-------+
|           timestamp|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:41:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 00:45:...|   0| 15|  46|   11|2018|      5|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|      5|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [32]:
# write time_table to a parquet file
try:
    time_table.write \
            .mode("overwrite") \
            .partitionBy("year","month") \
            .parquet("output/time/")
except Exception as err:
    print("Error {0} has occurred!".format(err))

### Create song_plays table by joining data between logs and songs data

In [33]:
# song_plays_cols = [songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent]

In [40]:
song_df.createOrReplaceTempView("songs_data")
log_df.createOrReplaceTempView("logs_data")

In [41]:
song_plays_query = """
    SELECT
        ld.timestamp AS start_time,
        ld.year,
        ld.month,
        ld.userId AS user_id,
        ld.level,
        sd.song_id,
        sd.artist_id,
        ld.sessionId AS session_id,
        ld.location,
        ld.userAgent As user_agent
    FROM logs_data ld
    JOIN songs_data sd ON (sd.artist_name = ld.artist)
    WHERE ld.userId IS NOT NULL
"""

In [42]:
songs_plays_table = spark.sql(song_plays_query)

In [43]:
from pyspark.sql.functions import monotonically_increasing_id
songs_plays_table.withColumn("songplay_id", monotonically_increasing_id()).show(3)

+--------------------+----+-----+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|          start_time|year|month|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|songplay_id|
+--------------------+----+-----+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----------+
|2018-11-15 16:55:...|2018|   11|     42| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|       404|New York-Newark-J...|"Mozilla/5.0 (Win...|          0|
|2018-11-21 05:30:...|2018|   11|     97| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|       797|Lansing-East Lans...|"Mozilla/5.0 (X11...|          1|
|2018-11-28 16:54:...|2018|   11|     14| free|SOIGHOD12A8C13B5A1|ARY589G1187B9A9F4E|       929|       Red Bluff, CA|Mozilla/5.0 (Wind...|          2|
+--------------------+----+-----+-------+-----+------------------+------------------+---------

In [44]:
# write time_table to a parquet file
try:
    songs_plays_table.write \
            .mode("overwrite") \
            .partitionBy("year", "month") \
            .parquet("output/songplays/")
except Exception as err:
    print("Error {0} has occurred!".format(err))