In [68]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.functions import udf, col, desc, row_number
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, col, monotonically_increasing_id
from pyspark.sql.window import Window

In [2]:
# Parse settings from dl.cfg. ConfigParser.read returns the list of files it
# was able to read, which is what this cell echoes as its output.
# NOTE(review): `config` is not referenced again in the cells visible here —
# confirm whether it is still needed.
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
def create_spark_session():
    """Build the SparkSession for this notebook, or reuse the running one.

    The builder sets ``spark.jars.packages`` so that the ``hadoop-aws`` 2.7.0
    package is requested for the session.

    Returns:
        SparkSession: whatever ``getOrCreate()`` hands back.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()


# Session object used by the cells below.
spark = create_spark_session()

In [4]:
# Root folder of the raw JSON input, and the folder the parquet tables are
# written to (a sub-folder of the input root).
input_data = "data/"
output_data = "{}output_data/".format(input_data)

In [8]:
# Glob for the song JSON files, which sit three directory levels below
# song-data/song-data.
song_data = "{}song-data/song-data/*/*/*/*.json".format(input_data)

# Load every matching file into a single DataFrame.
df = spark.read.json(song_data)

In [9]:
# Print the schema Spark inferred for the song DataFrame.
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [14]:
# Preview the first five song records.
df.limit(5).show()

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

In [15]:
# Songs table: the five song attributes, with exact duplicate rows removed.
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = df.select(*song_columns).dropDuplicates()

In [16]:
# Preview the raw song DataFrame again; df itself is not modified by the
# songs_table selection.
df.limit(5).show()

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

In [17]:
# Persist the songs table as parquet, one directory per (year, artist_id),
# replacing any previous output at that path.
# The former option("header", True) was removed: "header" is a CSV option and
# has no effect on parquet output.
(
    songs_table.write
    .partitionBy("year", "artist_id")
    .mode("overwrite")
    .parquet(output_data + "songs")
)

In [37]:
# Artists table: rename the artist_* source columns to their short names and
# remove exact duplicate rows.
artists_table = df.select(
    "artist_id",
    df["artist_name"].alias("name"),
    df["artist_location"].alias("location"),
    df["artist_latitude"].alias("latitude"),
    df["artist_longitude"].alias("longitude"),
).dropDuplicates()

In [38]:
# Persist the artists table as parquet, replacing any previous output.
# The former option("header", True) was removed: "header" is a CSV option and
# has no effect on parquet output.
(
    artists_table.write
    .mode("overwrite")
    .parquet(output_data + "artists")
)

In [39]:
# Sanity check: read the artists parquet back and preview five rows.
artists_path = output_data + "artists"
parDF1 = spark.read.parquet(artists_path)
parDF1.limit(5).show()


+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|40.79195|-73.94512|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632| -0.12714|
+------------------+--------------------+--------------------+--------+---------+



In [None]:
# --- Song data: build and persist the songs and artists tables ---

# get filepath to song data file; the JSON files sit three directory levels
# below song-data/song-data, so the glob has to descend through them
# (a bare "*.json" at the top level matches none of the nested files)
song_data = input_data + "song-data/song-data/*/*/*/*.json"

# read song data file
df = spark.read.json(song_data)

# extract columns to create songs table
songs_table = df.select("song_id", "title", "artist_id", "year", "duration").dropDuplicates()

# write songs table to parquet files partitioned by year and artist
# ("header" is a CSV-only option, so it is not set for parquet)
(
    songs_table.write
    .partitionBy("year", "artist_id")
    .mode("overwrite")
    .parquet(output_data + "songs")
)

# extract columns to create artists table
artists_table = df.select(
    "artist_id",
    df["artist_name"].alias("name"),
    df["artist_location"].alias("location"),
    df["artist_latitude"].alias("latitude"),
    df["artist_longitude"].alias("longitude"),
).dropDuplicates()

# write artists table to parquet files
(
    artists_table.write
    .mode("overwrite")
    .parquet(output_data + "artists")
)

In [41]:
# Input globs: the log events, plus the song files used later for the join.
log_data = "{}log-data/*.json".format(input_data)
song_data = "{}song-data/song-data/*/*/*/*.json".format(input_data)

# Load the raw events; note that this rebinds df, which previously held the
# song data.
df = spark.read.json(log_data)

# Keep only the events whose page is 'NextSong'.
df = df.filter(col("page") == 'NextSong')

df.limit(5).show()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [42]:
# Print the schema of the log DataFrame (after the NextSong filter).
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [56]:
# Candidate user rows with snake_case column names. ts is carried along so
# that the most recent row per user can be picked by the window step that
# follows.
users_table = df.select(
    col("userId").alias("user_id"),
    col("firstName").alias("first_name"),
    col("lastName").alias("last_name"),
    "gender",
    "level",
    "ts",
).dropDuplicates()


In [57]:
# Preview five rows; ts is still part of the table here, so the same user can
# appear on several rows.
users_table.limit(5).show()

+-------+----------+---------+------+-----+-------------+
|user_id|first_name|last_name|gender|level|           ts|
+-------+----------+---------+------+-----+-------------+
|     97|      Kate|  Harrell|     F| paid|1542308354796|
|     80|     Tegan|   Levine|     F| paid|1542174581796|
|     80|     Tegan|   Levine|     F| paid|1542179185796|
|     16|     Rylan|   George|     M| paid|1542185237796|
|     80|     Tegan|   Levine|     F| paid|1542215370796|
+-------+----------+---------+------+-----+-------------+



In [60]:
# Order each user's rows from the newest event to the oldest.
user_window = Window.partitionBy('user_id').orderBy(desc('ts'))

# Number the rows inside each user's partition, keep only the newest one and
# discard the two helper columns.
# NOTE: this rebinds users_table to a frame without ts, so the cell cannot be
# re-run on its own; re-run the cell that builds users_table first.
ranked_users = users_table.withColumn("row_number", row_number().over(user_window))
users_table = ranked_users.filter(col("row_number") == 1).drop("row_number", "ts")

users_table.limit(100).show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     51|      Maia|    Burke|     F| free|
|      7|    Adelyn|   Jordan|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     54|     Kaleb|     Cook|     M| free|
|    101|    Jayden|      Fox|     M| free|
|     11| Christian|   Porter|     F| free|
|     29|Jacqueline|    Lynch|     F| paid|
|     69|  Anabelle|  Simpson|     F| free|
|     42|    Harper|  Barrett|     M| paid|
|     73|     Jacob|    Klein|     M| paid|
|     87|    Dustin|      Lee|     M| free|
|     64|    Hannah|  Calhoun|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|     30|     Avery|  Watkins|     F| paid|
|     34|    Evelin|    Ayala|     F| free|
|     59|      Lily|   Cooper|     F| free|
|      8|    Kaylee|  Summers|     F| free|
|     22|      Sean|   Wilson|     F| free|
|     28|  Brantley|     West|     M| free|
|     85|   Kinsley|    Young|  

In [67]:
# Print the schema of the log DataFrame.
# NOTE(review): the saved output lists timestamp and start_time columns, which
# are only added by the withColumn cell further down — the cells were
# presumably run out of order; confirm with a Restart & Run All.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- start_time: timestamp (nullable = true)



In [61]:
# Persist the users table as parquet, replacing any previous output.
# The former option("header", True) was removed: "header" is a CSV option and
# has no effect on parquet output.
(
    users_table.write
    .mode("overwrite")
    .parquet(output_data + "users")
)

In [74]:
# create timestamp column from original timestamp column:
# ts is epoch milliseconds; dividing by 1000 and casting to long truncates to
# whole epoch seconds. Native column expressions replace the former Python
# UDFs: they avoid per-row Python round-trips, yield null instead of raising
# when ts is null, and use a 64-bit long rather than a 32-bit IntegerType.
df = df.withColumn("timestamp", (col("ts") / 1000).cast("long"))

# create datetime column from original timestamp column:
# casting an integral number of epoch seconds to TimestampType gives the
# corresponding instant.
df = df.withColumn("start_time", col("timestamp").cast(TimestampType()))

In [75]:
# Spot-check the conversion: raw ts next to the derived timestamp and start_time.
df.select("ts", "timestamp", "start_time").limit(5).show()

+-------------+----------+-------------------+
|           ts| timestamp|         start_time|
+-------------+----------+-------------------+
|1542241826796|1542241826|2018-11-15 00:30:26|
|1542242481796|1542242481|2018-11-15 00:41:21|
|1542242741796|1542242741|2018-11-15 00:45:41|
|1542253449796|1542253449|2018-11-15 03:44:09|
|1542260935796|1542260935|2018-11-15 05:48:55|
+-------------+----------+-------------------+



In [81]:
# Time table: every distinct start_time broken down into calendar parts.
# NOTE(review): the "u" pattern yields the day-number-of-week as a string with
# the Spark version used for the saved outputs; newer Spark releases may not
# accept it — confirm before upgrading.
start = col("start_time")
time_table = df.select(
    start,
    hour(start).alias("hour"),
    dayofmonth(start).alias("day"),
    weekofyear(start).alias("week"),
    month(start).alias("month"),
    year(start).alias("year"),
    date_format(start, "u").alias("weekday"),
).dropDuplicates()


In [82]:
# Preview five rows of the time table.
time_table.limit(5).show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 07:08:36|   7| 15|  46|   11|2018|      4|
|2018-11-15 11:32:33|  11| 15|  46|   11|2018|      4|
|2018-11-21 05:50:57|   5| 21|  47|   11|2018|      3|
|2018-11-14 08:54:49|   8| 14|  46|   11|2018|      3|
|2018-11-14 16:34:43|  16| 14|  46|   11|2018|      3|
+-------------------+----+---+----+-----+----+-------+



In [83]:
# Persist the time table as parquet, one directory per (year, month),
# replacing any previous output.
# The former option("header", True) was removed: "header" is a CSV option and
# has no effect on parquet output.
(
    time_table.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(output_data + "time")
)

In [92]:
# Song metadata to look up song_id / artist_id for each play.
song_df = spark.read.json(song_data)

# A play matches a song when both the title and the artist name agree. The
# left join keeps plays without a match (their song_id / artist_id are null).
same_song = (df["song"] == song_df["title"]) & (df["artist"] == song_df["artist_name"])
songplays_joined = df.join(song_df, same_song, "left").dropDuplicates()

# Project the songplays columns, derive the year/month partition columns and
# attach a generated songplay_id.
songplays_table = songplays_joined.select(
    "start_time",
    df["userId"].alias("user_id"),
    "level",
    "song_id",
    "artist_id",
    df["sessionId"].alias("session_id"),
    "location",
    month("start_time").alias("month"),
    year("start_time").alias("year"),
    df["userAgent"].alias("user_agent"),
).withColumn("songplay_id", monotonically_increasing_id())

In [105]:
# Persist the songplays table as parquet, one directory per (year, month).
# It is written to its own "songplays" directory: the previous target, "songs",
# is the songs table's path and would have been overwritten, and the read-back
# cell loads output_data + "songplays".
# option("header", True) was also removed; it is a CSV option with no effect
# on parquet output.
(
    songplays_table.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(output_data + "songplays")
)

In [107]:
# Sanity check: number of rows currently stored under the "songs" output path.
songs_path = output_data + "songs"
ee = spark.read.parquet(songs_path)
ee.count()

71

In [109]:
# Sanity check: read the artists parquet back and preview ten rows.
# The bare ee.count() that used to sit here was removed: it was not the last
# expression of the cell, so its result was never shown and it only triggered
# an extra Spark job.
ee = spark.read.parquet(output_data + "artists")
ee.limit(10).show()

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624|-74.47995|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086|-73.96644|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|40.79195|-73.94512|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632| -0.12714|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|     null|
|ARD842G1187B997376|          Blue Rodeo|Toronto, Ontario,...|43.64856|-79.38533|
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|    null|     null|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|    null|     null|
|ARIG6O41187B988

In [111]:
# Sanity check: read the time parquet back and preview ten rows.
time_path = output_data + "time"
ee = spark.read.parquet(time_path)
ee.limit(10).show()

+-------------------+----+---+----+-------+----+-----+
|         start_time|hour|day|week|weekday|year|month|
+-------------------+----+---+----+-------+----+-----+
|2018-11-15 09:54:18|   9| 15|  46|      4|2018|   11|
|2018-11-15 14:57:44|  14| 15|  46|      4|2018|   11|
|2018-11-15 15:21:50|  15| 15|  46|      4|2018|   11|
|2018-11-15 16:42:54|  16| 15|  46|      4|2018|   11|
|2018-11-21 05:07:40|   5| 21|  47|      3|2018|   11|
|2018-11-21 05:14:42|   5| 21|  47|      3|2018|   11|
|2018-11-21 09:17:45|   9| 21|  47|      3|2018|   11|
|2018-11-21 13:23:15|  13| 21|  47|      3|2018|   11|
|2018-11-21 15:30:00|  15| 21|  47|      3|2018|   11|
|2018-11-14 06:03:05|   6| 14|  46|      3|2018|   11|
+-------------------+----+---+----+-------+----+-----+



In [113]:
# Sanity check: read the users parquet back and preview ten rows.
users_path = output_data + "users"
ee = spark.read.parquet(users_path)
ee.limit(10).show()

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     88|  Mohammad|Rodriguez|     M| paid|
|     75|    Joseph|Gutierrez|     M| free|
|     53|   Celeste| Williams|     F| free|
|     60|     Devin|   Larson|     M| free|
|     68|    Jordan|Rodriguez|     F| free|
|     90|    Andrea|   Butler|     F| free|
|     14|  Theodore|   Harris|     M| free|
|      2|   Jizelle| Benjamin|     F| free|
|     77| Magdalene|   Herman|     F| free|
|     89|   Kynnedi|  Sanchez|     F| free|
+-------+----------+---------+------+-----+



In [115]:
# Sanity check: read the songplays parquet back and preview ten rows.
songplays_path = output_data + "songplays"
ee = spark.read.parquet(songplays_path)
ee.limit(10).show()

+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+------------+----+-----+
|         start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent| songplay_id|year|month|
+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+------------+----+-----+
|2018-11-15 16:28:02|     97| paid|   null|     null|       605|Lansing-East Lans...|"Mozilla/5.0 (X11...|257698037760|2018|   11|
|2018-11-15 17:56:18|     97| paid|   null|     null|       605|Lansing-East Lans...|"Mozilla/5.0 (X11...|257698037761|2018|   11|
|2018-11-15 22:23:48|     49| paid|   null|     null|       630|San Francisco-Oak...|Mozilla/5.0 (Wind...|257698037762|2018|   11|
|2018-11-21 11:40:46|     15| paid|   null|     null|       764|Chicago-Napervill...|"Mozilla/5.0 (X11...|257698037763|2018|   11|
|2018-11-21 15:33:44|     58| paid|   null|     null|       768|Augusta-Richmond ..

In [None]:
# --- Log data: build and persist the users, time and songplays tables ---

# get filepath to log data file (resolved against input_data, like song_data)
log_data = input_data + "log-data/*.json"
song_data = input_data + "song-data/song-data/*/*/*/*.json"

# read log data file
df = spark.read.json(log_data)

# filter by actions for song plays
df = df.filter(df.page == 'NextSong')

# extract columns for users table; ts must be selected here because the
# window below orders by it to find each user's most recent row
users_table = df.select(
    df["userId"].alias("user_id"),
    df["firstName"].alias("first_name"),
    df["lastName"].alias("last_name"),
    "gender",
    "level",
    "ts",
).dropDuplicates()

# keep only the newest row per user, then drop the helper columns
user_window = Window.partitionBy('user_id').orderBy(desc('ts'))
users_table = (
    users_table
    .withColumn("row_number", row_number().over(user_window))
    .filter(col("row_number") == 1)
    .drop("row_number", "ts")
)

# write users table to parquet files
# ("header" is a CSV-only option, so it is not set for parquet)
(
    users_table.write
    .mode("overwrite")
    .parquet(output_data + "users")
)

# create timestamp column from original timestamp column:
# ts is epoch milliseconds; this truncates it to whole epoch seconds using a
# native column expression (null-safe, no per-row Python UDF)
df = df.withColumn("timestamp", (col("ts") / 1000).cast("long"))

# create datetime column from original timestamp column
df = df.withColumn("start_time", col("timestamp").cast(TimestampType()))

# extract columns to create time table; every derived column is aliased so
# that the year/month partition columns used below actually exist
# NOTE(review): the "u" pattern (day-number-of-week) may not be accepted by
# newer Spark releases — confirm before upgrading.
time_table = df.select(
    "start_time",
    hour("start_time").alias("hour"),
    dayofmonth("start_time").alias("day"),
    weekofyear("start_time").alias("week"),
    month("start_time").alias("month"),
    year("start_time").alias("year"),
    date_format(col("start_time"), "u").alias("weekday"),
).dropDuplicates()

# write time table to parquet files partitioned by year and month
(
    time_table.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(output_data + "time")
)

# read in song data to use for songplays table
song_df = spark.read.json(song_data)

# extract columns from joined song and log datasets to create songplays table.
# A play matches a song when title and artist name both agree; the left join
# keeps plays without a match (song_id / artist_id are then null).
songplays_table = df.join(
    song_df,
    (df["song"] == song_df["title"]) & (df["artist"] == song_df["artist_name"]),
    "left",
).dropDuplicates()

songplays_table = songplays_table.select(
    "start_time",
    df["userId"].alias("user_id"),
    "level",
    "song_id",
    "artist_id",
    df["sessionId"].alias("session_id"),
    "location",
    month("start_time").alias("month"),
    year("start_time").alias("year"),
    df["userAgent"].alias("user_agent"),
).withColumn("songplay_id", monotonically_increasing_id())

# write songplays table to parquet files partitioned by year and month;
# it goes to its own "songplays" directory so the songs table is not overwritten
(
    songplays_table.write
    .partitionBy("year", "month")
    .mode("overwrite")
    .parquet(output_data + "songplays")
)