In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek
from pyspark.sql.functions import to_timestamp, date_format
from pyspark.sql import types as t


config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']


In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
input_data = 's3a://datalbc/input/'
output_data = 's3a://datalbc/output/'
#output_data = '/home/workspace/output/'

In [4]:
#song_data = input_data + 'song-data/A/A/A/*.json'
song_data = input_data + 'song-data/*/*/*/*.json'
# read song data file
df = spark.read.json(song_data)

# extract columns to create songs table
songs_table = df.select('song_id','title','artist_id','year','duration')

# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet(output_data + 'songs', mode='overwrite')

# extract columns to create artists table
artists_table = df.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').dropDuplicates()

# write artists table to parquet files
artists_table.write.parquet(output_data + 'artists', mode='overwrite')

In [5]:
spark.read.parquet(output_data + 'songs').show(5, truncate=False)

+------------------+----------------------------------------------------+------------------+----+---------+
|song_id           |title                                               |artist_id         |year|duration |
+------------------+----------------------------------------------------+------------------+----+---------+
|SOBAYLL12A8C138AF9|Sono andati? Fingevo di dormire                     |ARDR4AC1187FB371A1|0   |511.16363|
|SOOLYAZ12A6701F4A6|Laws Patrolling (Album Version)                     |AREBBGV1187FB523D2|0   |173.66159|
|SOBBUGU12A8C13E95D|Setting Fire to Sleeping Giants                     |ARMAC4T1187FB3FA4C|2004|207.77751|
|SOAOIBZ12AB01815BE|I Hold Your Hand In Mine [Live At Royal Albert Hall]|ARPBNLO1187FB3D52F|2000|43.36281 |
|SONYPOM12A8C13B2D7|I Think My Wife Is Running Around On Me (Taco Hell) |ARDNS031187B9924F0|2005|186.48771|
+------------------+----------------------------------------------------+------------------+----+---------+
only showing top 5 rows



In [6]:
spark.read.parquet(output_data + 'artists').show(5)

