In [13]:
import configparser
from pyspark.sql import SparkSession
from pyspark.sql.functions import dayofmonth, hour, \
    dayofweek, month, year, weekofyear

In [3]:
conf_parser = configparser.ConfigParser()
with open('etl_config.cfg', 'r') as config_file:
    conf_parser.read_file(config_file)

aws_access = conf_parser['AWS']['AWS_ACCESS_KEY']
aws_secret = conf_parser['AWS']['AWS_SECRET_KEY']

In [14]:
spark = SparkSession.builder.config("spark.jars.packages",
                                    "org.apache.hadoop:hadoop-aws:2.7.0")\
    .getOrCreate()

In [15]:
# Load data from json files
log_data = spark.read.json(path='log-data/*')
song_data = spark.read.json(path='song_data/*/*/*/*.json')

In [4]:
song_data.show(n=5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

In [5]:
log_data.show(n=5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [6]:
song_data.createOrReplaceTempView('song_data')
log_data.createOrReplaceTempView('log_data')

In [19]:
# Extract users table data and store in parquet format

users_extract = """
SELECT DISTINCT userId AS user_id,
    firstName AS first_name,
    lastName AS last_name,
    gender,
    level
FROM log_data
"""
users_table_data = spark.sql(users_extract)
users_table_data.write.parquet('Transformed_data/users.parquet')


+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     98|    Jordyn|   Powell|     F| free|
|     34|    Evelin|    Ayala|     F| free|
|     85|   Kinsley|    Young|     F| paid|
|     38|    Gianna|    Jones|     F| free|
|     85|   Kinsley|    Young|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [32]:
# Extract songs table data and store it in parquet format

songs_extract = """
SELECT DISTINCT song_id,
    title,
    artist_id,
    year,
    duration
FROM song_data
"""
songs_tbl_data = spark.sql(songs_extract)
songs_tbl_data.write.parquet("Transformed_data/songs.parquet")

In [24]:
# Extract artists table data and store it in parquet format

artists_extract = """
SELECT DISTINCT artist_id,
    artist_name AS name,
    artist_location AS location,
    artist_latitude AS lattitude,
    artist_longitude AS longitude
FROM song_data
"""
artists_tbl_data = spark.sql(artists_extract)
artists_tbl_data.write.parquet("Transformed_data/artists.parquet")

In [20]:
# Extract time table data and store it in parquet format

time_extract = """
SELECT cast(t1.timestamp_temp AS timestamp) AS start_time,
    hour(t1.timestamp_temp) AS hour,
    dayofmonth(t1.timestamp_temp) AS day,
    weekofyear(t1.timestamp_temp) AS week,
    month(t1.timestamp_temp) AS month,
    year(t1.timestamp_temp) AS year,
    CASE WHEN dayofweek(t1.timestamp_temp) IN (6, 7) THEN True ELSE False END
     AS 
    weekday
FROM 
    (SELECT from_unixtime(ts/1000, 'YYYY-MM-dd hh:mm:ss') AS timestamp_temp 
    FROM log_data) t1
"""
time_tbl_data = spark.sql(time_extract)
time_tbl_data.show(5)
time_tbl_data.printSchema()
# time_tbl_data.write.parquet("Transformed_data/time.parquet")


+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-14 07:30:26|   7| 14|  46|   11|2018|  false|
|2018-11-14 07:41:21|   7| 14|  46|   11|2018|  false|
|2018-11-14 07:45:41|   7| 14|  46|   11|2018|  false|
|2018-11-14 08:57:51|   8| 14|  46|   11|2018|  false|
|2018-11-14 10:29:37|  10| 14|  46|   11|2018|  false|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: boolean (nullable = false)



In [21]:
time_extract = """
SELECT t1.timestamp_temp AS start_time,
    hour(t1.timestamp_temp) AS hour,
    dayofmonth(t1.timestamp_temp) AS day,
    weekofyear(t1.timestamp_temp) AS week,
    month(t1.timestamp_temp) AS month,
    year(t1.timestamp_temp) AS year,
    CASE WHEN dayofweek(t1.timestamp_temp) IN (6, 7) THEN True ELSE False END
     AS 
    weekday
FROM 
    (SELECT from_unixtime(ts/1000, 'YYYY-MM-dd hh:mm:ss') AS timestamp_temp 
    FROM log_data) t1
"""
time_tbl_data = spark.sql(time_extract)
time_tbl_data.show(5)
time_tbl_data.printSchema()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-14 07:30:26|   7| 14|  46|   11|2018|  false|
|2018-11-14 07:41:21|   7| 14|  46|   11|2018|  false|
|2018-11-14 07:45:41|   7| 14|  46|   11|2018|  false|
|2018-11-14 08:57:51|   8| 14|  46|   11|2018|  false|
|2018-11-14 10:29:37|  10| 14|  46|   11|2018|  false|
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: boolean (nullable = false)



In [22]:
# Extract songsplay table data and store it in parquet format
songsplay_extract = """
SELECT CAST(log_temp.start_time AS TIMESTAMP) AS start_time,
    log_temp.user_id,
    log_temp.level,
    song_temp.song_id,
    song_temp.artist_id,
    log_temp.session_id,
    log_temp.location,
    log_temp.user_agent
FROM (SELECT from_unixtime(ts/1000, 'YYYY-MM-dd hh:mm:ss') AS start_time,
        userId AS user_id,
        level,
        song,
        artist,
        location,
        sessionId AS session_id,
        userAgent AS user_agent
    FROM log_data) log_temp
JOIN
    (SELECT song_id,
        artist_id,
        artist_name,
        title
    FROM song_data) song_temp
ON log_temp.song = song_temp.title
AND log_temp.artist = song_temp.artist_name
"""
songsplay_tbl = spark.sql(songsplay_extract)
songsplay_tbl.show(5)
songsplay_tbl.printSchema()
# songsplay_tbl.write.parquet("Transformed_data/songsplay.parquet")



+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|         start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|2018-11-21 04:56:47|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)

