In [1]:
import pandas as pd
import numpy as np
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col,monotonically_increasing_id, from_unixtime
from pyspark.sql.functions import year, month, dayofmonth,dayofweek, hour, weekofyear, date_format

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
from pyspark.sql.types import StructType as R,TimestampType as Ts ,StructField as Fld,FloatType as Flt ,DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [5]:
spark = create_spark_session()
spark

## Read Songs and Log Data

In [6]:
INPUT_BUCKET='s3a://udacity-dend/'
LOG_DATA_PATH='log-data/*/*/*.json'
SONG_DATA_PATH ='song_data/A/A/*/*.json'

In [7]:
song_data = os.path.join(INPUT_BUCKET, SONG_DATA_PATH)

In [8]:
log_data = os.path.join(INPUT_BUCKET, LOG_DATA_PATH)

In [9]:
### Create Log Schema
log_schema = R([
    Fld("artist",Str()),
    Fld("auth",Str()),
    Fld("first_name",Str()),
    Fld("gender",Str()),
    Fld("item_in_session",Int()),
    Fld("last_name",Str()),
    Fld("length",Flt()),
    Fld("level",Str()),
    Fld("location",Str()),
    Fld("method",Str()),
    Fld("page",Str()),
    Fld("registration",Flt()),
    Fld("session_id",Int()),
    Fld("song",Str()),
    Fld("status",Int()),
    Fld("ts",Ts()),
    Fld("user_agent",Str()),
    Fld("user_id",Int())
    ])

In [10]:
### Create Song Schema
song_schema = R([
        Fld("artist_id", Str()),
        Fld("artist_latitude", Dbl()),
        Fld("artist_location", Str()),
        Fld("artist_longitude", Str()),
        Fld("artist_name", Str()),
        Fld("duration", Dbl()),
        Fld("num_songs", Int()),
        Fld("title", Str()),
        Fld("year", Int()),
    ])

In [11]:
df_song = spark.read.json(song_data)
df_log = spark.read.json(log_data)

In [12]:
df_song.count()

604

## Process Songs Data

In [13]:
# extract columns to create songs table
song_columns = ["song_id","title", "artist_id", "year", "duration"]

In [14]:
songs_table = df_song.select(song_columns).dropDuplicates().withColumn("song_id", monotonically_increasing_id())

In [15]:
OUTPUT_DATA_PATH = "outputs/"

In [16]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.mode("overwrite").partitionBy("year", "artist_id").parquet(OUTPUT_DATA_PATH+'songs_table/')

In [17]:
# extract columns to create artists table
artists_columns = ["artist_id",
                  "artist_name as name",
                  "artist_location as location",
                  "artist_latitude as latitude",
                  "artist_longitude as longitude"]

In [18]:
artists_table = df_song.selectExpr(artists_columns).dropDuplicates()

In [19]:
# write artists table to parquet files
artists_table.write.mode("overwrite").parquet(OUTPUT_DATA_PATH+'artists_table/')

In [20]:
artists_table.show()

+------------------+---------------+--------------------+--------+---------+
|         artist_id|           name|            location|latitude|longitude|
+------------------+---------------+--------------------+--------+---------+
|ARSVTNL1187B992A91|  Jonathan King|     London, England|51.50632| -0.12714|
|ARY0RQP1187FB48B93|      Mickey 3D|                    |    null|     null|
|ARPIKA31187FB4C233|     The Action|            New York|40.71455|-74.00712|
|ARX16TQ1187B9899C9|     Oysterhead|     New Orleans, LA|29.95369|-90.07771|
|ARE50SC1187B98C04A|    The 69 Eyes|   Helsinki, Finland|60.17116| 24.93258|
|AR5R1EG1187FB3E94F|     Linda Eder|        Brainerd, MN|46.35316|-94.20069|
|AR3GZLR1187FB3D817|    Biffy Clyro|                    |    null|     null|
|ARXR32B1187FB57099|            Gob|                    |    null|     null|
|ARMX9KX1187B9A58E8|   Richard Marx|         Chicago, IL|41.88415|-87.63241|
|AR0IT221187B999C4D| The Weathermen|             BELGIUM|50.50101|  4.47684|

## Process Log Data   

In [21]:
df_log = spark.read.json(log_data)

In [22]:
df_log.show(3)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [23]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [24]:
# filter by actions for song plays
df_log = df_log.filter(df_log.page == "NextSong")

