In [1]:
import configparser
from datetime import datetime
import os
import pandas as pd
from pyspark.sql.types import MapType, StringType, StructType, IntegerType, DoubleType, TimestampType, StructField
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, split
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [6]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [2]:
#input_data = "s3a://udacity-dend/"
input_data_logs = "./data/log-data.zip"
input_data_songs = "./data/song-data.zip"
input_data_songs_json = "./data/song_data/"
input_data_logs_json = "./data/logs_data/"

In [3]:
import zipfile
with zipfile.ZipFile(input_data_songs, 'r') as zip_ref:
    zip_ref.extractall(input_data_songs_json)
with zipfile.ZipFile(input_data_logs, 'r') as zip_ref:
    zip_ref.extractall(input_data_logs_json)

In [4]:
jsonSongSchema = StructType([
    StructField("num_songs",IntegerType()),
    StructField("artist_id",StringType()),
    StructField("artist_latitud",DoubleType()),
    StructField("artist_longitud",DoubleType()),
    StructField("artist_location",StringType()),
    StructField("artist_name",StringType()),
    StructField("song_id",StringType()),
    StructField("title",StringType()),
    StructField("duration",DoubleType()),
    StructField("year",IntegerType()),  
])


In [7]:
df = spark.read.json(input_data_songs_json+"*/*/*/*.json",schema=jsonSongSchema)

In [13]:
dfLogs = spark.read.json(input_data_logs_json+"*.json")

In [9]:
df.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitud: double (nullable = true)
 |-- artist_longitud: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)



In [14]:
dfLogs.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
df.head(5)

