In [39]:
import configparser
from datetime import datetime
from time import sleep, time
import os
from pyspark.sql import SparkSession
# from pyspark.sql import types as t
from pyspark.sql.functions import udf, col, count, when
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek, from_unixtime

In [2]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()
# The package `org.apache.hadoop:hadoop-aws:2.7.0` allows you to connect aws S3.

In [3]:
spark

In [4]:
config = configparser.ConfigParser()
# config.read('dl.cfg')

config.read_file(open('dl.cfg'))
    
# print(config)
# config.sections()

os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [5]:
# input_data_base_path = "s3a://udacity-dend/"
input_data = "data/"
output_data = "analytics/"

In [6]:
# df = spark.read.json(input_data + "song-data/A/A/A/*.json")

### Process the song data

In [62]:
# def process_song_data(spark, input_data, output_data):

# get filepath to song data file
song_data = input_data + "/song-data/*/*/*/*.json"

# read song data file
df = spark.read.json(song_data)

df.printSchema()
df.count()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



71

In [63]:

# # extract columns to create songs table
song_cols = ["song_id", "title", "artist_id", "year", "duration", "artist_name"]
songs_table = df.select(song_cols).distinct()
songs_table.show(5)

# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year", "artist_name").mode('overwrite').parquet(output_data + "/songs")
print ("songs_table written to parquet\n")


