In [2]:
# 1. Import Libraries and Configure Environment
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import kagglehub
import torch
from transformers import pipeline
from sentence_transformers import SentenceTransformer, util
import statsmodels.api as sm
import statsmodels.formula.api as smf
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
import re
from collections import Counter

# Configure Plotting
sns.set_theme(style="whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)

# Check for GPU
device = 0 if torch.cuda.is_available() else -1
if torch.backends.mps.is_available():
    device = "mps"
print(f"Using device: {device}")

Using device: mps


In [None]:
# 2. Load Data and Preprocess Lyrics

# Download/Load Dataset
print("Loading dataset...")
path = kagglehub.dataset_download("rhaamrozenberg/billboards-top-100-song-1946-to-2022-lyrics")
print("Path to dataset files:", path)
# Load CSV
import os
import ast
csv_path = os.path.join(path, "Billboard_Hot_100_with_features.csv") # Verify filename if possible, usually it's this or similar

if not os.path.exists(csv_path):
    files = os.listdir(path)
    csv_files = [f for f in files if f.endswith('.csv')]
    if csv_files:
        csv_path = os.path.join(path, csv_files[0])
    else:
        raise FileNotFoundError("No CSV found in dataset path")

df = pd.read_csv(csv_path)
print(f"Loaded {len(df)} rows.")
print("Original Columns:", df.columns)

# Rename columns for consistency
# Actual columns: ['Song', 'Artist Names', 'Hot100 Ranking Year', 'Hot100 Rank', 'Lyrics']
rename_map = {
    'Hot100 Ranking Year': 'year',
    'Hot100 Rank': 'rank',
    'Lyrics': 'lyrics',
    'Song': 'title',
    'Artist Names': 'artist'
}
df.rename(columns=rename_map, inplace=True)
print("Renamed Columns:", df.columns)

# Ensure numeric types
df['year'] = pd.to_numeric(df['year'], errors='coerce')
df['rank'] = pd.to_numeric(df['rank'], errors='coerce')

# Basic Preprocessing
# 1. Convert 'year' to numeric if needed (it usually is)
# 2. Clean lyrics
def clean_lyrics(text):
    if not isinstance(text, str):
        return ""
    
    # The lyrics seem to be string representations of lists: "['word', 'word', ...]"
    # We need to parse this string back into a list and join it, or just clean the string representation.
    # Let's try to parse it safely.
    try:
        # If it looks like a list string "['...']"
        if text.strip().startswith('[') and text.strip().endswith(']'):
            # Use ast.literal_eval to safely parse the string list
            words = ast.literal_eval(text)
            if isinstance(words, list):
                text = ' '.join(words)
    except (ValueError, SyntaxError):
        # If parsing fails, fall back to regex cleaning of the string representation
        pass

    text = text.lower()
    # Remove text in brackets like [Chorus], [Verse 1] (if any remain after joining)
    text = re.sub(r'\[.*?\]', '', text)
    # Remove special characters but keep basic punctuation if needed, or strip all
    text = re.sub(r'[^a-z0-9\s\.,\'\?!]', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text

# Apply cleaning
print("Cleaning lyrics...")
df['lyrics_clean'] = df['lyrics'].apply(clean_lyrics)

# Debug: Check lengths
print("Lyrics length stats:", df['lyrics_clean'].str.len().describe())

# Filter empty lyrics
before_len = len(df)
df = df[df['lyrics_clean'].str.len() > 50]
print(f"Rows after lyrics length filter: {len(df)} (dropped {before_len - len(df)})")

# Filter Year Window (2000-2023 is the dataset, but let's ensure)
before_len = len(df)
df = df[(df['year'] >= 2000) & (df['year'] <= 2023)]
print(f"Rows after year filter: {len(df)} (dropped {before_len - len(df)})")

# Sort
df = df.sort_values(['year', 'rank'])

print(f"Final dataset size: {len(df)}")
if len(df) > 0:
    print(df.head())
else:
    print("WARNING: Dataset is empty after filtering. Check year range and lyrics content.")

Loading dataset...
Path to dataset files: /Users/halfidaldal/.cache/kagglehub/datasets/rhaamrozenberg/billboards-top-100-song-1946-to-2022-lyrics/versions/1
Loaded 6879 rows.
Original Columns: Index(['Song', 'Artist Names', 'Hot100 Ranking Year', 'Hot100 Rank', 'Lyrics'], dtype='object')
Renamed Columns: Index(['title', 'artist', 'year', 'rank', 'lyrics'], dtype='object')
Cleaning lyrics...
Path to dataset files: /Users/halfidaldal/.cache/kagglehub/datasets/rhaamrozenberg/billboards-top-100-song-1946-to-2022-lyrics/versions/1
Loaded 6879 rows.
Original Columns: Index(['Song', 'Artist Names', 'Hot100 Ranking Year', 'Hot100 Rank', 'Lyrics'], dtype='object')
Renamed Columns: Index(['title', 'artist', 'year', 'rank', 'lyrics'], dtype='object')
Cleaning lyrics...
Lyrics length stats: count    6879.000000
mean     1399.715075
std       943.656294
min         0.000000
25%       819.500000
50%      1293.000000
75%      1863.000000
max      6036.000000
Name: lyrics_clean, dtype: float64
Rows af

In [None]:
# 3. Extract Sentiment and Emotion Features

# Initialize Sentiment Pipeline (RoBERTa)
print("Initializing Sentiment Model...")
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    tokenizer="cardiffnlp/twitter-roberta-base-sentiment-latest",
    device=0 if device == 0 else -1, # MPS support in transformers pipelines can be tricky, usually CPU or CUDA
    top_k=None # Return all scores
)

# Initialize Emotion Pipeline (GoEmotions)
print("Initializing Emotion Model...")
emotion_pipeline = pipeline(
    "text-classification",
    model="SamLowe/roberta-base-go_emotions",
    device=0 if device == 0 else -1,
    top_k=None
)

# Function to process in batches
def get_sentiment_emotion(texts, batch_size=32):
    sentiments = []
    emotions = []
    
    # Process sentiment
    print("Processing Sentiment...")
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        # Enable truncation to prevent CUDA errors on long lyrics
        results = sentiment_pipeline(batch, truncation=True, max_length=512)
        # results is list of lists of dicts
        for res in results:
            # res is [{'label': 'positive', 'score': 0.9}, ...]
            scores = {item['label']: item['score'] for item in res}
            # Calculate valence score: Positive - Negative
            valence = scores.get('positive', 0) - scores.get('negative', 0)
            sentiments.append(valence)
            
    # Process emotions
    print("Processing Emotions...")
    # We'll focus on key emotions: sadness, anxiety (fear), anger, joy
    target_emotions = ['sadness', 'fear', 'anger', 'joy', 'optimism', 'excitement']
    
    emotion_data = {e: [] for e in target_emotions}
    
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        results = emotion_pipeline(batch, truncation=True, max_length=512)
        
        for res in results:
            scores = {item['label']: item['score'] for item in res}
            for e in target_emotions:
                emotion_data[e].append(scores.get(e, 0))
                
    return sentiments, emotion_data

# Run extraction
# Using a sample for speed if needed, but let's try full dataset (approx 2300 songs)
texts = df['lyrics_clean'].tolist()
print(f"Extracting features for {len(texts)} songs...")

# Note: This might take a few minutes.
valences, emotion_scores = get_sentiment_emotion(texts)

# Add to DataFrame
df['valence_transformer'] = valences
for e, scores in emotion_scores.items():
    df[f'emotion_{e}'] = scores

print("Feature extraction complete.")
df.head()

Initializing Sentiment Model...


Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cpu
Device set to use cpu


Initializing Emotion Model...


config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

Cancellation requested; stopping current tasks.


KeyboardInterrupt: 

In [None]:
# 4. Extract Lexical and Economic Dictionary Features

# Define Lexicons
economic_lexicon = set([
    'money', 'broke', 'job', 'work', 'bills', 'rent', 'debt', 'pay', 'paycheck', 
    'mortgage', 'bank', 'rich', 'poor', 'cash', 'dollar', 'price', 'cost', 'economy',
    'crisis', 'recession', 'hustle', 'grind', 'wage', 'saving', 'spend'
])

pronouns_first = set(['i', 'me', 'my', 'mine', 'myself'])
pronouns_social = set(['we', 'us', 'our', 'ours', 'ourselves'])
negations = set(['not', 'never', 'no', 'cant', 'wont', 'dont', 'didnt', 'couldnt', 'wouldnt', 'shouldnt'])

def extract_lexical_features(text):
    words = text.split()
    num_words = len(words)
    if num_words == 0:
        return pd.Series([0]*6)
    
    # Counts
    econ_count = sum(1 for w in words if w in economic_lexicon)
    first_person_count = sum(1 for w in words if w in pronouns_first)
    social_count = sum(1 for w in words if w in pronouns_social)
    negation_count = sum(1 for w in words if w in negations)
    
    # Type-Token Ratio
    ttr = len(set(words)) / num_words
    
    # Avg Word Length
    avg_len = sum(len(w) for w in words) / num_words
    
    return pd.Series([
        econ_count / num_words, # Normalize by length
        first_person_count / num_words,
        social_count / num_words,
        negation_count / num_words,
        ttr,
        avg_len
    ])

print("Extracting lexical features...")
lexical_cols = ['freq_economic', 'freq_self', 'freq_social', 'freq_negation', 'ttr', 'avg_word_len']
df[lexical_cols] = df['lyrics_clean'].apply(extract_lexical_features)

print("Lexical extraction complete.")
df[lexical_cols].describe()

In [None]:
# 5. Compute Semantic Similarity to Recession Concepts

# Initialize Sentence Transformer
# 'all-mpnet-base-v2' is a strong general-purpose model
EMBEDDING_MODEL = "sentence-transformers/all-mpnet-base-v2"
print(f"Loading Embedding model: {EMBEDDING_MODEL}")
embedder = SentenceTransformer(EMBEDDING_MODEL, device=device if device != -1 else 'cpu')

# Define rich concept descriptions
concepts = {
    "econ_anxiety": "lyrics about losing jobs, struggling to pay bills, being broke, worrying about money, recession, crisis",
    "resilience": "working overtime, hustling to survive, keeping family afloat, working multiple jobs",
    "escapist_party": "forgetting problems by partying, drinking, getting high to escape real life",
    "luxury_flex": "bragging about money, expensive cars, brands, being rich"
}

# Encode the concept descriptions
print("Encoding concept descriptions...")
concept_embeddings = {name: embedder.encode(desc, convert_to_tensor=True) for name, desc in concepts.items()}

# Encode lyrics
print(f"Encoding lyrics for {len(df)} songs...")
lyrics_list = df['lyrics_clean'].tolist()
# Batch encoding
lyric_embeddings = embedder.encode(lyrics_list, batch_size=64, convert_to_tensor=True, show_progress_bar=True)

# Compute cosine similarities
print("Computing similarities...")
similarity_scores = {}

for name, concept_emb in concept_embeddings.items():
    # util.cos_sim returns (1, N)
    scores = util.cos_sim(concept_emb, lyric_embeddings)[0]
    similarity_scores[f'sim_{name}'] = scores.cpu().numpy()

# Add to dataframe
for name, scores in similarity_scores.items():
    df[name] = scores

print("Semantic similarity computation complete.")
df[['sim_econ_anxiety', 'sim_resilience', 'sim_escapist_party', 'sim_luxury_flex']].head()

In [None]:
# 6. Visualize Feature Trends (2003–2015 Focus)

# Define periods for visualization
df['period'] = pd.cut(df['year'], 
                      bins=[1999, 2007, 2010, 2015, 2023], 
                      labels=['Pre-Recession', 'Recession', 'Post-Recession', 'Modern'])

# Aggregate by year
yearly_means = df.groupby('year').mean(numeric_only=True).reset_index()

# Plotting Function
def plot_trend(feature, title, ylabel):
    plt.figure(figsize=(12, 6))
    
    # Plot full trend
    sns.lineplot(data=yearly_means, x='year', y=feature, marker='o', linewidth=2, label='Yearly Mean')
    
    # Highlight Recession Window
    plt.axvspan(2008, 2010, color='red', alpha=0.1, label='Recession (2008-2010)')
    
    plt.title(title, fontsize=16)
    plt.xlabel('Year', fontsize=12)
    plt.ylabel(ylabel, fontsize=12)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

# Plot Key Features
plot_trend('valence_transformer', 'Average Song Valence (Sentiment) Over Time', 'Valence Score')
plot_trend('freq_economic', 'Frequency of Economic Terms', 'Frequency')
plot_trend('sim_econ_anxiety', 'Semantic Similarity to "Economic Anxiety"', 'Cosine Similarity')
plot_trend('sim_escapist_party', 'Semantic Similarity to "Escapist Partying"', 'Cosine Similarity')
plot_trend('sim_luxury_flex', 'Semantic Similarity to "Luxury/Wealth"', 'Cosine Similarity')

In [None]:
# 7. Statistical Analysis: Detecting Shifts in the Recession Window

# Define Dummy Variables
df['is_recession'] = df['year'].apply(lambda x: 1 if 2008 <= x <= 2010 else 0)
df['is_post_recession'] = df['year'].apply(lambda x: 1 if 2011 <= x <= 2015 else 0)

# Function to run OLS
def run_recession_regression(target_col):
    # Formula: target ~ is_recession + is_post_recession + year + rank
    # We center 'year' to avoid multicollinearity issues or large numbers
    df['year_centered'] = df['year'] - 2008
    
    formula = f"{target_col} ~ is_recession + is_post_recession + year_centered + rank"
    model = smf.ols(formula=formula, data=df).fit()
    
    print(f"--- Regression Results for {target_col} ---")
    print(model.summary().tables[1])
    print("\n")
    return model

# Run for key features
key_features = ['valence_transformer', 'freq_economic', 'sim_econ_anxiety', 'sim_escapist_party', 'sim_luxury_flex']

for feature in key_features:
    run_recession_regression(feature)

In [None]:
# 8. Train Recession-Era Classifier

# Define Labels
# 1 for 2008-2010, 0 for surrounding years (2003-2007 and 2011-2015)
# We filter the dataset to this window for training to avoid noise from very old or very new songs
train_window_df = df[(df['year'] >= 2003) & (df['year'] <= 2015)].copy()
train_window_df['label'] = train_window_df['year'].apply(lambda x: 1 if 2008 <= x <= 2010 else 0)

print(f"Training set size: {len(train_window_df)}")
print("Class distribution:", train_window_df['label'].value_counts())

# Select Features
features = [
    'valence_transformer', 
    'emotion_sadness', 'emotion_fear', 'emotion_joy',
    'freq_economic', 'freq_self', 'freq_social', 'freq_negation', 'ttr', 'avg_word_len',
    'sim_econ_anxiety', 'sim_resilience', 'sim_escapist_party', 'sim_luxury_flex'
]

X = train_window_df[features]
y = train_window_df['label']

# Standardize Features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train Logistic Regression
log_reg = LogisticRegression(class_weight='balanced', max_iter=1000)
scores = cross_val_score(log_reg, X_scaled, y, cv=5, scoring='roc_auc')

print(f"Cross-Validation ROC-AUC: {scores.mean():.3f} (+/- {scores.std():.3f})")

# Fit on full training window
log_reg.fit(X_scaled, y)

# Inspect Coefficients
coef_df = pd.DataFrame({
    'Feature': features,
    'Coefficient': log_reg.coef_[0]
}).sort_values(by='Coefficient', ascending=False)

print("\n--- Feature Importance (Logistic Regression Coefficients) ---")
print(coef_df)

# Visualize Coefficients
plt.figure(figsize=(10, 6))
sns.barplot(data=coef_df, x='Coefficient', y='Feature', palette='viridis')
plt.title('Feature Importance for Detecting Recession-Era Songs')
plt.show()

In [None]:
# 9. Build and Monitor the Recession-Pop Index

# Apply to Full Dataset (1995-2023)
# We need to scale the full dataset features using the SAME scaler fitted on the training window
X_full = df[features]
X_full_scaled = scaler.transform(X_full)

# Predict Probabilities
df['recession_prob'] = log_reg.predict_proba(X_full_scaled)[:, 1]

# Create Index: Mean Probability per Year
recession_index = df.groupby('year')['recession_prob'].mean().reset_index()

# Plot Index
plt.figure(figsize=(14, 7))
sns.lineplot(data=recession_index, x='year', y='recession_prob', marker='o', linewidth=3, color='darkred')

# Highlight Recessions
# 2001 (Dot-com), 2008-2009 (Great Recession), 2020 (COVID)
plt.axvspan(2001, 2001.9, color='gray', alpha=0.2, label='Recession (NBER)')
plt.axvspan(2008, 2009.9, color='gray', alpha=0.2)
plt.axvspan(2020, 2020.5, color='gray', alpha=0.2)

plt.title('The "Recession Pop" Index (2000-2023)', fontsize=18)
plt.xlabel('Year', fontsize=14)
plt.ylabel('Recession Probability (Index)', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)

# Add smoothing
recession_index['smoothed'] = recession_index['recession_prob'].rolling(window=3, center=True).mean()
plt.plot(recession_index['year'], recession_index['smoothed'], linestyle='--', color='black', alpha=0.5, label='3-Year Moving Avg')

plt.show()

# Check recent years
recent = recession_index[recession_index['year'] >= 2018]
print("Recent Index Values:")
print(recent)