In [318]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date, TimestampType as Ts

In [319]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('DEFAULT','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('DEFAULT','AWS_SECRET_ACCESS_KEY')

In [320]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [321]:
# get filepath to song data file
song_data = "./data/song_data/*/*/*/*.json"

In [322]:
stagingSongsSchema = R([
    Fld("song_id", Str()),
    Fld("num_songs", Int()),
    Fld("title", Str()),
    Fld("artist_name", Str()),
    Fld("artist_latitude", Dbl()),
    Fld("year", Int()),
    Fld("duration", Dbl()),
    Fld("artist_id", Str()),
    Fld("artist_longitude", Dbl()),
    Fld("artist_location", Str())
])

In [323]:
# read song data file
df_song = spark.read.json(song_data, schema=stagingSongsSchema)

In [324]:
df_song.count()

71

In [325]:
df_song.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: double (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_location: string (nullable = true)



In [326]:
df_song.show(5)

+------------------+---------+--------------------+--------------------+---------------+----+---------+------------------+----------------+-----------------+
|           song_id|num_songs|               title|         artist_name|artist_latitude|year| duration|         artist_id|artist_longitude|  artist_location|
+------------------+---------+--------------------+--------------------+---------------+----+---------+------------------+----------------+-----------------+
|SOBAYLL12A8C138AF9|        1|Sono andati? Fing...|Montserrat Caball...|           null|   0|511.16363|ARDR4AC1187FB371A1|            null|                 |
|SOOLYAZ12A6701F4A6|        1|Laws Patrolling (...|Mike Jones (Featu...|           null|   0|173.66159|AREBBGV1187FB523D2|            null|      Houston, TX|
|SOBBUGU12A8C13E95D|        1|Setting Fire to S...|The Dillinger Esc...|       40.82624|2004|207.77751|ARMAC4T1187FB3FA4C|       -74.47995|Morris Plains, NJ|
|SOAOIBZ12AB01815BE|        1|I Hold Your Hand ...| 

In [327]:
df_song.createOrReplaceTempView("staging_songs")

In [328]:
songs_table = spark.sql(
    """
    SELECT 
        song_id,
        title,
        artist_id,
        year,
        duration
    FROM staging_songs
    """
)

In [329]:
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (...|AREBBGV1187FB523D2|   0|173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|2005|186.48771|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [330]:
songs_table.write.mode("ignore").partitionBy("year", "artist_id").parquet("./output/songs")

In [331]:
artists_table = spark.sql(
    """
    SELECT
        artist_id,
        artist_name AS name,
        artist_location AS location,
        artist_latitude AS latitude,
        artist_longitude AS longitude
    FROM staging_songs
    """
)

In [332]:
artists_table.show(3)

+------------------+--------------------+-----------------+--------+---------+
|         artist_id|                name|         location|latitude|longitude|
+------------------+--------------------+-----------------+--------+---------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                 |    null|     null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|      Houston, TX|    null|     null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|Morris Plains, NJ|40.82624|-74.47995|
+------------------+--------------------+-----------------+--------+---------+
only showing top 3 rows



In [333]:
artists_table.write.mode("ignore").parquet("./output/artists")

In [334]:
# get filepath to log data file
log_data = "./data/log-data/*.json"

In [335]:
stagingEventsSchema = R([
    Fld("artist", Str()),
    Fld("auth", Str()),
    Fld("firstName", Str()), 
    Fld("gender", Str()), 
    Fld("itemInSession", Int()), 
    Fld("lastName", Str()), 
    Fld("length", Dbl()), 
    Fld("level", Str()), 
    Fld("location", Str()), 
    Fld("method", Str()), 
    Fld("page", Str()), 
    Fld("registration", Str()), 
    Fld("sessionId", Str()), 
    Fld("song", Str()), 
    Fld("status", Str()), 
    Fld("ts", Str()), 
    Fld("userAgent", Str()), 
    Fld("userId", Str()) 
])

In [336]:
df_log = spark.read.json(log_data, schema=stagingEventsSchema)

In [337]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- song: string (nullable = true)
 |-- status: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [338]:
df_log.count()

8056

In [339]:
df_log.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [340]:
df_log = df_log.filter(df_log.page == 'NextSong')

In [341]:
df_log.count()

6820

In [342]:
df_log.show(10)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyva

In [343]:
df_log.createOrReplaceTempView("staging_events")

In [344]:
users_table = spark.sql(
    """
    SELECT
        userId AS user_id,
        firstName AS first_name,
        lastName AS last_name,
        gender,
        level
    FROM staging_events
    """
)

In [345]:
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     26|      Ryan|    Smith|     M| free|
|     61|    Samuel| Gonzalez|     M| free|
|     80|     Tegan|   Levine|     F| paid|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [346]:
users_table.write.mode("ignore").parquet("./output/users")

In [347]:
df_log1 = df_log.withColumn("new_ts", df_log['ts'].cast('date'))

In [354]:
df_log1.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|new_ts|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|  null|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextS

In [357]:
start_time = spark.sql(
    """
    SELECT 
        DISTINCT from_unixtime(cast(ts as bigint)/1000,'yyyy-MM-dd HH:mm:ss') AS start_time
    FROM staging_events;
    """
)

In [350]:
time_table = start_time.select(
    "start_time",
    hour("start_time").alias('hour'),
    dayofmonth("start_time").alias('day'),
    weekofyear("start_time").alias('week'),
    month("start_time").alias('month'),
    year("start_time").alias('year'),
    dayofweek("start_time").alias('weekday')
)

In [351]:
time_table.write.mode("ignore").partitionBy('year', 'month').parquet('./output/time')

In [403]:
songplays_table = spark.sql(
    """
    WITH CTE AS (
        SELECT
            DISTINCT from_unixtime(cast(se.ts as bigint)/1000,'yyyy-MM-dd HH:mm:ss') AS start_time,
            se.userId, se.level, ss.song_id, ss.artist_id, se.sessionId AS session_id, se.location, se.userAgent AS user_agent
        FROM 
            staging_events se
        INNER JOIN 
            staging_songs ss
        ON 
            se.song = ss.title 
        AND 
            se.artist = ss.artist_name
    ) 
    SELECT 
        ROW_NUMBER() OVER(ORDER BY start_time) songplay_id,
        *, 
        YEAR(start_time) year, 
        MONTH(start_time) month 
    FROM 
        CTE;
    """
)

In [404]:
songplays_table.show(5)

+-----------+-------------------+------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|songplay_id|         start_time|userId|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-----------+-------------------+------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|          1|2018-11-21 16:56:47|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
+-----------+-------------------+------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+



In [405]:
songplays_table.write.mode("ignore").partitionBy('year', 'month').parquet('./output/songplays')