In [25]:
# extract columns for users table  
users_columns = ["userId as user_id",
                 "firstName as first_name",
                  "lastName as last_name",
                   "level",
                  "gender",
                  "location"]

In [26]:
users_table = df_log.selectExpr(users_columns).dropDuplicates()

In [27]:
# write users table to parquet files
users_table.write.mode("overwrite").parquet(OUTPUT_DATA_PATH+'users_table/')

In [28]:
df_log.show(3)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [29]:
df = df_log

In [30]:
df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

In [31]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x:  datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))
df = df.withColumn("timestamp", get_timestamp(df.ts))

In [32]:
# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d'))
df = df.withColumn("datetime", get_datetime(df.ts))

In [33]:
df.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+-------------------+----------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|          timestamp|  datetime|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+-------------------+----------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:26|2018-11-15|
+--------+---------+---------+------+-------------+-----

## datetime and timestamp NOT NULL now 

In [34]:
# extract columns to create time table
time_table = df.select(
        col('timestamp').alias('start_time'),
        hour('timestamp').alias('hour'),
        dayofmonth('timestamp').alias('day'),
        weekofyear('timestamp').alias('week'),
        month('timestamp').alias('month'),
        year('timestamp').alias('year') 
   )

In [35]:
time_table.show()

+-------------------+----+---+----+-----+----+
|         start_time|hour|day|week|month|year|
+-------------------+----+---+----+-----+----+
|2018-11-15 00:30:26|   0| 15|  46|   11|2018|
|2018-11-15 00:41:21|   0| 15|  46|   11|2018|
|2018-11-15 00:45:41|   0| 15|  46|   11|2018|
|2018-11-15 03:44:09|   3| 15|  46|   11|2018|
|2018-11-15 05:48:55|   5| 15|  46|   11|2018|
|2018-11-15 05:53:44|   5| 15|  46|   11|2018|
|2018-11-15 05:55:56|   5| 15|  46|   11|2018|
|2018-11-15 06:01:02|   6| 15|  46|   11|2018|
|2018-11-15 06:07:37|   6| 15|  46|   11|2018|
|2018-11-15 06:10:33|   6| 15|  46|   11|2018|
|2018-11-15 06:13:54|   6| 15|  46|   11|2018|
|2018-11-15 06:14:16|   6| 15|  46|   11|2018|
|2018-11-15 06:17:59|   6| 15|  46|   11|2018|
|2018-11-15 06:18:48|   6| 15|  46|   11|2018|
|2018-11-15 06:21:33|   6| 15|  46|   11|2018|
|2018-11-15 06:25:58|   6| 15|  46|   11|2018|
|2018-11-15 06:29:38|   6| 15|  46|   11|2018|
|2018-11-15 07:08:36|   7| 15|  46|   11|2018|
|2018-11-15 0

In [36]:
# write time table to parquet files partitioned by year and month
time_table.write.mode("overwrite").partitionBy("year", "month").parquet(OUTPUT_DATA_PATH + "time_table/")

In [37]:
# read in song data to use for songplays table
song_data = os.path.join(INPUT_BUCKET, SONG_DATA_PATH)
song_df = spark.read.json(song_data)

In [38]:
song_df.show(3)

+------------------+---------------+---------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|artist_location|artist_longitude|         artist_name|  duration|num_songs|           song_id|               title|year|
+------------------+---------------+---------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|      Tennessee|       -85.97874|Royal Philharmoni...|  94.56281|        1|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|   0|
|ARXQC081187FB4AD42|       54.31407|             UK|        -2.23001|William Shatner_ ...|1047.71873|        1|SOXRPUH12AB017F769|Exodus: Part I: M...|   0|
|ARWUNH81187FB4A3E0|           null|Miami , Florida|            null|         Trick Daddy| 227.10812|        1|SOVNKJI12A8C13CB0D|Take It To Da Hou...|2001|
+------------------+---------------+---------------+------

In [54]:
songplays_table = song_df.select(
    col("artist_id"),
    col("artist_name"),
    col("duration"),
     col('song_id'),
    col('title')
     )

In [55]:
songplays_table.show()