+------------------+--------------------+------------------+----+---------+--------------+
|           song_id|               title|         artist_id|year| duration|   artist_name|
+------------------+--------------------+------------------+----+---------+--------------+
|SOKEJEJ12A8C13E0D0|The Urgency (LP V...|ARC43071187B990240|   0|245.21098|  Wayne Watson|
|SOZCTXZ12AB0182364|      Setanta matins|AR5KOSW1187FB35FF4|   0|269.58322|         Elena|
|SORRZGD12A6310DBC3|      Harajuku Girls|ARVBRGZ1187FB4675A|2004|290.55955|  Gwen Stefani|
|SOGVQGJ12AB017F169|           Ten Tonne|AR62SOJ1187FB47BB5|2005|337.68444|Chase & Status|
|SOQOTLQ12AB01868D0|  Clementina Santafè|ARGCY1Y1187B9A4FA5|   0|153.33832|      Gloriana|
+------------------+--------------------+------------------+----+---------+--------------+
only showing top 5 rows

songs_table written to parquet



In [56]:
# extract columns to create artists table
# artists_cols = ["df['artist_id']", "df['artist_name'] as name", "df['artist_location'] as location", "df['artist_latitude'] as latitude", "df['artist_longitude'] as longitude"]
# artists_cols = ["artist_id", "artist_name as name", "artist_location as location", "artist_latitude as latitude", "artist_longitude as longitude"]

# artists_cols = ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]
# artists_table = \
#     df.select(artists_cols) \
#         .withColumnRenamed("artist_name", "name") \
#         .withColumnRenamed("artist_location", "location") \
#         .withColumnRenamed("artist_latitude", "latitude") \
#         .withColumnRenamed("artist_longitude", "longitude") \
#         .distinct()

artists_cols = ["artist_id", "artist_name as name", "artist_location as location", "artist_latitude as latitude", "artist_longitude as longitude"]
artists_table = df.selectExpr(artists_cols).distinct()

# artists_table = df.selectExpr(artists_cols)
artists_table.show(5)

# write artists table to parquet files
artists_table.write.mode('overwrite').parquet(output_data + "/artists")
print ("artists_table written to parquet")
    
    

+------------------+---------------+---------------+--------+----------+
|         artist_id|           name|       location|latitude| longitude|
+------------------+---------------+---------------+--------+----------+
|ARPBNLO1187FB3D52F|       Tiny Tim|   New York, NY|40.71455| -74.00712|
|ARXR32B1187FB57099|            Gob|               |    null|      null|
|AROGWRA122988FEE45|Christos Dantis|               |    null|      null|
|ARBGXIG122988F409D|     Steel Rain|California - SF|37.77916|-122.42005|
|AREVWGE1187B9B890A|     Bitter End|      Noci (BA)| -13.442|  -41.9952|
+------------------+---------------+---------------+--------+----------+
only showing top 5 rows

artists_table written to parquet


### Now the log data

In [88]:
# def process_log_data(spark, input_data, output_data):
# get filepath to log data file
log_data = input_data + "/log-data/*.json"

# read log data file
df = spark.read.json(log_data)

# df.show(3)
# df.limit(3).toPandas()

# filter by actions for song plays
df = df.filter(df["page"] == "NextSong")

df = df.withColumn("start_time", from_unixtime(col("ts")/1000)) \
    .withColumn('year', year('start_time')) \
    .withColumn('month', month('start_time'))
# df.show(3)

df.printSchema()
df.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



6820

In [90]:
# extract columns for users table    
# users_cols = ["userId", "firstName", "lastName", "gender", "level"]
# users_table = \
#     df.select(users_cols) \
#         .withColumnRenamed("userId", "user_id") \
#         .withColumnRenamed("firstName", "first_name") \
#         .withColumnRenamed("lastName", "last_name")
# users_table.show(3)

users_cols = ["userId as user_id", "firstName as first_name", "lastName as last_name", "gender", "level"]
users_table = df.selectExpr(users_cols).distinct()

users_table.show(3)
users_table.count()
# write users table to parquet files
# users_table.write.mode('overwrite').parquet(output_data + "/users")

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
+-------+----------+---------+------+-----+
only showing top 3 rows



104

In [96]:
# # create timestamp column from original timestamp column
# get_timestamp = udf()
# df = 

# # create datetime column from original timestamp column
# get_datetime = udf()
# df = 

# extract columns to create time table
time_table = df \
    .select(from_unixtime(col("ts")/1000).alias("start_time")) \
    .withColumn('hour', hour('start_time')) \
    .withColumn('day', dayofmonth('start_time')) \
    .withColumn('week', weekofyear('start_time')) \
    .withColumn('month', month('start_time')) \
    .withColumn('year', year('start_time')) \
    .withColumn('weekday', dayofweek('start_time')) \
    .distinct()

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("year", "month").mode('overwrite').parquet(output_data + "/time")

In [97]:
time_table.show(3)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 12:38:03|  12| 15|  46|   11|2018|      5|
|2018-11-15 22:00:58|  22| 15|  46|   11|2018|      5|
|2018-11-21 19:00:45|  19| 21|  47|   11|2018|      4|
+-------------------+----+---+----+-----+----+-------+
only showing top 3 rows



In [98]:
time_table.count()

6813

In [112]:
# read in song data to use for songplays table
# song_df = df \
#     .where(df["page"] == "NextSong") \
#     .join( \
#         songs_table, \
#         (df.artist == songs_table.artist_name) & (df.song == songs_table.title) & (df.length == songs_table.duration) & (songs_table.song_id.isNotNull()), \
#         how='left' \
#         )


# df = df.withColumn("start_time", from_unixtime(col("ts")/1000)) \
#     .withColumn('year', year('start_time')) \
#     .withColumn('month', month('start_time'))
# # df.show(3)

partial = False
if partial:
    song_data = input_data + "/song-data/A/A/A/*.json"
else:
    song_data = input_data + "/song-data/*/*/*/*.json"
songs_df = spark.read.json(song_data)
songs_table = songs_df \
    .select(["song_id", "title", "artist_id", "year", "duration", "artist_name"]) \
    .filter(col("song_id").isNotNull()) \
    .distinct()
     
# song_df = df \
#     .join( \
#         songs_table, \
#         (df.artist == songs_table.artist_name) & (df.song == songs_table.title) & (df.length == songs_table.duration), \
#         how='left' \
#         ) \
#     .filter(songs_table.song_id.isNotNull())


#.select(["ts", "userId", "level", ])

# song_df.count()
# song_df.show(3)

# sp_df = df.selectExpr(["artist", "length", "ts", "song", "userId as user_id", "level", "sessionId as session_id", "location", "userAgent as user_agent"]) \
#         .withColumn("start_time", from_unixtime(col("ts")/1000)) \
#         .withColumn('year', year('start_time')) \
#         .withColumn('month', month('start_time')) \
#         .drop("ts")
# sp_df = df.selectExpr(["artist", "length", "ts", "song", "userId as user_id", "level", "sessionId as session_id", "location", "userAgent as user_agent"]) \
#         .drop("ts")

# sp_df.show(3)

# songplays_table = sp_df \
#     .join( \
#         songs_table, \
#         (sp_df.artist == songs_table.artist_name) & (sp_df.song == songs_table.title) & (sp_df.length == songs_table.duration), \
#         how='left' \
#         ) \
#     .filter(songs_table.song_id.isNotNull()) \
#     .drop("artist") \
#     .drop("length")

songplays_table = df \
    .join( \
        songs_table, \
        (df.artist == songs_table.artist_name) & (df.song == songs_table.title) & (df.length == songs_table.duration), \
        how='left' \
        ) #\
#     .filter(songs_table.song_id.isNotNull())

songplays_table = songplays_table\
    .selectExpr(["start_time", "userId as user_id", "level", "song_id", "artist_id", "sessionId as session_id", "location", "userAgent as user_agent"])

# .withColumnRenamed("userId", "user_id") \
#     .withColumnRenamed("sessionId", "session_id") \
#     .withColumnRenamed("userAgent", "user_agent") \

songplays_table.show(3)

# extract columns from joined song and log datasets to create songplays table 
# we'll want to partition by year and month so we need to extract that info from the ts column
# songplays_table = \
#     song_df \
#         .select(["ts", "userId", "level", "song_id", "artist_id", "sessionId", "location", "userAgent"]) \
#         .withColumn("start_time", from_unixtime(col("ts")/1000)) \
#         .withColumn('year', year('start_time')) \
#         .withColumn('month', month('start_time')) \
#         .drop("ts")
# songplays_table.show(3)
# songplays_table.count()

# write songplays table to parquet files partitioned by year and month
# songplays_table.write.partitionBy("year", "month").mode('overwrite').parquet(output_data + "/songplays")


+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|         start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|
+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
|2018-11-15 00:30:26|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-15 00:41:21|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-15 00:45:41|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+
only showing top 3 rows



In [113]:
songplays_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [102]:
songplays_table.count()

1

In [57]:
# from pyspark.sql.functions import isnan, when, count, col

# artists_table.select([count(when(col('artist_id').isNull(), True))]).show()
songs_table.select([count(when(col(c).isNull(), c)).alias(c) for c in songs_table.columns]).show()
artists_table.select([count(when(col(c).isNull(), c)).alias(c) for c in artists_table.columns]).show()

users_table.select([count(when(col(c).isNull(), c)).alias(c) for c in users_table.columns]).show()
time_table.select([count(when(col(c).isNull(), c)).alias(c) for c in time_table.columns]).show()
songplays_table.select([count(when(col(c).isNull(), c)).alias(c) for c in songplays_table.columns]).show()


+-------+-----+---------+----+--------+-----------+
|song_id|title|artist_id|year|duration|artist_name|
+-------+-----+---------+----+--------+-----------+
|      0|    0|        0|   0|       0|          0|
+-------+-----+---------+----+--------+-----------+

+---------+----+--------+--------+---------+
|artist_id|name|location|latitude|longitude|
+---------+----+--------+--------+---------+
|        0|   0|       0|      38|       38|
+---------+----+--------+--------+---------+

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      0|         0|        0|     0|    0|
+-------+----------+---------+------+-----+

+----------+----+---+----+-----+----+-------+
|start_time|hour|day|week|month|year|weekday|
+----------+----+---+----+-----+----+-------+
|         0|   0|  0|   0|    0|   0|      0|
+----------+----+---+----+-----+----+-------+

+------+-----+-------+---------+---------+--------+---------+