In [10]:
import requests
import zipfile
import os

# Set up Kaggle API credentials
KAGGLE_USERNAME = "anxxxxxxxxx95"
KAGGLE_KEY = "cbxxxxxxxxxxxxxxxxxxxxxba"
DATASET_ID = "tonygordonjr/spotify-dataset-2023"  # Found from the URL



# Kaggle API endpoint
DOWNLOAD_URL = f"https://www.kaggle.com/api/v1/datasets/download/tonygordonjr/spotify-dataset-2023/spotify-albums_data_2023.csv"

# Set up authentication
auth = (KAGGLE_USERNAME, KAGGLE_KEY)

# Define local save path
save_path = "./data/spotify_dataset.zip"

# Download dataset
response = requests.get(DOWNLOAD_URL, auth=auth, stream=True)

if response.status_code == 200:
    with open(save_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=1024):
            file.write(chunk)
    print(f"Dataset downloaded successfully: {save_path}")
else:
    print(f"Failed to download dataset. Status Code: {response.status_code}")

required_files = {"spotify-albums_data_2023.csv", "spotify_tracks_data_2023.csv"}
extract_path = "../data"

with zipfile.ZipFile(save_path, "r") as zip_ref:
    all_files = zip_ref.namelist()  # List all files in the ZIP
    for file in all_files:
        if file in required_files:
            zip_ref.extract(file, extract_path)
            print(f"Extracted: {file}")

print("Extraction complete. Extracted files:", os.listdir(extract_path))
os.remove(save_path)


Dataset downloaded successfully: ./data/spotify_dataset.zip
Extracted: spotify-albums_data_2023.csv
Extraction complete. Extracted files: ['spotify-albums_data_2023.csv', 'spotify_dataset.zip', 'spotify_tracks_data_2023.csv']


In [11]:
from pyspark.sql import SparkSession
import logging, os

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Path to the JDBC driver
jdbc_path = os.path.abspath("../jars/postgresql-42.7.3.jar")

print(f'The JDBC path is {jdbc_path}')

try:
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("Spotify Data Processing") \
        .config("spark.jars", jdbc_path) \
        .getOrCreate()
    logger.info("✅ Spark session created successfully.")
except Exception as e:
    logger.error(f"❌ Error creating Spark session: {e}")
    raise

2025-02-15 12:27:58,729 - INFO - ✅ Spark session created successfully.


The JDBC path is /Users/sivaprakash/de/spotify-kaggle-analysis/jars/postgresql-42.7.3.jar


In [12]:
spotify_albums_df = spark \
    .read \
    .option('header', True) \
    .option('inferSchema', True) \
    .csv("data/spotify-albums_data_2023.csv")

spotify_tracks_df = spark \
    .read \
    .option('header', True) \
    .option('inferSchema', True) \
    .csv("data/spotify_tracks_data_2023.csv")

                                                                                

In [13]:
spotify_albums_df.printSchema()
spotify_tracks_df.printSchema()


root
 |-- track_name: string (nullable = true)
 |-- track_id: string (nullable = true)
 |-- track_number: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- album_type: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- total_tracks: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- label: string (nullable = true)
 |-- album_popularity: string (nullable = true)
 |-- album_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_0: string (nullable = true)
 |-- artist_1: string (nullable = true)
 |-- artist_2: string (nullable = true)
 |-- artist_3: string (nullable = true)
 |-- artist_4: string (nullable = true)
 |-- artist_5: string (nullable = true)
 |-- artist_6: string (nullable = true)
 |-- artist_7: string (nullable = true)
 |-- artist_8: string (nullable = true)
 |-- artist_9: string (nullable = true)
 |-- artist_10: string (nullable = true)
 |-- art

In [72]:
from pyspark.sql.functions import col, countDistinct

# Filter, join, group by, and aggregate properly
album_count_per_label = (
    spotify_albums_df
    .join(
        spotify_tracks_df,
        on=spotify_tracks_df.id == spotify_albums_df.track_id,
        how="inner"
    )
    .filter(
        (col("explicit") == False) & (col('track_popularity') > 50)
    )
    .groupBy(col("label"))
    .agg(
        countDistinct("track_id").alias("total_tracks")  # Count distinct albums
    )
    .orderBy(col("total_tracks").desc())  # Sort descending
    .limit(20)
)

# album_count_per_label.show(truncate=False)

from pyspark.sql.functions import col, count

from pyspark.sql import functions as F

# Find duplicate track_id values
duplicate_tracks_df = (
    spotify_albums_df
    .groupBy('track_id')
    .agg(F.count("track_id").alias("tk_cnts"))
    .filter(F.col("tk_cnts") > 1)  # Only keep non-duplicates
).collect()

# Get duplicate track IDs as a list from duplicate_tracks_df
duplicate_track_ids = [row['track_id'] for row in duplicate_tracks_df]

# Filter spotify_tracks_df for tracks that are in the list of duplicate track IDs
spotify_tracks_df_filtered = (
    spotify_albums_df
    .filter(col("track_id").isin(duplicate_track_ids))
)

spotify_tracks_df_filtered.show()
# print(duplicate_track_ids)


# duplicate_tracks_df.count()
# duplicate_tracks_df.count() #438360 - With = 1 and #438524 with != 1
# spotify_albums_df.filter(col("track_id").isNotNull()).count()


                                                                                

