In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

print('------ Imports Done ------')

------ Imports Done ------


In [2]:
config = configparser.ConfigParser()
config.read('data_lake.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_KEY']

logs_bucket = config['S3']['LOG_DATA']
songs_bucket = config['S3']['SONG_DATA']
output_bucket = config['S3']['OUTPUT_DATA']

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
spark = create_spark_session()

In [4]:
spark

## Start with the songs files

In [5]:
songs = spark.read.json(f'{songs_bucket}/A/A/A/*.json')
songs.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|301.40036|        1|SOAFBCP12A8C13CC7D|King Of Scurf (20...|1972|
|ARA23XO1187B9AF18F|       40.57885|Carteret, New Jersey|       -74.21956|     The Smithereens|  192.522|        1|SOKTJDS12AF72A25E5|Drown In My Own T...|   0|
|ARSVTNL1187B992A91|       51.50632|     London, England|        -0.12714|       Jonathan King|129.85424|        1|SOEKAZG12AB018837E|I'll Slap Your Fa...|2001|
|AR73AIO1187B9AD57B|       37.7791

In [6]:
songs.createOrReplaceTempView('tmp_songs')

In [7]:
songs_table = spark.sql(
    '''
    SELECT DISTINCT song_id, title, artist_id, year, duration
    FROM tmp_songs
    WHERE song_id IS NOT NULL
    ''')
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOHOZBI12A8C132E3C|         Smash It Up|AR0MWD61187B9B2B12|2000|195.39546|
|SOXZYWX12A6310ED0C|     It's About Time|ARC1IHZ1187FB4E920|   0| 246.9873|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOIGICF12A8C141BC5|        Game & Watch|AREWD471187FB49873|2004|580.54485|
|SOAPERH12A58A787DC|The One And Only ...|ARZ5H0P1187B98A1DD|   0|230.42567|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [8]:
artist_table = spark.sql(
    '''
    SELECT DISTINCT artist_id, artist_name AS name, artist_location AS location,
                    artist_latitude AS latitude, artist_longitude AS longitude
    FROM tmp_songs
    WHERE artist_id IS NOT NULL
    ''')
artist_table.show(5)

+------------------+-----------------+-----------+--------+---------+
|         artist_id|             name|   location|latitude|longitude|
+------------------+-----------------+-----------+--------+---------+
|ARJNIUY12298900C91|     Adelitas Way|           |    null|     null|
|AR5LMPY1187FB573FE|Chaka Khan_ Rufus|Chicago, IL|41.88415|-87.63241|
|AR1C2IX1187B99BF74|  Broken Spindles|           |    null|     null|
|ARC1IHZ1187FB4E920|     Jamie Cullum|           |    null|     null|
|ARKYKXP11F50C47A6A| The Supersuckers|           |    null|     null|
+------------------+-----------------+-----------+--------+---------+
only showing top 5 rows



# Now with the events files

In [9]:
events = spark.read.json(f'{logs_bucket}/*/*/*.json')
events = events.withColumn("start_time",
        (col("ts")/1000).cast("timestamp"))
events.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|          start_time|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|50841-09-12 03:26...|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| fr

In [10]:
events.createOrReplaceTempView('tmp_events')

In [11]:
time_table = spark.sql(
    '''
    SELECT DISTINCT start_time,
                    EXTRACT(hour from start_time) as hour,
                    EXTRACT(day from start_time) as day,
                    EXTRACT(week from start_time) as week,
                    EXTRACT(month from start_time) as month,
                    EXTRACT(year from start_time) as year,
                    DAYOFWEEK(start_time) AS weekday
    FROM tmp_events
    WHERE ts IS NOT NULL
    ''')
time_table.show(5)

+--------------------+----+---+----+-----+-----+-------+
|          start_time|hour|day|week|month| year|weekday|
+--------------------+----+---+----+-----+-----+-------+
|50842-12-03 20:43...|  20|  3|  49|   12|50842|      4|
|50843-07-21 13:46...|  13| 21|  30|    7|50843|      3|
|50844-03-05 18:26...|  18|  5|   9|    3|50844|      7|
|50859-03-07 10:56...|  10|  7|  10|    3|50859|      6|
|50860-05-26 18:19...|  18| 26|  22|    5|50860|      4|
+--------------------+----+---+----+-----+-----+-------+
only showing top 5 rows



In [12]:
users_table = spark.sql('''
    SELECT DISTINCT userId as user_id, firstName as first_name, lastName as last_name, gender, level
    FROM tmp_events
    WHERE userId IS NOT NULL
    ''')
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [13]:
songplays_table = spark.sql(
    '''
    SELECT e.start_time, e.userId AS user_id, e.level, e.sessionId AS session_id, e.location,
           e.userAgent AS user_agent, s.song_id, s.artist_id
    FROM tmp_events e
    JOIN tmp_songs s ON e.artist = s.artist_name
    AND e.song = s.title
    WHERE e.page = 'NextSong'
    ''')

+----------+-------+-----+----------+--------+----------+-------+---------+
|start_time|user_id|level|session_id|location|user_agent|song_id|artist_id|
+----------+-------+-----+----------+--------+----------+-------+---------+
+----------+-------+-----+----------+--------+----------+-------+---------+

