In [1]:
# import relevant packages
import pyspark 
from pyspark import SparkConf
from pyspark.sql import SparkSession 

In [2]:
# initiate spark
try:
    spark = SparkSession \
        .builder \
        .appName("Datalake_Dev") \
        .getOrCreate() 
    print("spark initiated") 
except Exception as e:
    print("spark failed")
    print(e) 

spark initiated


In [3]:
# collect the context details of this spark session 
spark.sparkContext.getConf().getAll() 

[('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1622810164399'),
 ('spark.app.name', 'Datalake_Dev'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '40165'),
 ('spark.driver.host', '8fd6a23a0323'),
 ('spark.ui.showConsoleProgress', 'true')]

In [4]:
# check spark session details 
spark

In [5]:
# unzip log files & song files
import zipfile

In [6]:
with zipfile.ZipFile("data/log-data.zip", 'r') as zipref:
    zipref.extractall("data/logs") 

In [7]:
with zipfile.ZipFile("data/song-data.zip", 'r') as zipref2:
    zipref2.extractall("data/songs") 

### Read test JSON files

Use the \data folder containing test file to run some example work

In [8]:
# set the path to all the files, use multiline option = true, read in all files
test_path_log = "data/logs/*.json" 
user_log = spark.read.option("multiline","true").json(test_path_log) 

In [9]:
# check schema 
user_log.printSchema() 

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [10]:
# check first line of dataframe 
user_log.show(n=1) 

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

#### Import some useful stuff

These may be needed according to etl.py

In [11]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType 

##### Step 1 - Create "time" table from logs data TS

In [12]:
# first - we need to create a timestamp, through a UDF, from the 'ts' column in our dataframe 
get_timestamp = udf(lambda x: int(int(x)//1000)) 
user_log = user_log.withColumn("timestamp", get_timestamp(user_log.ts).cast(IntegerType()))  
user_log.show(n=1) 

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+----------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId| timestamp|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+----------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+-----

In [13]:
# look at new schema again 
user_log.printSchema()  

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: integer (nullable = true)



In [14]:
# Second- create UDF that collects a datetime column from the new timestamp column 
get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x)/1000.0)))  

In [15]:
user_log = user_log.withColumn('datetime', get_datetime(user_log.ts)) 
user_log.show(n=1) 

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+----------+--------------------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId| timestamp|            datetime|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+----------+--------------------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|1542241826|2018-11-15 00:30:...|
+--------+---------+---------+------+-------------+-

##### Now create the time table

In [16]:
# place into Temp View
user_log.createOrReplaceTempView("user_log_table") 

In [17]:
time_query = """
SELECT
    a.start_time,
    hour(a.start_time) as hour,
    dayofmonth(a.start_time) as day,
    weekofyear(a.start_time) as week,
    month(a.start_time) as month,
    year(a.start_time) as year,
    dayofweek(a.start_time) as weekday
    
FROM
    (SELECT DISTINCT 
        t1.datetime as start_time
            FROM user_log_table as t1
    ) as a 
ORDER BY a.start_time 
"""

In [18]:
# run query & show results 
time_table = spark.sql(time_query)
time_table.show(n=3) 

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 20:57:...|  20|  1|  44|   11|2018|      5|
|2018-11-02 01:25:...|   1|  2|  44|   11|2018|      6|
|2018-11-03 01:04:...|   1|  3|  44|   11|2018|      7|
+--------------------+----+---+----+-----+----+-------+
only showing top 3 rows



### Now lets focus on `Users` table

we need:
- user_id
- first_name
- last_name
- gender
- level 

In [19]:
user_query = """
SELECT DISTINCT
    a.userID as user_id,
    a.firstName as first_name,
    a.lastName as last_name,
    a.gender,
    a.level
    
FROM user_log_table as a 
WHERE 
    a.userID IS NOT NULL 
AND a.ts = (
            SELECT MAX(b.ts) FROM user_log_table as b WHERE b.userID = a.userID
            )  
"""

In [20]:
# run query & show results 
user_table = spark.sql(user_query)
user_table.show(n=3) 

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     37|    Jordan|    Hicks|     F| free|
|     15|      Lily|     Koch|     F| paid|
|     95|      Sara|  Johnson|     F| paid|
+-------+----------+---------+------+-----+
only showing top 3 rows



---------------------

#### Read in songplay JSON data

In [21]:
import os 

use the os.walk to go through each sub file and read all JSON files found

In [22]:
path = "data/songs/song_data/"
songs_list = [] 
for root, directories, files in os.walk(path):
    for file in files:
        filepath = os.path.join(root, file) 
        if filepath.endswith('.json'):
            songs_list.append(filepath) 

In [23]:
#print(songs_list) 