+--------------------+--------------------+--------------------+--------------------+-----------+-------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------+--------+--------+--------+--------+---------+---------+------------+
|          track_name|            track_id|        track_number|         duration_ms| album_type|artists|total_tracks|          album_name|        release_date|               label|    album_popularity|            album_id|           artist_id|            artist_0|            artist_1|            artist_2|            artist_3|          artist_4|artist_5|artist_6|artist_7|artist_8|artist_9|artist_10|artist_11|duration_sec|
+--------------------+--------------------+--------------------+--------------------+-----------+-------+------------+--------------------+---------

In [14]:
# Cleaning of Track Dataframe.
cleaned_tracks_df = spotify_tracks_df.dropna()


In [15]:
# Cleaning of Album Dataframe.
from pyspark.sql.functions import col, round, to_date, when
updated_spotify_albums_df = (
            spotify_albums_df
            .withColumn("duration_min", round(col("duration_ms").cast('float') / 60000, 2))
            .withColumn('release_date', to_date(col("release_date"), "yyyy-MM-dd HH:mm:ss 'UTC'"))
            .withColumn("radio_mix", when(col("duration_min") <= 3.00, True).otherwise(False))
            .select(
                'track_id',
                'track_name',
                'duration_min',
                'release_date',
                'label',
                'radio_mix'
            )
        )

updated_spotify_albums_df_cleaned = updated_spotify_albums_df.dropna()
updated_spotify_albums_df_cleaned.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- duration_min: double (nullable = true)
 |-- release_date: date (nullable = true)
 |-- label: string (nullable = true)
 |-- radio_mix: boolean (nullable = false)



In [16]:
# Join The dataframe and create a master dataframe

master_df = (
    updated_spotify_albums_df_cleaned
    .join(cleaned_tracks_df, updated_spotify_albums_df_cleaned.track_id == cleaned_tracks_df.id, how = 'inner')
    .filter(
        (col('track_popularity') > 50) & (col('explicit') == False)
    )
    .select(
        'track_id',
        'track_name',
        'duration_min',
        'release_date' ,
        'label',
        'track_popularity',
        'radio_mix'
    )
)

master_df.printSchema()
# master_df.show(2)

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- duration_min: double (nullable = true)
 |-- release_date: date (nullable = true)
 |-- label: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- radio_mix: boolean (nullable = false)



In [11]:
master_df.createOrReplaceTempView("album_tracks")

25/02/14 09:47:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [14]:
spark.sql("""
    select
        *
    from album_tracks
    order by label
""").show(truncate=False)

                                                                                

+---------------------------------------------------------+------------+------------+
|label                                                    |track_number|total_tracks|
+---------------------------------------------------------+------------+------------+
|"""2 U"" PS (Bieber/Guetta)"                             |1           |1           |
|"Ariana Grande & Justin Bieber ""Stuck With U""- Charity"|1           |1           |
|"BLACKPINK ""Live in Seoul"" / Interscope P&D"           |8           |15          |
|"BLACKPINK ""Live in Seoul"" / Interscope P&D"           |9           |15          |
|"Roc Nation / ""Home"" Soundtrack"                       |1           |1           |
|"Roc Nation / ""Home"" Soundtrack"                       |1           |8           |
|"Roc Nation / ""Home"" Soundtrack"                       |6           |8           |
|(주)뮤직버디, (주)블렌딩                                 |1           |2           |
|(주)블렌딩                                               |1       

In [44]:
# The top 20 ‘labels’ against their total number of ‘tracks’.
spark.sql("""
    SELECT
        label,
        COUNT(track_id) as total_tracks
    FROM
        album_tracks
    GROUP BY
        label
    ORDER BY
        total_tracks DESC
    LIMIT 20
""").show(5, truncate=False)

                                                                                

+-------------------+------------+
|label              |total_tracks|
+-------------------+------------+
|Columbia           |429         |
|Warner Records     |391         |
|Taylor Swift       |321         |
|BIGHIT MUSIC       |284         |
|Walt Disney Records|258         |
+-------------------+------------+



In [21]:
# The top 25 popular ‘tracks’ released between 2020 and 2023.
spark.sql("""
    SELECT
        track_name
    FROM
        album_tracks
    WHERE
        EXTRACT(YEAR FROM release_date) BETWEEN 2020 AND 2023
    ORDER BY track_popularity DESC
    LIMIT 25;
""").show(25, truncate=False)

                                                                                

+----------------------------------------------------------------------------+
|track_name                                                                  |
+----------------------------------------------------------------------------+
|My Love Mine All Mine                                                       |
|"What Was I Made For? [From The Motion Picture ""Barbie""]"                 |
|Is It Over Now? (Taylor'S Version) (From The Vault)                         |
|One Of The Girls (With Jennie, Lily Rose Depp)                              |
|"What Was I Made For? [From The Motion Picture ""Barbie""]"                 |
|As It Was                                                                   |
|Flowers                                                                     |
|You’Re Losing Me (From The Vault)                                           |
|Anti-Hero                                                                   |
|Blinding Lights                                    

In [18]:
spark.stop()
# master_df.count()




In [17]:
# PostgreSQL connection details
table_name = "spotify.spotify_master"
prop = {
    "user": "postgres",
    "password": "postgres",
    "driver": "org.postgresql.Driver"
}
url = "jdbc:postgresql://localhost:5432/postgres"

# Write DataFrame to PostgreSQL
try:
    master_df.write.jdbc(url=url, table=table_name, mode="overwrite", properties=prop)
    print(f"✅ Data written to PostgreSQL table: {table_name}")
except Exception as e:
    print(f"❌ Failed to write data to PostgreSQL table: {table_name} with error: {e}")

                                                                                

✅ Data written to PostgreSQL table: spotify.spotify_master
