In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import types as T

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate()

In [5]:
input_data = "s3a://udacity-dend/"
song_data = input_data + "song_data/A/A/A/*.json"
    
    # read song data file
df_song = spark.read.json(song_data)
print("Song done")

Song done


In [6]:
df_song.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
|ARA23XO1187B9AF18F|       40.57885|Carteret, New Jersey|       -74.21956|     The Smithereens|  192.522|        1|SOKTJDS12AF72A25E5|Drown In My Own T...|   0|
|ARSVTNL1187B992A91|       51.50632|     London, England|        -0.12714|       Jonathan King|129.85424|        1|SOEKAZG12AB018837E|I'll Slap Your Fa...|2001|
|AR73AIO1187B9AD57B|       37.7791

In [7]:
# to extract columns for songs table
songs_table = df_song.select(["song_id", "title", "artist_id", "year", "duration"]).distinct()

In [8]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SODZYPO12A8C13A91E|Burn My Body (Alb...|AR1C2IX1187B99BF74|   0|177.99791|
|SOIGHOD12A8C13B5A1|        Indian Angel|ARY589G1187B9A9F4E|2004|171.57179|
|SOOVHYF12A8C134892|     I'll Be Waiting|ARCLYBR1187FB53913|1989|304.56118|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [9]:
output_data  = "s3a://udacity-sparkify-data-lake-chinazo/" 

# # write songs table to parquet files partitioned by year and artist
# songs_table.write.parquet(output_data+'songs/'+'songs.parquet', partitionBy=['year', 'artist_id'])

In [10]:
# extract columns for artist_table
artists_table = df_song.select(["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]).distinct()

In [11]:
artists_table.show(5)

+------------------+-------------+---------------+---------------+----------------+
|         artist_id|  artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+-------------+---------------+---------------+----------------+
|ARC1IHZ1187FB4E920| Jamie Cullum|               |           null|            null|
|ARZKCQM1257509D107|   Dataphiles|               |           null|            null|
|AREWD471187FB49873|     Son Kite|               |           null|            null|
|ARGE7G11187FB37E05| Cyndi Lauper|   Brooklyn, NY|           null|            null|
|ARSVTNL1187B992A91|Jonathan King|London, England|       51.50632|        -0.12714|
+------------------+-------------+---------------+---------------+----------------+
only showing top 5 rows



In [14]:
# # write artists table to parquet files 
# artists_table.write.parquet(output_data + 'artist/' + 'artists.parquet', partitionBy=['artist_id'])

In [12]:
# get filepath to log data file
log_data = input_data + "log_data/*/*/*.json"

# read log data file
df_log = spark.read.json(log_data)
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [13]:
df_log = df_log.where("page='NextSong'")
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [14]:
users_table = df_log.select(["userId", "firstName", "lastName", "gender", "level"])
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    26|     Ryan|   Smith|     M| free|
|    61|   Samuel|Gonzalez|     M| free|
|    80|    Tegan|  Levine|     F| paid|
+------+---------+--------+------+-----+
only showing top 5 rows



In [None]:
# # write users table to parquet files
# users_table.write.parquet(output_data + 'users/' + 'users.parquet', partitionBy = ['userId'])

In [15]:
log_df = df_log.withColumn('timestamp',( (df_log.ts.cast('float')/1000).cast("timestamp")) )

In [16]:
# extract columns to create time table
time_table = log_df.select(
                col("timestamp").alias("start_time"),
                hour("timestamp").alias('hour'),
                dayofmonth("timestamp").alias('day'),
                weekofyear("timestamp").alias('week'),
                month("timestamp").alias('month'), 
                year("timestamp").alias('year'), 
                date_format(col("timestamp"), "E").alias("weekday")
            )

In [17]:
time_table.show(5)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-15 00:29:...|   0| 15|  46|   11|2018|    Thu|
|2018-11-15 00:40:...|   0| 15|  46|   11|2018|    Thu|
|2018-11-15 00:44:...|   0| 15|  46|   11|2018|    Thu|
|2018-11-15 03:44:...|   3| 15|  46|   11|2018|    Thu|
|2018-11-15 05:48:...|   5| 15|  46|   11|2018|    Thu|
+--------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [None]:
# # write time table to parquet files partitioned by year and month
# time_table.write.parquet(output_data + 'time/' + 'time.parquet', partitionBy=['start_time'])

In [18]:

# read in song data to use for songplays table
song_df = spark.read.json(input_data+'song_data/*/*/*/*.json')

# join song_df and log_df
song_log_joined_table = log_df.join(song_df, (log_df.song == song_df.title) & (log_df.artist == song_df.artist_name) & (log_df.length == song_df.duration), how='inner')

# extract columns from joined song and log datasets to create songplays table 
songplays_table = song_log_joined_table.distinct() \
                    .select("userId", "timestamp", "song_id", "artist_id", "level", "sessionId", "location", "userAgent" ) \
                    .withColumn("songplay_id", row_number().over( Window.partitionBy('timestamp').orderBy("timestamp"))) \
                    .withColumnRenamed("userId","user_id")        \
                    .withColumnRenamed("timestamp","start_time")  \
                    .withColumnRenamed("sessionId","session_id")  \
                    .withColumnRenamed("userAgent", "user_agent") \

NameError: name 'Window' is not defined

In [None]:
songplays_table.show(5)