The Python script below performs the following configuration tasks:

Initializes a SparkSession with the necessary .jar files for Snowflake integration.

Loads the Snowflake JDBC driver and the Spark-Snowflake connector through the spark.jars property.

Sets up the Spark context for PySpark operations.

Defines the Snowflake connection options, read from environment variables, that later cells use to write data from Spark to Snowflake.

In [1]:
# Import the required modules. findspark must run before pyspark is imported,
# because it adds the local Spark installation to sys.path.
import os
import findspark

# Initialize findspark so Python can find the Spark installed on this system
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize the Spark session with the Snowflake JDBC driver and the Spark-Snowflake connector
spark = (
    SparkSession.builder
    .appName("SpotifyDataNormalization")
    .config(
        "spark.jars",
        r"C:\spark-3.5.5-bin-hadoop3\jars\snowflake-jdbc-3.23.2.jar,"
        r"C:\spark-3.5.5-bin-hadoop3\jars\spark-snowflake_2.12-3.1.1.jar",
    )
    .getOrCreate()
)

#Setting snowflake credentials.
snowflakeConfig = {
    "sfURL": "https://cexmrap-kb36509.snowflakecomputing.com",
    "sfUser": os.getenv("SNOWFLAKE_USER1"),
    "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"),
    "sfDatabase": os.getenv("SNOWFLAKE_DB1"),
    "sfSchema": os.getenv("SNOWFLAKE_SCHEMA1"),
    "sfWarehouse": os.getenv("SNOWFLAKE_WAREHOUSE1"),
    "sfRole": os.getenv("SNOWFLAKE_ROLE1")
}
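
Because os.getenv returns None for a variable that is not set, a missing credential only shows up later as a connection error during the first Snowflake write. A minimal fail-fast sketch, assuming the same variable names as the dictionary above; the helper require_env and the list REQUIRED_VARS are hypothetical names introduced here:

```python
import os

# Environment variables read by the snowflakeConfig dictionary above
REQUIRED_VARS = [
    "SNOWFLAKE_USER1",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DB1",
    "SNOWFLAKE_SCHEMA1",
    "SNOWFLAKE_WAREHOUSE1",
    "SNOWFLAKE_ROLE1",
]


def require_env(names):
    """Return {name: value} for each variable, or raise listing every missing one."""
    values = {name: os.getenv(name) for name in names}
    # Treat unset (None) and empty-string variables as missing
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise EnvironmentError("Missing environment variables: " + ", ".join(missing))
    return values
```

Calling require_env(REQUIRED_VARS) before snowflakeConfig is built would report all missing variables at once instead of failing on the first write.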


The Python script below performs the following tasks:

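Converts the played_at field from a Unix timestamp in milliseconds to a readable date-time string (yyyy-MM-dd HH:mm:ss).

Writes the transformed data to CSV. Spark writes a folder named clean_all_tracks.csv that contains a single part file.

The CSV output is overwritten each time the script is run.

Appends the transformed data to the all_tracks table in Snowflake.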

In [6]:
# Read the raw JSON into a DataFrame
df = spark.read.option("multiline", "true").json("all_recent_tracks.json")

# Convert played_at from a Unix timestamp in milliseconds to a readable date and time
# (from_unixtime expects seconds, hence the division by 1000)
df = df.withColumn("played_at", F.from_unixtime(F.col("played_at") / 1000, "yyyy-MM-dd HH:mm:ss"))

# Write the transformed data as a single CSV part file
df.coalesce(1).write.option("header", "true").mode("overwrite").csv("clean_all_tracks.csv")

# Load the transformed data into the Snowflake database
(
    df.write.format("snowflake")
    .options(**snowflakeConfig)
    .option("dbtable", "all_tracks")
    .option("sfTableCreation", "CREATE_IF_NEEDED")
    .mode("append")
    .save()
)

df.show(truncate=False)

+-----------------+-------------------+----------------------------------------------------+
|artist_name      |played_at          |track_name                                          |
+-----------------+-------------------+----------------------------------------------------+
|Strings And Heart|2025-03-29 01:30:46|honeydew (praise the Lord)                          |
|Strings And Heart|2025-03-29 01:32:49|dopamine                                            |
|Strings And Heart|2025-03-29 01:35:48|evergreen love                                      |
|Strings And Heart|2025-03-29 01:37:49|bright eyed                                         |
|Strings And Heart|2025-03-29 01:42:10|Your Love                                           |
|Strings And Heart|2025-03-29 01:45:10|dulce                                               |
|Strings And Heart|2025-03-29 01:48:15|rescue                                              |
|Rojo             |2025-03-29 01:51:23|Cuando Te Encontré feat. String
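
Spark formats the result of from_unixtime in the session time zone (spark.sql.session.timeZone), so the same millisecond value can print as a different wall-clock time on another machine. A plain-Python sketch of the same conversion, pinned to UTC as an assumption; the function name ms_to_datetime_string is hypothetical:

```python
from datetime import datetime, timezone


def ms_to_datetime_string(ms: int) -> str:
    """Convert a Unix timestamp in milliseconds to 'YYYY-MM-DD HH:MM:SS' in UTC."""
    # Drop the milliseconds to get whole seconds before formatting
    seconds = ms // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(ms_to_datetime_string(1743211846000))  # 2025-03-29 01:30:46
```

To make the Spark cell produce UTC as well, the session time zone can be set with spark.conf.set("spark.sql.session.timeZone", "UTC") before the conversion runs.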

The script below is commented out, but it plays an important role in some of our projects: it moves the CSV file containing the transformed data out of the folder that PySpark creates and into a specific location.

Why is it commented out?
Some readers may wonder why this script is commented out. The output file gets a new name each time the transformation is re-run: PySpark writes the data as a part file with a unique, generated name (e.g., part-00001-...), so the file's path keeps changing. Since the data is expected to keep growing and the file name is dynamic, hard-coding the path or moving the file by hand is not practical, and the move cannot be automated until the changing file name is handled.



In [None]:
# import shutil
# # Move the part file to the desired location
# part_file = r"clean_tracks.csv\part-00000-e767b6d4-f389-44a7-91b0-dec958759f38-c000.csv"
# destination = "clean_recent_tracks.csv"
# shutil.move(part_file, destination)
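
One way around the changing name, sketched here under the assumption that the output folder holds exactly one part file (which coalesce(1) should produce), is to locate the file with a glob pattern instead of hard-coding its name. The helper move_part_file is a hypothetical name, not part of PySpark:

```python
import glob
import os
import shutil


def move_part_file(output_dir: str, destination: str) -> str:
    """Move the single part-*.csv file Spark wrote in output_dir to destination."""
    # Matches part-00000-<uuid>-c000.csv but not _SUCCESS or the hidden .crc files
    matches = glob.glob(os.path.join(output_dir, "part-*.csv"))
    if len(matches) != 1:
        raise RuntimeError(f"Expected one part file in {output_dir}, found {len(matches)}")
    shutil.move(matches[0], destination)
    return destination
```

For example, move_part_file("clean_all_tracks.csv", "clean_recent_tracks.csv") would move the file written by the earlier cell; the folder and destination names here are assumptions taken from the cells above.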



The script below performs the following tasks:

Trims down the track ID: It extracts only the ID from the full Spotify URI, which has the form spotify:track:{id}. This removes the unnecessary prefix and keeps the base-62 track ID (letters and digits, not just numbers).

Splitting and indexing: The URI is split on ":" and the element at index 2 is selected. This step was also used as an exercise in string manipulation.

Converts song duration: The duration is converted from milliseconds to an m:ss string (for example, 3:00) for easier reading. The intermediate minutes and seconds columns remain in the output.

Drops the link column: Since the link column contained only null values, it was dropped to clean up the dataset.

Saves the result: The data is written to CSV and appended to the top_tracks table in Snowflake.

In [4]:
# Load the raw JSON file into the second DataFrame
df2 = spark.read.option("multiline", "true").json("all_top_tracks.json")

# Split the URI (spotify:track:{id}) on ":" and keep the ID, which is the third part (index 2)
df2 = df2.withColumn("id", F.split("id", ":").getItem(2))

# Convert the duration from milliseconds to an m:ss string
# Whole minutes
df2 = df2.withColumn("minutes", F.floor(F.col("duration") / 60000))
# Remaining seconds
df2 = df2.withColumn("seconds", F.floor((F.col("duration") % 60000) / 1000))
# Join them as m:ss, zero-padding the seconds to two digits
df2 = df2.withColumn(
    "duration",
    F.concat_ws(":", F.col("minutes"), F.lpad(F.col("seconds").cast("string"), 2, "0")),
)
# Drop the link column, which contains only nulls
df2 = df2.drop("link")

# Write the DataFrame to CSV
df2.coalesce(1).write.option("header", "true").mode("overwrite").csv("clean_all_top_tracks.csv")

# Load the transformed data into the Snowflake database
(
    df2.write.format("snowflake")
    .options(**snowflakeConfig)
    .option("dbtable", "top_tracks")
    .option("sfTableCreation", "CREATE_IF_NEEDED")
    .mode("append")
    .save()
)

df2.show(truncate=False)

+---------------+--------+----------------------+----------+-----------------------------------+-------+-------+
|artist_name    |duration|id                    |popularity|track_name                         |minutes|seconds|
+---------------+--------+----------------------+----------+-----------------------------------+-------+-------+
|Claire Leslie  |3:00    |4HR5BN6hc4AmcPO1NK0fgK|45        |24/7                               |3      |0      |
|Grace Marr     |2:35    |5BH8UixV8wu3FR5xJksIJN|2         |Belong                             |2      |35     |
|Cade Kellam    |4:02    |7LcJx95sWXpqCMLxQJfhMM|46        |Blessed                            |4      |2      |
|Forrest Frank  |2:41    |1716cky8w4roZox3AyO1zh|58        |CELEBRATION                        |2      |41     |
|Forrest Frank  |2:41    |2YfSEfmGirtcp6C6ZcLelL|57        |CELEBRATION                        |2      |41     |
|OAKS           |3:16    |2TUW3yDwsfNcBOtk8RPSqu|26        |Clean                              |
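
The ID extraction and duration formatting above can be checked outside Spark with the same arithmetic. A plain-Python sketch, assuming URIs of the form spotify:track:{id} and durations in milliseconds; the function names and the sample values are illustrative, not taken from the dataset:

```python
def track_id_from_uri(uri: str) -> str:
    """Return the last ':'-separated part, i.e. the {id} in spotify:track:{id}."""
    # Taking the last element also works when the value is already a bare ID
    return uri.split(":")[-1]


def format_duration(duration_ms: int) -> str:
    """Format milliseconds as m:ss, like the minutes/seconds columns above."""
    minutes = duration_ms // 60000           # F.floor(duration / 60000)
    seconds = (duration_ms % 60000) // 1000  # F.floor((duration % 60000) / 1000)
    return f"{minutes}:{seconds:02d}"        # F.lpad(seconds, 2, "0")


print(track_id_from_uri("spotify:track:4HR5BN6hc4AmcPO1NK0fgK"))  # 4HR5BN6hc4AmcPO1NK0fgK
print(format_duration(180000))  # 3:00
print(format_duration(242000))  # 4:02
```

Taking the last element rather than index 2 is a deliberate difference. The PySpark equivalent would be F.element_at(F.split("id", ":"), -1), which still returns the ID when a value has no prefix, whereas getItem(2) would return null.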

The script below performs the following tasks:

Reads the genre column: In the raw JSON file, the genre column is an array that can hold several genres for each artist.

Extracts all genre values: F.explode turns each element of the array into its own row, so an artist with three genres appears in three rows. Artists whose genre array is empty or null are dropped by explode; F.explode_outer would keep them with a null genre.

In [5]:
# Load the raw JSON file into the third DataFrame
df3 = spark.read.option("multiline", "true").json("all_top_artists.json")

# Explode the Genre array so that each genre gets its own row
df3 = df3.withColumn("genre", F.explode("Genre"))

# Write the DataFrame to CSV
df3.coalesce(1).write.option("header", "true").mode("overwrite").csv("clean_all_top_artists.csv")

# Load the transformed data into the Snowflake database
(
    df3.write.format("snowflake")
    .options(**snowflakeConfig)
    .option("dbtable", "top_artist")
    .option("sfTableCreation", "CREATE_IF_NEEDED")
    .mode("append")
    .save()
)

df3.show(truncate=False)

+-----------------+-----------+--------------------------+------------------------------------------------------+
|Artistname       |FamousLevel|genre                     |Link                                                  |
+-----------------+-----------+--------------------------+------------------------------------------------------+
|sxxnt.           |53         |gospel r&b                |https://open.spotify.com/artist/4T0c560DVGr1cAtE3reOP1|
|Cade Kellam      |36         |gospel r&b                |https://open.spotify.com/artist/1GWnsRvGpSmKxHOjMQah4k|
|Cade Kellam      |36         |christian alternative rock|https://open.spotify.com/artist/1GWnsRvGpSmKxHOjMQah4k|
|Claire Leslie    |42         |pop worship               |https://open.spotify.com/artist/5GkuwRdmvp8r48JCPwqM7E|
|Claire Leslie    |42         |christian pop             |https://open.spotify.com/artist/5GkuwRdmvp8r48JCPwqM7E|
|Claire Leslie    |42         |christian                 |https://open.spotify.com/artis
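
A plain-Python sketch of the row expansion that F.explode performs on the artist rows, with an outer flag that mimics F.explode_outer. The record layout is a simplified assumption based on the columns shown above, the function name explode_genres is hypothetical, and "Example Artist" is an invented row with no genres:

```python
def explode_genres(artists, outer=False):
    """Return one (Artistname, genre) pair per genre, like F.explode / F.explode_outer."""
    rows = []
    for artist in artists:
        genres = artist.get("Genre") or []
        if not genres:
            # explode drops the artist; explode_outer keeps it with a null genre
            if outer:
                rows.append((artist["Artistname"], None))
            continue
        for genre in genres:
            rows.append((artist["Artistname"], genre))
    return rows


sample = [
    {"Artistname": "Cade Kellam", "Genre": ["gospel r&b", "christian alternative rock"]},
    {"Artistname": "Example Artist", "Genre": []},  # hypothetical artist with no genres
]
print(explode_genres(sample))
# [('Cade Kellam', 'gospel r&b'), ('Cade Kellam', 'christian alternative rock')]
```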