# Data-Lake-AWS

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col

## Load Config to environment and variables

In [34]:
config = configparser.ConfigParser()
config.read_file(open('dl.cfg'))

KEY = os.environ['AWS_ACCESS_KEY_ID']
SECRET = os.environ['AWS_SECRET_ACCESS_KEY']

SONG_DATA_LOCAL = config['LOCAL']['INPUT_DATA_SD_LOCAL']
LOG_DATA_LOCAL = config['LOCAL']['INPUT_DATA_LD_LOCAL']
OUTPUT_DATA_LOCAL = config['LOCAL']['OUTPUT_DATA_LOCAL']

In [3]:
print(SONG_DATA_LOCAL)

data/song_data/*/*/*/*.json


## Create spark session with hadoop-aws package

In [4]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

:: loading settings :: url = jar:file:/usr/local/anaconda3/envs/data-lake-aws/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/cian.young/.ivy2/cache
The jars for the packages stored in: /Users/cian.young/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ece58137-d076-4f45-8b9c-6ee3ef3003be;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.0 in central
	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in central
	found com.google.guava#guava;11.0.2 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found commons-cli#commons-cli;1.2 in central
	found org.apache.commons#commons-math3;3.1.1 in central
	found xmlenc#xmlenc;0.52 in central
	found commons-httpclient#commons-httpclient;3.1 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.4 in central
	found commons-io#commons-io;2.4 in central
	found commons-net#commons-net;3.1 in central
	found commons-collections#commo

23/02/26 14:29:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/26 14:29:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f7ab9e2fd60>


## Load song_data (from JSON to Spark)

In [7]:
#Read local song data
song_data_path = SONG_DATA_LOCAL

df_sd = spark.read.json(song_data_path)

In [8]:
df_sd.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [9]:
df_sd.show(5)

+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|  artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+-----------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                 |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|      Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.71455|     New York, N

In [10]:
df_sd.show(5, truncate=False)

+------------------+---------------+-----------------+----------------+----------------------------------------------------------------------------------------------+---------+---------+------------------+----------------------------------------------------+----+
|artist_id         |artist_latitude|artist_location  |artist_longitude|artist_name                                                                                   |duration |num_songs|song_id           |title                                               |year|
+------------------+---------------+-----------------+----------------+----------------------------------------------------------------------------------------------+---------+---------+------------------+----------------------------------------------------+----+
|ARDR4AC1187FB371A1|null           |                 |null            |Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti|511.16363|1        |SOBAYLL12A8C138AF9|Son

## Create song_table and artist_table

### Create Songs table + write to parquet file

In [11]:
print(df_sd)

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]


In [15]:
df_sd.createOrReplaceTempView("songs_table_DF")

In [16]:
songs_table = spark.sql("""
    SELECT song_id, title, artist_id, year, duration
    FROM songs_table_DF
    ORDER BY song_id
""")

In [17]:
print(songs_table)

DataFrame[song_id: string, title: string, artist_id: string, year: bigint, duration: double]


In [18]:
songs_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



In [19]:
songs_table.show()

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F|2000| 43.36281|
|SOBAYLL12A8C138AF9|Sono andati? Fing...|ARDR4AC1187FB371A1|   0|511.16363|
|SOBBUGU12A8C13E95D|Setting Fire to S...|ARMAC4T1187FB3FA4C|2004|207.77751|
|SOBBXLX12A58A79DDA|Erica (2005 Digit...|AREDBBQ1187B98AFF5|   0|138.63138|
|SOBCOSW12A8C13D398|  Rumba De Barcelona|AR7SMBG1187B9B9066|   0|218.38322|
|SOBEBDG12A58A76D60|        Kassie Jones|ARI3BMM1187FB4255E|   0|220.78649|
|SOBKWDJ12A8C13B2F3|Wild Rose (Back 2...|AR36F9J1187FB406F1|   0|230.71302|
|SOBLGCN12AB0183212|James (Hold The L...|AR47JEX1187B995D81|1985|124.86485|
|SOBONFF12A6D4F84D8|Tonight Will Be A...|ARIK43K1187B9AE54C|1986| 307.3824|
|SOBZBAZ12A6D4F8742|      Spanish Grease|AROUOZZ1187B9ABE51|1997|168.25424|
|SOCIWDW12A8

In [20]:
datetime.now()

datetime.datetime(2023, 2, 26, 15, 12, 27, 836537)

In [35]:
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')

In [25]:
song_data_path

'data/song_data/*/*/*/*.json'

In [30]:
OUTPUT_DATA_LOCAL 

'data/output_data/'

In [39]:
songs_table_path = OUTPUT_DATA_LOCAL + "songs_table" + "_" + now

In [40]:
songs_table_path 

'data/output_data/songs_table_2023-02-26-15-24-19-473672'

In [41]:
# Write DF data to JSON file(s)
# Ref: https://stackoverflow.com/questions/29908892/save-a-large-spark-dataframe-as-a-single-json-file-in-s3
#df_sd.write.mode('append').json(songs_table_path)
# -------
# Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
# Partitioning: https://stackoverflow.com/questions/43731679/how-to-save-a-partitioned-parquet-file-in-spark-2-1

# Write DF to Spark parquet file (partitioned by year and artist_id)
songs_table.write.partitionBy("year", "artist_id").parquet(songs_table_path)


                                                                                

### Create Artists table and write to parquet file

In [42]:
df_sd

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [43]:
df_sd.createOrReplaceTempView("artists_table_DF")
artists_table = spark.sql("""
    SELECT  artist_id AS artist_id,
            artist_name AS name,
            artist_location AS location,
            artist_latitude as latitude,
            artist_longitude as longitude
    FROM artists_table_DF
    ORDER BY artist_id desc
""")

In [44]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [45]:
artists_table.show(5, truncate=False)

+------------------+------------------+--------------+--------+---------+
|artist_id         |name              |location      |latitude|longitude|
+------------------+------------------+--------------+--------+---------+
|ARYKCQI1187FB3B18F|Tesla             |              |null    |null     |
|ARXR32B1187FB57099|Gob               |              |null    |null     |
|ARWB3G61187FB49404|Steve Morse       |Hamilton, Ohio|null    |null     |
|ARVBRGZ1187FB4675A|Gwen Stefani      |              |null    |null     |
|ARULZCI1241B9C8611|Luna Orbit Project|              |null    |null     |
+------------------+------------------+--------------+--------+---------+
only showing top 5 rows



In [46]:
# Write DF to Spark parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path = OUTPUT_DATA_LOCAL + "artists_table.parquet" + "_" + now

artists_table.write.parquet(artists_table_path)

## Load log_data (from JSON to Spark)

In [49]:
df_ld = spark.read.json(LOG_DATA_LOCAL)

In [50]:
df_ld.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [51]:
df_ld.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [52]:
df_ld_filtered = df_ld.filter(df_ld.page == 'NextSong')

In [59]:
df_ld_filtered.show(5, truncate=True)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      

### Create User table and write to parquet file

In [60]:
df_ld_filtered.createOrReplaceTempView('users_table_DF')
users_table = spark.sql("""
    SELECT DISTINCT userId AS user_id,
                    firstName AS first_name,
                    lastName AS last_name,
                    gender AS gender,
                    level as level
    FROM users_table_DF
    ORDER BY user_id
    """)

In [61]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [62]:
users_table.show(3)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     10|    Sylvie|     Cruz|     F| free|
|    100|     Adler|  Barrera|     M| free|
|    101|    Jayden|      Fox|     M| free|
+-------+----------+---------+------+-----+
only showing top 3 rows



In [63]:
"""# Write DF to Spark parquet file
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
artists_table_path = OUTPUT_DATA_LOCAL + "artists_table.parquet" + "_" + now

artists_table.write.parquet(artists_table_path)"""

now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')

In [64]:
now

'2023-02-26-15-41-55-493045'

In [67]:
users_table_path = OUTPUT_DATA_LOCAL + "users_table.parquet" + "_" + now

In [68]:
users_table.write.parquet(users_table_path)

### Create Time table and write to parquet file

#### Create timestamp column

In [69]:
df_ld_filtered

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [73]:
ts_df = df_ld_filtered.select("ts")

In [74]:
ts_df.show()

+-------------+
|           ts|
+-------------+
|1542241826796|
|1542242481796|
|1542242741796|
|1542253449796|
|1542260935796|
|1542261224796|
|1542261356796|
|1542261662796|
|1542262057796|
|1542262233796|
|1542262434796|
|1542262456796|
|1542262679796|
|1542262728796|
|1542262893796|
|1542263158796|
|1542263378796|
|1542265716796|
|1542265929796|
|1542266927796|
+-------------+
only showing top 20 rows



In [82]:
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql import types as t

In [83]:
@udf(t.TimestampType())
def get_timestamp (ts):
    return datetime.fromtimestamp(ts / 1000.0)
    
df_ld_filtered = df_ld_filtered.withColumn("timestamp", get_timestamp("ts"))
                    

df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-------------+-------------------------------

## Create datetime column

In [84]:
@udf(t.StringType())
def get_datetime(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

df_ld_filtered = df_ld_filtered.withColumn("datetime", get_datetime("ts"))
df_ld_filtered.printSchema()
df_ld_filtered.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)

+-----------+---------+---------+------+-------------+--------+---------+-----+------------------------------------+------+--------+-----------------+---------+----------------------------------------------+------+-----

[Stage 44:>                                                         (0 + 1) / 1]                                                                                

In [85]:
df_ld_filtered.createOrReplaceTempView("time_table_DF")
time_table = spark.sql("""
    SELECT  DISTINCT datetime AS start_time, 
                     hour(timestamp) AS hour, 
                     day(timestamp)  AS day, 
                     weekofyear(timestamp) AS week,
                     month(timestamp) AS month,
                     year(timestamp) AS year,
                     dayofweek(timestamp) AS weekday
    FROM time_table_DF
    ORDER BY start_time
""")
time_table.printSchema()
time_table.show(5, truncate=False)

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)





+-------------------+----+---+----+-----+----+-------+
|start_time         |hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-01 14:01:46|14  |1  |44  |11   |2018|5      |
|2018-11-01 14:05:52|14  |1  |44  |11   |2018|5      |
|2018-11-01 14:08:16|14  |1  |44  |11   |2018|5      |
|2018-11-01 14:11:13|14  |1  |44  |11   |2018|5      |
|2018-11-01 14:17:33|14  |1  |44  |11   |2018|5      |
+-------------------+----+---+----+-----+----+-------+
only showing top 5 rows





In [86]:
# Write DF to Spark parquet file
# Ref: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
time_table_path = OUTPUT_DATA_LOCAL + "time_table.parquet" + "_" + now
print(time_table_path)
time_table.write.parquet(time_table_path)

data/output_data/time_table.parquet_2023-02-26-19-31-41-057705


                                                                                

## Create songplays table and write it to parquet file
### Join song_data and log_data

In [87]:
df_ld_sd_joined = df_ld_filtered.join(df_sd, (df_ld_filtered.artist == df_sd.artist_name) & (df_ld_filtered.song == df_sd.title))
df_ld_sd_joined.printSchema()
df_ld_sd_joined.show(5, truncate=False)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- datetime: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = 

23/02/26 19:35:45 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+------+---------+---------+------+-------------+--------+---------+-----+----------------------------------+------+--------+-----------------+---------+--------------+------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------+------+-----------------------+-------------------+------------------+---------------+---------------+----------------+-----------+---------+---------+------------------+--------------+----+
|artist|auth     |firstName|gender|itemInSession|lastName|length   |level|location                          |method|page    |registration     |sessionId|song          |status|ts           |userAgent                                                                                                                                |userId|timestamp              |datetime           |artist_id         |artist_latitude|artist_location|artist_longitude|artist_name|duration |num_songs|song



In [88]:
df_ld_sd_joined = df_ld_sd_joined.withColumn("songplay_id", f.monotonically_increasing_id())


df_ld_sd_joined.createOrReplaceTempView("songplays_table_DF")
songplays_table = spark.sql("""
    SELECT  songplay_id AS songplay_id, 
            timestamp   AS start_time, 
            userId      AS user_id, 
            level       AS level,
            song_id     AS song_id,
            artist_id   AS artist_id,
            sessionId   AS session_id,
            location    AS location,
            userAgent   AS user_agent
    FROM songplays_table_DF
    ORDER BY (user_id, session_id) 
""")

songplays_table.printSchema()
songplays_table.show(5, truncate=False)

root
 |-- songplay_id: long (nullable = false)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)





+-----------+-----------------------+-------+-----+------------------+------------------+----------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
|songplay_id|start_time             |user_id|level|song_id           |artist_id         |session_id|location                          |user_agent                                                                                                                               |
+-----------+-----------------------+-------+-----+------------------+------------------+----------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
|0          |2018-11-21 13:56:47.796|15     |paid |SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|818       |Chicago-Naperville-Elgin, IL-IN-WI|"Mozilla/5.0 (X11; Linux x86_64) AppleWebKi



In [89]:
now = datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
songplays_table_path = OUTPUT_DATA_LOCAL + "songplays_table.parquet" + "_" + now
print(songplays_table_path)
time_table.write.parquet(songplays_table_path)

data/output_data/songplays_table.parquet_2023-02-26-19-36-07-419579


                                                                                