In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from zipfile import ZipFile

### Read config file

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

['dl.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

### Instantiate a Spark session

In [4]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate()

### Extract log and song data files

In [5]:
with ZipFile(config['LOCAL']['INPUT_DATA'] + "song-data.zip", 'r') as song_zip:
    song_zip.extractall(config['LOCAL']['INPUT_DATA'] + "extract/")

In [6]:
with ZipFile(config['LOCAL']['INPUT_DATA'] + "log-data.zip", 'r') as log_zip:
    log_zip.extractall(config['LOCAL']['INPUT_DATA'] + "extract/log_data/")

### Load log and song data into data frames

In [7]:
song_data = config['LOCAL']['INPUT_DATA'] + "extract/song_data/*/*/*/*.json"

In [8]:
log_data = config['LOCAL']['INPUT_DATA'] + "extract/log_data/"

In [9]:
song_df = spark.read.json(song_data)
song_df.createOrReplaceTempView("song")

In [10]:
song_df.printSchema()
song_df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|5

### Transform timestamp column in log data frame and filter data

In [11]:
from pyspark.sql.types import StringType
get_datetime = udf(lambda ts: datetime.fromtimestamp(float(ts)/1000.).strftime('%Y-%m-%d %H:%M:%S'), StringType())

In [12]:
log_df = spark.read.json(log_data)
log_df = log_df.withColumn('start_time', get_datetime('ts')).filter("page='NextSong'")
log_df.createOrReplaceTempView("log")

In [13]:
log_df.printSchema()
log_df.show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+-------------------+
|     artist|     auth|fir

### Write songplay table partitioned by year and artist

In [14]:
songplay_table = spark.sql("""
    select
        row_number() over (partition by l.ts order by l.userId) as songplay_id,
        l.start_time,
        l.userId as user_id,
        l.level,
        s.song_id,
        s.artist_id,
        l.sessionId as session_id,
        l.location,
        l.userAgent as user_agent
    from
        song s
    inner join log l on
        s.artist_name=l.artist and
        s.title=l.song
""")

songplay_table.show(10)

try:
    songplay_table.withColumn('year', year(col('start_time'))).write.partitionBy('year', 'artist_id').parquet(config['LOCAL']['OUTPUT_DATA'] + "songplays.parquet")
except Exception as e:
    print("Error:", e)

+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|songplay_id|         start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|          1|2018-11-21 21:56:47|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-----------+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+



### Write user table

In [15]:
user_table = spark.sql("""
    select distinct
        userId as user_id,
        firstName as first_name,
        lastName as last_name,
        gender,
        level
    from
        log
""")

user_table.show(10)

try:
    user_table.write.parquet(config['LOCAL']['OUTPUT_DATA'] + "users.parquet")
except Exception as e:
    print("Error:", e)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
|     63|      Ayla|  Johnson|     F| free|
|     37|    Jordan|    Hicks|     F| free|
|      6|   Cecilia|    Owens|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     27|    Carlos|   Carter|     M| free|
+-------+----------+---------+------+-----+
only showing top 10 rows



### Write song table partitioned by year and artist

In [16]:
song_table = spark.sql("""
    select distinct
        song_id,
        title,
        artist_id,
        year,
        duration
    from
        song
""")

song_table.show(10)

try:
    song_table.write.partitionBy('year', 'artist_id').parquet(config['LOCAL']['OUTPUT_DATA'] + "songs.parquet")
except Exception as e:
    print("Error:", e)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGNCJP12A58A80271|Do You Finally Ne...|ARB29H41187B98F0EF|1972|342.56934|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|   0|173.19138|
|SOFCHDR12AB01866EF|         Living Hell|AREVWGE1187B9B890A|   0|282.43546|
|SOWTBJW12AC468AC6E|Broken-Down Merry...|ARQGYP71187FB44566|   0|151.84934|
|SOGOSOV12AF72A285E|   ¿Dónde va Chichi?|ARGUVEV1187B98BA17|1997|313.12934|
|SOTUKVB12AB0181477|   Blessed Assurance|AR7ZKHQ1187B98DD73|1993|  270.602|
|SOMVWWT12A58A7AE05|Knocked Out Of Th...|ARQ9BO41187FB5CF1F|   0|183.17016|
|SOBEBDG12A58A76D60|        Kassie Jones|ARI3BMM1187FB4255E|   0|220.78649|
|SOILPQQ12AB017E82A|Sohna Nee Sohna Data|AR1ZHYZ1187FB3C717|   0|599.24853|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
+-----------

### Write artist table

In [17]:
artist_table = spark.sql("""
    select distinct
        artist_id,
        artist_name,
        artist_location,
        artist_latitude,
        artist_longitude
    from
        song
""")

artist_table.show(10)

try:
    artist_table.write.parquet(config['LOCAL']['OUTPUT_DATA'] + "artists.parquet")
except Exception as e:
    print("Error:", e)

+------------------+----------------+---------------+---------------+----------------+
|         artist_id|     artist_name|artist_location|artist_latitude|artist_longitude|
+------------------+----------------+---------------+---------------+----------------+
|ARPBNLO1187FB3D52F|        Tiny Tim|   New York, NY|       40.71455|       -74.00712|
|ARBEBBY1187B9B43DB|       Tom Petty|Gainesville, FL|           null|            null|
|AR0IAWL1187B9A96D0|    Danilo Perez|         Panama|         8.4177|       -80.11278|
|ARMBR4Y1187B9990EB|    David Martin|California - SF|       37.77916|      -122.42005|
|ARD0S291187B9B7BF5|         Rated R|           Ohio|           null|            null|
|AR0RCMP1187FB3F427|Billie Jo Spears|   Beaumont, TX|       30.08615|       -94.10158|
|ARKRRTF1187B9984DA|Sonora Santanera|               |           null|            null|
|ARHHO3O1187B989413|       Bob Azzam|               |           null|            null|
|ARJIE2Y1187B994AB7|     Line Renaud|      

### Write time table partitioned by year and month

In [18]:
time_table = spark.sql("""
    select distinct
        start_time,
        hour(start_time) as hour,
        day(start_time) as day,
        weekofyear(start_time) as week,
        month(start_time) as month,
        year(start_time) as year,
        weekday(start_time) as weekday
    from
        log
""")

time_table.show(10)

try:
    time_table.write.partitionBy('year', 'month').parquet(config['LOCAL']['OUTPUT_DATA'] + "times.parquet")
except Exception as e:
    print("Error:", e)

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 07:28:47|   7| 15|  46|   11|2018|      3|
|2018-11-15 16:49:50|  16| 15|  46|   11|2018|      3|
|2018-11-15 17:46:01|  17| 15|  46|   11|2018|      3|
|2018-11-21 07:53:00|   7| 21|  47|   11|2018|      2|
|2018-11-14 18:35:36|  18| 14|  46|   11|2018|      2|
|2018-11-28 11:51:12|  11| 28|  48|   11|2018|      2|
|2018-11-28 12:14:27|  12| 28|  48|   11|2018|      2|
|2018-11-28 19:00:08|  19| 28|  48|   11|2018|      2|
|2018-11-28 19:30:33|  19| 28|  48|   11|2018|      2|
|2018-11-05 01:50:53|   1|  5|  45|   11|2018|      0|
+-------------------+----+---+----+-----+----+-------+
only showing top 10 rows



In [19]:
#import shutil
#shutil.rmtree("spark-warehouse/")
#shutil.rmtree("data/extract/")