In [24]:
# set the path to all the files, use multiline option = true, read in all files
songs = spark.read.option("multiline","true").json(songs_list)  

In [25]:
songs.printSchema() 

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



#### Create `Artists` table from the songplays data

we need:

- artist_id
- name
- location 
- latitude
- longitude 

In [26]:
# place into Temp View
songs.createOrReplaceTempView("songs_table") 

In [27]:
artists_query = """
SELECT DISTINCT 
    a.artist_id,
    a.artist_name as name,
    a.artist_location as location,
    a.artist_latitude as latitude,
    a.artist_longitude as longitude
    
FROM songs_table as a 
WHERE a.artist_id IS NOT NULL
AND a.year = (SELECT MAX (b.year) FROM songs_table as b WHERE b.artist_id = a.artist_id) 
"""

In [28]:
# run query & show results 
artists_table = spark.sql(artists_query)
artists_table.show(n=3) 

+------------------+------------+---------------+--------+---------+
|         artist_id|        name|       location|latitude|longitude|
+------------------+------------+---------------+--------+---------+
|ARPBNLO1187FB3D52F|    Tiny Tim|   New York, NY|40.71455|-74.00712|
|ARBEBBY1187B9B43DB|   Tom Petty|Gainesville, FL|    null|     null|
|AR0IAWL1187B9A96D0|Danilo Perez|         Panama|  8.4177|-80.11278|
+------------------+------------+---------------+--------+---------+
only showing top 3 rows



#### Create `songs` dimension table

we need:
- song_id
- title
- artist_id
- year
- duration 

In [29]:
songs_query = """
SELECT DISTINCT 
    song_id,
    title,
    artist_id,
    year,
    duration
FROM songs_table
"""

In [30]:
# run query & show results 
songs_table = spark.sql(songs_query)
songs_table.show(n=3) 

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOGNCJP12A58A80271|Do You Finally Ne...|ARB29H41187B98F0EF|1972|342.56934|
|SOOJPRH12A8C141995|   Loaded Like A Gun|ARBGXIG122988F409D|   0|173.19138|
|SOFCHDR12AB01866EF|         Living Hell|AREVWGE1187B9B890A|   0|282.43546|
+------------------+--------------------+------------------+----+---------+
only showing top 3 rows



---------------

#### Create `songplays` fact table

In [31]:
#user_log.printSchema() 

In [32]:
#songs.printSchema() 

In [45]:
songplays_query = """
SELECT 
    row_number() over (order by a.datetime) as songplay_id,
    a.datetime as start_time, 
    month(a.datetime) as month,
    year(a.datetime) as year,
    a.userId as user_id, 
    a.level,
    b.song_id,
    b.artist_id, 
    a.sessionId as session_id,
    a.location,
    a.userAgent as user_agent
FROM user_log_table as a 
LEFT JOIN songs_table as b 
ON a.song = b.title 
WHERE a.page = 'NextSong' 
"""

In [46]:
# run query & show results 
songplays_table = spark.sql(songplays_query)
songplays_table.show(n=3) 

+-----------+--------------------+-----+----+-------+-----+-------+---------+----------+--------------------+--------------------+
|songplay_id|          start_time|month|year|user_id|level|song_id|artist_id|session_id|            location|          user_agent|
+-----------+--------------------+-----+----+-------+-----+-------+---------+----------+--------------------+--------------------+
|          1|2018-11-02 01:25:...|   11|2018|    101| free|   null|     null|       184|New Orleans-Metai...|"Mozilla/5.0 (Win...|
|          2|2018-11-05 00:33:...|   11|2018|     69| free|   null|     null|       256|Philadelphia-Camd...|"Mozilla/5.0 (Mac...|
|          3|2018-11-07 00:01:...|   11|2018|     97| paid|   null|     null|       293|Lansing-East Lans...|"Mozilla/5.0 (X11...|
+-----------+--------------------+-----+----+-------+-----+-------+---------+----------+--------------------+--------------------+
only showing top 3 rows



#### Writing tables to partitioned parquet files

In [35]:
output_path = "data/Tables/"

In [37]:
time_table.write.partitionBy('year','month').parquet(os.path.join(output_path, 'time'),'overwrite') 

In [38]:
songs_table.write.partitionBy('year','artist_id').parquet(os.path.join(output_path, 'songs'),'overwrite') 

In [39]:
artists_table.write.parquet(os.path.join(output_path, 'artists'),'overwrite') 

In [41]:
user_table.write.parquet(os.path.join(output_path, 'users'),'overwrite') 

In [47]:
songplays_table.write.partitionBy('year','month').parquet(os.path.join(output_path, 'songplays'),'overwrite') 

In [48]:
# close spark connection 
spark.stop() 