# 01_Data_Load_and_Clean.ipynb
**Author:** Saif Al-Murqi
**Date:** 2025-06-10
**Purpose:**  
1. Ingest raw Spotify chart CSV/API data  
2. Perform initial cleaning and type conversions  
3. Add basic feature engineering (e.g. short labels, derived columns)  
4. Export cleaned dataset for downstream analysis


In [None]:
import pandas as pd
import numpy as np
import os
import kagglehub
from pathlib import Path
import pycountry
from sqlalchemy import create_engine, text


In [None]:
# Download the dataset and get the local path
dataset_path = kagglehub.dataset_download('asaniczka/top-spotify-songs-in-73-countries-daily-updated')

# Print the path to confirm
print('Dataset downloaded to:', dataset_path)


In [None]:
# Load the Spotify dataset
csv_path = Path(dataset_path) / 'universal_top_spotify_songs.csv'
df = pd.read_csv(csv_path)


columns_to_keep = [
    'spotify_id',
    'name',
    'artists',
    'daily_rank',
    'daily_movement',
    'weekly_movement',
    'country',
    'snapshot_date',
    'popularity'
]
df = df[columns_to_keep]

# Quick preview
df.info()
print(df.isna().sum())
df.head()


In [None]:
# Cleaning the spotify dataset

def iso_to_country_name(code): # Takes country ISO code and returns name
    if pd.isna(code):
        return 'Global'
    country = pycountry.countries.get(alpha_2 = code)
    return country.name if country else code

df.rename(columns={'name': 'song_name'}, inplace=True)

df['country'] = df['country'].apply(iso_to_country_name)

df.dropna(subset=['song_name', 'artists'], inplace=True)
df['artists'] = df['artists'].apply(lambda x: x.split(','))
df['snapshot_date'] = pd.to_datetime(df['snapshot_date'])

# Updated preview
print(df.info())
print(df.isna().sum()) # Check no null values remain
df.sample(n=50, random_state=42) # Select 100 random rows to display from the dataset


In [None]:
# Feature Engineering

df['week_start'] = df['snapshot_date'] - pd.to_timedelta(df['snapshot_date'].dt.weekday, unit='D')  # Feature 1: Start of the week (used for time-series grouping)
df['year'] = df['snapshot_date'].dt.year    # Feature 2: Extract year for filtering/slicing
df['is_global'] = df['country'].apply(lambda x: x == 'Global')  # Feature 3: Boolean to distinguish global vs country-level charts

'''volatility = (  # Feature 4: Volatility Score (per song) – total absolute weekly movement
    df.groupby(['spotify_id','song_name'])['weekly_movement'].apply(lambda x: x.abs().sum()).reset_index(name='volatility_score')
)'''
volatility = (
    df.groupby('spotify_id').agg(song_name=('song_name', lambda x: x.mode().iloc[0]), volatility_score=('weekly_movement', lambda x: x.abs().sum())).reset_index()
)

volatility_by_country = (   # Feature 5: Volatility Score by Country – sum of absolute weekly movements
    df[df['is_global'] == False].groupby('country')['weekly_movement'].apply(lambda x: x.abs().mean()).reset_index(name='volatility_score_by_country')
)

first_last = (  # Feature 6: First/last appearance, days in chart, and trend duration (per song)
    df.groupby('spotify_id').agg(song_name=('song_name', lambda x: x.mode().iloc[0]), first_appearance=('snapshot_date', 'min'), last_appearance=('snapshot_date', 'max'), days_in_chart=('snapshot_date', 'nunique')).reset_index()
)

first_last['trend_duration'] = (first_last['last_appearance'] - first_last['first_appearance']).dt.days

#Note: Do not merge df with volatility_by_country, latter is only used for heatmap

In [None]:
# Replace these with your actual PostgreSQL settings
username = '' #Input Username
password = ''   #Input Password
host = ''       #Input Host
port = ''       #Input Port
database = ''   #Input Database

# Create the connection string
conn_str = f'postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}'

# Create the SQLAlchemy engine
engine = create_engine(conn_str)

In [None]:
with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE spotify_daily"))
    conn.execute(text("TRUNCATE TABLE spotify_volatility"))
    conn.execute(text("TRUNCATE TABLE spotify_trend_duration"))
    conn.execute(text("TRUNCATE TABLE spotify_volatility_by_country"))


df.to_sql('spotify_daily', engine, index=False, if_exists='append', method='multi', chunksize=10000)
volatility.to_sql('spotify_volatility', engine, index=False, if_exists='append', method='multi', chunksize=5000)
first_last.to_sql('spotify_trend_duration', engine, index=False, if_exists='append', method='multi', chunksize=5000)
volatility_by_country.to_sql('spotify_volatility_by_country', engine, index=False, if_exists='append', method='multi', chunksize=2500)