In [1]:
import configparser
from datetime import datetime
import os
import glob
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, broadcast
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek

In [2]:
from etl import create_spark_session, process_song_data, process_log_data

In [3]:
spark = create_spark_session()

In [4]:
input_data = '../udacity_data_modeling_postgres/data/'
output_data = '../udacity_data_modeling_postgres/data/output/'

In [5]:
# process song and log data
process_song_data(spark, input_data, output_data)
process_log_data(spark, input_data, output_data)

In [6]:
# Dimmension table: users
users_table = spark.read.parquet(output_data + "/users.parquet")
print(users_table.printSchema())
users_table.limit(5).toPandas()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

None


Unnamed: 0,user_id,first_name,last_name,gender,level
0,26,Ryan,Smith,M,free
1,16,Rylan,George,M,paid
2,54,Kaleb,Cook,M,free
3,80,Tegan,Levine,F,paid
4,30,Avery,Watkins,F,paid


In [7]:
# Dimmension table: songs
songs_table = spark.read.parquet(output_data + "/songs.parquet")
print(songs_table.printSchema())
songs_table.limit(5).toPandas()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- artist_id: string (nullable = true)

None


Unnamed: 0,song_id,title,artist_name,duration,year,artist_id
0,SOYMRWW12A6D4FAB14,The Moon And I (Ordinary Day Album Version),Jeff And Sheri Easter,267.7024,0,ARKFYS91187B98E58F
1,SOUDSGM12AC9618304,Insatiable (Instrumental Version),Clp,266.39628,0,ARNTLGG11E2835DDB9
2,SOHKNRJ12A6701D1F8,Drop of Rain,Tweeterfriendly Music,189.57016,0,AR10USD1187B99F3F1
3,SOXVLOJ12AB0189215,Amor De Cabaret,Sonora Santanera,177.47546,0,ARKRRTF1187B9984DA
4,SOMJBYD12A6D4F8557,Keepin It Real (Skit),Rated R,114.78159,0,ARD0S291187B9B7BF5


In [8]:
# Dimmension table: artists
artists_table = spark.read.parquet(output_data + "/artists.parquet")
print(artists_table.printSchema())
artists_table.limit(5).toPandas()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

None


Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARKFYS91187B98E58F,Jeff And Sheri Easter,,,
1,AR10USD1187B99F3F1,Tweeterfriendly Music,"Burlington, Ontario, Canada",,
2,ARGSJW91187B9B1D6B,JennyAnyKind,North Carolina,35.21962,-80.01955
3,ARMJAGH1187FB546F3,The Box Tops,"Memphis, TN",35.14968,-90.04892
4,ARD7TVE1187B99BFB1,Casual,California - LA,,


In [9]:
# Dimmension table: time
time_table = spark.read.parquet(output_data + "/time.parquet")
print(time_table.printSchema())
time_table.limit(5).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

None


  series = series.astype(t, copy=False)


Unnamed: 0,start_time,hour,day,week,weekday,year,month
0,2018-11-14 16:30:26.796,16,14,46,4,2018,11
1,2018-11-15 04:42:06.796,4,15,46,5,2018,11
2,2018-11-15 06:17:10.796,6,15,46,5,2018,11
3,2018-11-15 09:03:56.796,9,15,46,5,2018,11
4,2018-11-20 17:53:49.796,17,20,47,3,2018,11


In [10]:
# Fact table: songplays_table
songplays_table = spark.read.parquet(output_data + "/songplays.parquet")
print(songplays_table.printSchema())
songplays_table.limit(5).toPandas()

root
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

None


  series = series.astype(t, copy=False)


Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,2018-11-14 16:30:26.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
1,2018-11-14 16:41:21.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
2,2018-11-14 16:45:41.796,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
3,2018-11-14 19:44:09.796,61,free,,,597,"Houston-The Woodlands-Sugar Land, TX","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11
4,2018-11-14 21:48:55.796,80,paid,,,602,"Portland-South Portland, ME","""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2018,11


In [11]:
# stop spark session
spark.stop()