In [1]:
# Imports
import configparser
import os
from datetime import datetime

from IPython.display import display
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, monotonically_increasing_id, udf, col, to_date
from pyspark.sql.types import TimestampType

In [2]:
# AWS Cred
# Keys are not hardcoded here: export them in the environment before starting
# the kernel. setdefault() keeps a value that is already present instead of
# overwriting it with an empty string, and still defines the variable (as an
# empty string) when nothing was exported.
for _aws_var in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    os.environ.setdefault(_aws_var, "")

In [3]:
# Create Spark Session
def create_spark_session():
    """
    Return a SparkSession configured with the hadoop-aws package.

    The session is obtained with ``getOrCreate()``, so an already-running
    session is handed back when one exists; otherwise a new one is started
    with ``spark.jars.packages`` set to ``org.apache.hadoop:hadoop-aws:2.7.0``.
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()

In [4]:
# Spark session used by the read calls in the cells below.
spark = create_spark_session()

In [5]:
# Path of the log data file to load (a single events file on S3).
# Alternative path kept from the original cell:
#testdata/logs.json/2018-11-01-events.json
log_data = "s3a://udacity-dend/log_data/2018/11/2018-11-01-events.json"

# Read the JSON log file and keep only the song-play actions,
# i.e. the rows whose `page` value is "NextSong".
df = spark.read.json(log_data).where(col("page") == "NextSong")

In [6]:
# Show the DataFrame repr (column names and types), then print the first row.
# `display` comes from the notebook's import cell rather than being imported
# mid-notebook.
display(df)
df.show(1)

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+------------+------+-------------+--------------------+------+
| artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|        song|status|           ts|           userAgent|userId|
+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+------------+------+-------------+--------------------+------+
|Des'ree|Logged In|   Kaylee|     F|            1| Summers|246.30812| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.540344794796E12|      139|You Gotta Be|   200|1541106106796|"Mozilla/5.0 (Win...|     8|
+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+------------+------+-------------+--------------

In [7]:
# create timestamp column from original timestamp column
def get_timestamp(ts_col):
    """
    Convert an epoch-milliseconds column into a Spark timestamp column.

    Uses Spark's built-in numeric -> timestamp cast (the number is read as
    seconds since the Unix epoch) instead of a Python UDF, so that:
      * a null input yields null rather than raising TypeError on None / 1000;
      * rows are not serialized to a Python worker and back;
      * the value never passes through a naive local ``datetime``, which is
        ambiguous during a DST fall-back hour.

    Parameters
    ----------
    ts_col : Column or str
        Column (or column name) holding milliseconds since the Unix epoch.

    Returns
    -------
    Column
        Column expression of TimestampType.
    """
    if isinstance(ts_col, str):
        ts_col = col(ts_col)
    # Divide first: the cast expects seconds, `ts` is in milliseconds.
    return (ts_col / 1000).cast(TimestampType())


df = df.withColumn("start_time", get_timestamp(df.ts))
df.select("start_time").show(2)

+--------------------+
|          start_time|
+--------------------+
|2018-11-01 21:01:...|
|2018-11-01 21:05:...|
+--------------------+
only showing top 2 rows



In [8]:
# extract columns to create time table
# Time table: every distinct start_time together with the calendar parts
# derived from it (hour, day of month, week of year, month, year, weekday).
time_table = df.select(
    "start_time",
    hour("start_time").alias("hour"),
    dayofmonth("start_time").alias("day"),
    weekofyear("start_time").alias("week"),
    month("start_time").alias("month"),
    year("start_time").alias("year"),
    dayofweek("start_time").alias("weekday"),
).distinct()

In [8]:
# Load the song metadata: every *.json file under the song_data/A/A/B prefix.
song_df = spark.read.json("s3a://udacity-dend/song_data/A/A/B/*.json")

In [9]:
# Artists table check
# Pick the artist columns from the song metadata, drop the "artist_" prefix
# from everything except artist_id, and remove exact duplicate rows.
artists_table = song_df.select(
    col("artist_id"),
    col("artist_latitude").alias("latitude"),
    col("artist_location").alias("location"),
    col("artist_longitude").alias("longitude"),
    col("artist_name").alias("name"),
).distinct()

In [11]:
# Register (or replace) a temporary view named "artists_table" backed by this DataFrame.
artists_table.createOrReplaceTempView("artists_table")

In [12]:
# Preview up to 5 rows of the artists table.
artists_table.show(5)

+------------------+--------+------------------+---------+------------------+
|         artist_id|latitude|          location|longitude|              name|
+------------------+--------+------------------+---------+------------------+
|ARPIKA31187FB4C233|40.71455|          New York|-74.00712|        The Action|
|AR5AA4Q1187FB4CFBD|    null|                  |     null|    Alisha's Attic|
|ARV6GHH1187B9AED0D|62.19845|            SWEDEN| 17.55142|      Sofia Talvik|
|ARE6D0Y119B8667D9F|    null|                  |     null|Alejandro Santiago|
|ARZD4UW1187B9AB3D2|40.71455|NEW YORK, New York|-74.00712|       Latin Lover|
+------------------+--------+------------------+---------+------------------+
only showing top 5 rows



In [13]:
# users table check
# Distinct combinations of the user columns found in the song-play events.
# Rows are de-duplicated on all five columns, not on userId alone.
users_table = df.select(
    ["userId", "firstName", "lastName", "gender", "level"]
).distinct()

In [14]:
# Preview up to 5 rows of the users table.
users_table.show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|   101|   Jayden|     Fox|     M| free|
|     8|   Kaylee| Summers|     F| free|
|    26|     Ryan|   Smith|     M| free|
|    10|   Sylvie|    Cruz|     F| free|
+------+---------+--------+------+-----+



In [9]:
# Show the time_table repr (column names and types), then print up to 3 rows.
display(time_table)
time_table.show(3)

DataFrame[start_time: timestamp, hour: int, day: int, week: int, month: int, year: int, weekday: int]

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:42:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:28:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:...|  21|  1|  44|   11|2018|      5|
+--------------------+----+---+----+-----+----+-------+
only showing top 3 rows



In [16]:
# Songs table: distinct song rows taken from the song metadata, registered as
# a temporary view under the same name and previewed.
songs_table = song_df.select(
    ["song_id", "title", "artist_id", "year", "duration"]
).distinct()
songs_table.createOrReplaceTempView("songs_table")
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOKPKMV12A8C14125E|Catwalk (Black In...|AR2L9A61187B9ADDBC|1995|492.30322|
|SOSRVUJ12AB018731E|       Walk the Walk|ARD8E0V1187FB5C3DB|2000|290.16771|
|SODVVEL12A6D4F9EA0|Good Old Wagon (L...|ARI9DQS1187B995625|1964|139.78077|
|SONHGLD12AB0188D47|          Our Father|AR1S3NH1187B98C2BC|1999| 202.4224|
|SOWBWRV12A6D4FB3D0|          I Know You|AR1LOGZ1187B990FB4|   0|307.46077|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [17]:
# Show the events DataFrame repr again; it now includes the start_time column.
display(df)

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, start_time: timestamp]

In [18]:
# Show the song metadata DataFrame repr (column names and types).
display(song_df)

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [19]:
# Match each song-play event to its song metadata row.
# Both the song title and the artist name must agree: joining on the artist
# name alone pairs a play with every song by that artist, which duplicates
# rows and attaches song_ids of songs that were not actually played.
joined_df = df.join(
    song_df,
    (df.song == song_df.title) & (df.artist == song_df.artist_name),
    "inner",
)

In [20]:
# Show the joined DataFrame repr: event columns followed by song metadata columns.
display(joined_df)

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, start_time: timestamp, artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [21]:
# Print one row of the song metadata.
song_df.show(1)

+------------------+---------------+--------------------+----------------+-----------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+-----------+---------+---------+------------------+--------------------+----+
|ARKIQCZ1187B9A7C7C|       52.23974|Northampton, Nort...|        -0.88576|    Bauhaus|248.65914|        1|SOSIJKW12A8C1330E3|A God In An Alcov...|   0|
+------------------+---------------+--------------------+----------------+-----------+---------+---------+------------------+--------------------+----+
only showing top 1 row



In [22]:
# Print one row of the song-play events.
df.show(1)

+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+------------+------+-------------+--------------------+------+--------------------+
| artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|        song|status|           ts|           userAgent|userId|          start_time|
+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+------------+------+-------------+--------------------+------+--------------------+
|Des'ree|Logged In|   Kaylee|     F|            1| Summers|246.30812| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.540344794796E12|      139|You Gotta Be|   200|1541106106796|"Mozilla/5.0 (Win...|     8|2018-11-01 21:01:...|
+-------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+-

In [23]:
# Songplays table: project the joined rows into snake_case columns, derive
# year and month from start_time, then append a generated songplay_id as the
# last column.
songplays_table = (
    joined_df
    .select(
        "start_time",
        col("userId").alias("user_id"),
        "level",
        "song_id",
        "artist_id",
        col("sessionId").alias("session_id"),
        "location",
        year(col("start_time")).alias("year"),
        month(col("start_time")).alias("month"),
        col("userAgent").alias("user_agent"),
    )
    .withColumn("songplay_id", monotonically_increasing_id())
)