[Row(num_songs=1, artist_id='ARDR4AC1187FB371A1', artist_latitud=None, artist_longitud=None, artist_location='', artist_name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', song_id='SOBAYLL12A8C138AF9', title='Sono andati? Fingevo di dormire', duration=511.16363, year=0),
 Row(num_songs=1, artist_id='AREBBGV1187FB523D2', artist_latitud=None, artist_longitud=None, artist_location='Houston, TX', artist_name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", song_id='SOOLYAZ12A6701F4A6', title='Laws Patrolling (Album Version)', duration=173.66159, year=0),
 Row(num_songs=1, artist_id='ARMAC4T1187FB3FA4C', artist_latitud=None, artist_longitud=None, artist_location='Morris Plains, NJ', artist_name='The Dillinger Escape Plan', song_id='SOBBUGU12A8C13E95D', title='Setting Fire to Sleeping Giants', duration=207.77751, year=2004),
 Row(num_songs=1, artist_id='ARPBNLO1187FB3D52F', artist_latitud=None, artist_longitud=None, artist_location='New York,

In [16]:
dfLogs.head(5)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [17]:
df.count()

71

In [18]:
dfLogs.count()

8056

In [19]:
df.show(5, truncate=True)

+---------+------------------+--------------+---------------+--------------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitud|artist_longitud|     artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+--------------+---------------+--------------------+--------------------+------------------+--------------------+---------+----+
|        1|ARDR4AC1187FB371A1|          null|           null|                    |Montserrat Caball...|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16363|   0|
|        1|AREBBGV1187FB523D2|          null|           null|         Houston, TX|Mike Jones (Featu...|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|   0|
|        1|ARMAC4T1187FB3FA4C|          null|           null|   Morris Plains, NJ|The Dillinger Esc...|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|
|        1|ARPBNLO1187FB3D52F|          null| 

In [20]:
dfLogs.show(5, truncate=True)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [21]:
dfLogs = dfLogs.filter(dfLogs.page == 'NextSong')

In [22]:
dfLogs.count()

6820

In [23]:
df.createOrReplaceTempView("songs_table_staging")
dfLogs.createOrReplaceTempView("logs_table_staging")

In [24]:
test = spark.sql("select * from logs_table_staging limit 5")

In [25]:
test.show()

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

In [27]:
songplay = spark.sql("""
SELECT distinct lgTable.ts as start_time, lgTable.userId, lgTable.level, sgTable.song_id, sgTable.artist_id, lgTable.sessionId as session_id, lgTable.location, lgTable.userAgent as user_agent
from logs_table_staging as lgTable
join songs_table_staging as sgTable
on (lgTable.song = sgTable.title and lgTable.artist = sgTable.artist_name)
where lgTable.page = 'NextSong'
""")

In [28]:
songplay.registerTempTable("songplays")

In [29]:
songplay.show(5)

+-------------+------+-----+------------------+------------------+----------+--------------------+--------------------+
|   start_time|userId|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-------------+------+-----+------------------+------------------+----------+--------------------+--------------------+
|1542837407796|    15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-------------+------+-----+------------------+------------------+----------+--------------------+--------------------+



In [30]:
songplay.count()

1

In [31]:
user_table = spark.sql("""
SELECT distinct userId as user_id, firstName as first_name, lastName as last_name, gender, level
from logs_table_staging
where page='NextSong'
""")

In [32]:
user_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [33]:
user_table.count()

104

In [34]:
song_table = spark.sql("""
SELECT distinct song_id, title, artist_id, year, duration
from songs_table_staging
where song_id is not null
""")

In [35]:
song_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOUDSGM12AC9618304|Insatiable (Instr...|ARNTLGG11E2835DDB9|   0|266.39628|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
|SOZCTXZ12AB0182364|      Setanta matins|AR5KOSW1187FB35FF4|   0|269.58322|
|SOBZBAZ12A6D4F8742|      Spanish Grease|AROUOZZ1187B9ABE51|1997|168.25424|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [36]:
song_table.count()

71

In [37]:
artist_table = spark.sql("""
select distinct artist_id, artist_name as name, artist_location as location, artist_latitud as latitude, artist_longitud as longitude
from songs_table_staging
where artist_id is not null
""")

In [38]:
artist_table.show(5)

+------------------+------------+---------------+--------+---------+
|         artist_id|        name|       location|latitude|longitude|
+------------------+------------+---------------+--------+---------+
|ARGSJW91187B9B1D6B|JennyAnyKind| North Carolina|    null|     null|
|ARPFHN61187FB575F6| Lupe Fiasco|    Chicago, IL|    null|     null|
|ARQGYP71187FB44566|Jimmy Wakely|    Mineola, AR|    null|     null|
|ARBEBBY1187B9B43DB|   Tom Petty|Gainesville, FL|    null|     null|
|ARGCY1Y1187B9A4FA5|    Gloriana| Nashville, TN.|    null|     null|
+------------------+------------+---------------+--------+---------+
only showing top 5 rows



In [39]:
artist_table.count()

69

In [40]:
spark.udf.register("get_hour", lambda x: int(datetime.fromtimestamp(x / 1000.0).hour))
spark.udf.register("get_day", lambda x: int(datetime.fromtimestamp(x / 1000.0).day))
spark.udf.register("get_week", lambda x: int(datetime.fromtimestamp(x / 1000.0).isocalendar()[1]))
spark.udf.register("get_month", lambda x: int(datetime.fromtimestamp(x / 1000.0).month))
spark.udf.register("get_year", lambda x: int(datetime.fromtimestamp(x / 1000.0).year))
spark.udf.register("get_dayofweek", lambda x: int(datetime.fromtimestamp(x / 1000.0).weekday()))

<function __main__.<lambda>(x)>

In [41]:
time_table = spark.sql("""
SELECT distinct ts as start_time, get_hour(ts) as hour, get_day(ts) as day, get_week(ts) as week, get_month(ts) as month, get_year(ts) as year, get_dayofweek(ts) as dayofweek
from logs_table_staging
""")

In [42]:
time_table.show(5)

+-------------+----+---+----+-----+----+---------+
|   start_time|hour|day|week|month|year|dayofweek|
+-------------+----+---+----+-----+----+---------+
|1542308793796|  19| 15|  46|   11|2018|        3|
|1542315419796|  20| 15|  46|   11|2018|        3|
|1542319396796|  22| 15|  46|   11|2018|        3|
|1542784903796|   7| 21|  47|   11|2018|        2|
|1542785123796|   7| 21|  47|   11|2018|        2|
+-------------+----+---+----+-----+----+---------+
only showing top 5 rows



In [47]:
time_table.write.parquet('./time_table_partition_test_dir1/')