+------------------+--------------------+--------------------+---------------+----------------+
|         artist_id|         artist_name|     artist_location|artist_latitude|artist_longitude|
+------------------+--------------------+--------------------+---------------+----------------+
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|       40.79086|       -73.96644|
|AROUOZZ1187B9ABE51|         Willie Bobo|New York, NY [Spa...|       40.79195|       -73.94512|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|           null|            null|
|ARD842G1187B997376|          Blue Rodeo|Toronto, Ontario,...|       43.64856|       -79.38533|
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |           null|            null|
+------------------+--------------------+--------------------+---------------+----------------+
only showing top 5 rows



In [7]:
log_data = input_data + 'log-data/*.json'

# read log data file
df = spark.read.json(log_data)


In [8]:
# extract columns for users table    
users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level').dropDuplicates()
users_table.show()

+------+---------+---------+------+-----+
|userId|firstName| lastName|gender|level|
+------+---------+---------+------+-----+
|    57|Katherine|      Gay|     F| free|
|    84|  Shakira|     Hunt|     F| free|
|    22|     Sean|   Wilson|     F| free|
|    52| Theodore|    Smith|     M| free|
|    80|    Tegan|   Levine|     F| paid|
|    15|     Lily|     Koch|     F| paid|
|    37|   Jordan|    Hicks|     F| free|
|    98|   Jordyn|   Powell|     F| free|
|    48|   Marina|   Sutton|     F| free|
|    17| Makinley|    Jones|     F| free|
|    45| Dominick|   Norris|     M| free|
|    43|   Jahiem|    Miles|     M| free|
|    21|  Preston|  Sanders|     M| free|
|     3|    Isaac|   Valdez|     M| free|
|    62|   Connar|   Moreno|     M| free|
|     5|   Elijah|    Davis|     M| free|
|    44|   Aleena|    Kirby|     F| paid|
|    50|      Ava| Robinson|     F| free|
|    68|   Jordan|Rodriguez|     F| free|
|    56|   Cienna|  Freeman|     F| free|
+------+---------+---------+------

In [9]:
# write users table to parquet files
users_table.write.parquet(output_data + 'users', mode='overwrite')

In [10]:
spark.read.parquet(output_data + 'users').show(5)

+------+---------+---------+------+-----+
|userId|firstName| lastName|gender|level|
+------+---------+---------+------+-----+
|    88| Mohammad|Rodriguez|     M| free|
|    88| Mohammad|Rodriguez|     M| paid|
|      |     null|     null|  null| free|
|    11|Christian|   Porter|     F| free|
|    75|   Joseph|Gutierrez|     M| free|
+------+---------+---------+------+-----+
only showing top 5 rows



In [11]:
# filter by actions for song plays
df = df.filter(df.page == 'NextSong')
df = df.withColumn('timestamp', (df.ts/1000).cast(dataType=t.TimestampType()))

In [12]:
time_table = df.select(col('timestamp').alias('start_time'),
                           hour('timestamp').alias('hour'), 
                           dayofmonth('timestamp').alias('day'),
                           weekofyear('timestamp').alias('week'),
                           month('timestamp').alias('month'),
                           year('timestamp').alias('year'),
                           dayofweek('timestamp').alias('weekday'))

In [13]:
time_table.show(5, truncate=False)

+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:26.796|0   |15 |46  |11   |2018|5      |
|2018-11-15 00:41:21.796|0   |15 |46  |11   |2018|5      |
|2018-11-15 00:45:41.796|0   |15 |46  |11   |2018|5      |
|2018-11-15 03:44:09.796|3   |15 |46  |11   |2018|5      |
|2018-11-15 05:48:55.796|5   |15 |46  |11   |2018|5      |
+-----------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [14]:
time_table.write.partitionBy('year', 'month').parquet(output_data + 'time', mode='overwrite')

In [15]:
spark.read.parquet(output_data + 'time').show(5, truncate=False)

+-----------------------+----+---+----+-----+----+-------+
|start_time             |hour|day|week|month|year|weekday|
+-----------------------+----+---+----+-----+----+-------+
|2018-11-15 00:30:26.796|0   |15 |46  |11   |2018|5      |
|2018-11-15 00:41:21.796|0   |15 |46  |11   |2018|5      |
|2018-11-15 00:45:41.796|0   |15 |46  |11   |2018|5      |
|2018-11-15 03:44:09.796|3   |15 |46  |11   |2018|5      |
|2018-11-15 05:48:55.796|5   |15 |46  |11   |2018|5      |
+-----------------------+----+---+----+-----+----+-------+
only showing top 5 rows



In [16]:
df_song = spark.read.json(song_data)

In [17]:
df_join = df.join(df_song, (df.song==df_song.title) & (df.artist==df_song.artist_name) & (df.length==df_song.duration), 'left')

In [18]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = df_join.select('timestamp', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent')

In [19]:
songplays_table.write.partitionBy('year', 'month').parquet(output_data + 'songplays', mode='overwrite')

In [20]:
spark.read.parquet(output_data + 'songplays').show(5)

+--------------------+------+-----+-------+---------+---------+--------------------+--------------------+
|           timestamp|userId|level|song_id|artist_id|sessionId|            location|           userAgent|
+--------------------+------+-----+-------+---------+---------+--------------------+--------------------+
|2018-11-15 00:30:...|    26| free|   null|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-15 00:41:...|    26| free|   null|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-15 00:45:...|    26| free|   null|     null|      583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|
|2018-11-15 03:44:...|    61| free|   null|     null|      597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|
|2018-11-15 05:48:...|    80| paid|   null|     null|      602|Portland-South Po...|"Mozilla/5.0 (Mac...|
+--------------------+------+-----+-------+---------+---------+--------------------+--------------------+
only showing top 5 rows