+------------------+--------------------+----------+------------------+--------------------+
|         artist_id|         artist_name|  duration|           song_id|               title|
+------------------+--------------------+----------+------------------+--------------------+
|ARSUVLW12454A4C8B8|Royal Philharmoni...|  94.56281|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|
|ARXQC081187FB4AD42|William Shatner_ ...|1047.71873|SOXRPUH12AB017F769|Exodus: Part I: M...|
|ARWUNH81187FB4A3E0|         Trick Daddy| 227.10812|SOVNKJI12A8C13CB0D|Take It To Da Hou...|
|ARTC1LV1187B9A4858|  The Bonzo Dog Band| 301.40036|SOAFBCP12A8C13CC7D|King Of Scurf (20...|
|ARA23XO1187B9AF18F|     The Smithereens|   192.522|SOKTJDS12AF72A25E5|Drown In My Own T...|
|ARLRWBW1242077EB29|     Mikhail Pletnev| 221.70077|SOYVBGZ12A6D4F92A8|Piano Sonata No. ...|
|AR5LZJD1187FB4C5E5|        Britt Nicole| 204.06812|SOGXFIF12A58A78CC4|Hanging On (Mediu...|
|ARV3PXE1187B98E680|   John Brown's Body| 289.01832|SOQAUGD12A58A7A92D

In [56]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- datetime: string (nullable = true)



In [57]:
df2 = df.select(
        col('ts').alias('ts'),
        col('userId').alias('user_id'),
        col('level').alias('level'),
        col('location').alias('location'),
        col('userAgent').alias('user_agent'),
        month('datetime').alias('month'),
        year('datetime').alias('year'),
        col("song")
                )

In [58]:
df2.show(4)

+-------------+-------+-----+--------------------+--------------------+-----+----+---------------+
|           ts|user_id|level|            location|          user_agent|month|year|           song|
+-------------+-------+-----+--------------------+--------------------+-----+----+---------------+
|1542241826796|     26| free|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|   11|2018|  Sehr kosmisch|
|1542242481796|     26| free|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|   11|2018|The Big Gundown|
|1542242741796|     26| free|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|   11|2018|       Marry Me|
|1542253449796|     61| free|Houston-The Woodl...|"Mozilla/5.0 (Mac...|   11|2018|      Blackbird|
+-------------+-------+-----+--------------------+--------------------+-----+----+---------------+
only showing top 4 rows



In [59]:
total = df2.join(songplays_table, songplays_table.title == df2.song)

In [45]:
total.count()

55

In [60]:
total.show()

+-------------+-------+-----+--------------------+--------------------+-----+----+-----------------+------------------+--------------------+---------+------------------+-----------------+
|           ts|user_id|level|            location|          user_agent|month|year|             song|         artist_id|         artist_name| duration|           song_id|            title|
+-------------+-------+-----+--------------------+--------------------+-----+----+-----------------+------------------+--------------------+---------+------------------+-----------------+
|1542287824796|     30| paid|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|   11|2018|            Crazy|ARAGJTD1187B9A8646|P Money ft. Littl...|223.84281|SOUPKAB12AB0185DF9|            Crazy|
|1542293170796|     30| paid|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|   11|2018|         Elevator|AR7WK5411A348EF5EA|        Minitel Rose|248.31955|SOTCOTZ12A8C136BCB|         Elevator|
|1542298745796|     97| paid|Lansing-East Lans...|"Mozilla/5

In [65]:
songplays_table = total

In [66]:
songplays_table.show(5)OUTPUT_DATA_PATH

+-------------+-------+-----+--------------------+--------------------+-----+----+--------------+------------------+--------------------+---------+------------------+--------------+
|           ts|user_id|level|            location|          user_agent|month|year|          song|         artist_id|         artist_name| duration|           song_id|         title|
+-------------+-------+-----+--------------------+--------------------+-----+----+--------------+------------------+--------------------+---------+------------------+--------------+
|1542287824796|     30| paid|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|   11|2018|         Crazy|ARAGJTD1187B9A8646|P Money ft. Littl...|223.84281|SOUPKAB12AB0185DF9|         Crazy|
|1542293170796|     30| paid|San Jose-Sunnyval...|Mozilla/5.0 (Wind...|   11|2018|      Elevator|AR7WK5411A348EF5EA|        Minitel Rose|248.31955|SOTCOTZ12A8C136BCB|      Elevator|
|1542298745796|     97| paid|Lansing-East Lans...|"Mozilla/5.0 (X11...|   11|2018|        

In [67]:
songplays_table.printSchema()

root
 |-- ts: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)



In [69]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.mode("overwrite").partitionBy("year", "month").parquet(OUTPUT_DATA_PATH+'songplays/')

In [None]:
## https://knowledge.udacity.com/questions/652188
## https://knowledge.udacity.com/questions/67777