# Comprehensive Song Lyrics Analysis

This notebook performs a multi-faceted analysis on song lyrics:
1.  **Unsupervised Topic Modeling**: LDA, LSA, NMF (Discovering hidden topics).
2.  **Guided Topic Analysis**: Dictionary-based classification (Tracking specific topics like War, Love).
3.  **Temporal Analysis**: How topics evolve over decades.
4.  **Sentiment Analysis**: Emotional tone of topics.
5.  **Visualization**: Word Clouds and Interactive Maps.

**Note**: This notebook is optimized to handle large datasets (9GB+) by using sampling for unsupervised learning and chunking for guided analysis.

In [None]:
import re
import time
from collections import Counter

import gensim
import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import pyLDAvis
import pyLDAvis.gensim_models
import seaborn as sns
import spacy
from gensim.models import CoherenceModel
from gensim.utils import simple_preprocess
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from wordcloud import WordCloud

# Download NLTK resources
# Each entry pairs the path probed by nltk.data.find with the package name
# passed to nltk.download when that probe raises LookupError.
for resource_path, package_name in (
    ('corpora/stopwords', 'stopwords'),
    ('sentiment/vader_lexicon.zip', 'vader_lexicon'),
):
    try:
        nltk.data.find(resource_path)
    except LookupError:
        nltk.download(package_name)

# Initialize Spacy
# The parser and NER components are disabled when the pipeline is loaded.
try:
    nlp = spacy.load('en_core_web_sm', disable=['parser', 'ner'])
except OSError:
    # Print the install hint, then re-raise so the notebook stops here.
    print("Spacy model 'en_core_web_sm' not found. Please run: python -m spacy download en_core_web_sm")
    raise

## 1. Configuration & Data Loading (Sample for Unsupervised Models)

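For **LDA/LSA/NMF**, I use a sample, because training these models on 5 million documents is extremely slow and memory-intensive.
A sample of **500,000** songs is statistically very robust and fits in 32GB RAM. Note that the sample is drawn *after* loading and filtering, so the CSV (restricted to the required columns) is still read into memory once.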

In [None]:
# --- CONFIGURATION ---
DATA_PATH = 'song_lyrics.csv'

# Recommended for 32GB RAM: 500,000.
# If you want a quick test, use 10,000.
SAMPLE_SIZE = 500000

print(f"Loading sample of {SAMPLE_SIZE} rows for Unsupervised Learning (LDA/LSA/NMF)...")
start_time = time.time()

try:
    # Reading only necessary columns to save memory
    df = pd.read_csv(DATA_PATH, usecols=['lyrics', 'language', 'title', 'artist', 'year'])
except ValueError:
    # Fallback for files whose columns differ (e.g. a requested column is missing):
    # load everything and let the checks below decide what can be used.
    df = pd.read_csv(DATA_PATH)

print(f"File loaded in {time.time() - start_time:.2f} seconds. Filtering data...")

# Filter for English (only when the column is available)
if 'language' in df.columns:
    df = df[df['language'] == 'en']

# Coerce year to numeric, then drop rows without lyrics or without a usable year.
# `assign` returns a new frame, so nothing is written into a filtered slice
# (the original column assignments triggered SettingWithCopyWarning).
df = (
    df.assign(year=pd.to_numeric(df['year'], errors='coerce'))
      .dropna(subset=['lyrics', 'year'])
)
df = df.assign(year=df['year'].astype(int))

# Keep only songs in the analysed period
df = df[(df['year'] >= 1950) & (df['year'] <= 2024)]

# Sample (fixed seed so the sample is reproducible across runs)
if SAMPLE_SIZE and len(df) > SAMPLE_SIZE:
    df_sample = df.sample(SAMPLE_SIZE, random_state=42)
    print(f"Created sample of {len(df_sample)} rows.")
else:
    df_sample = df
    print(f"Using full filtered dataset: {len(df_sample)} rows.")

# Derive the decade on a fresh frame so `df` itself is never mutated through an alias.
df_sample = df_sample.assign(decade=(df_sample['year'] // 10) * 10)
data_sample = df_sample['lyrics'].values.tolist()

In [None]:
# Preprocessing Functions

# Thematic words that must never be treated as stopwords, even though some of
# them appear in the lyric-specific list below.
keywords_to_keep = ['love', 'life', 'world', 'heart', 'war', 'god', 'money', 'fight', 'die', 'party']

# Lyric-specific filler (structure markers, vocalisations, slang, very common
# verbs and nouns), with the protected keywords filtered out.
lyric_stopwords = [
    term
    for term in [
        'song', 'lyrics', 'chorus', 'verse', 'hook', 'bridge', 'intro', 'outro', 'repeat',
        'la', 'da', 'na', 'ooh', 'ahh', 'oh', 'yeah', 'hey', 'whoa', 'hmm', 'baby', 'mh', 'uh', 'huh', 'wow', 'boom', 'bang', 'doo', 'ah', 'ha',
        'wanna', 'gonna', 'gotta', 'cause', 'em', 'till', 'ain', 'bout',
        'like', 'know', 'got', 'get', 'go', 'come', 'say', 'tell', 'think', 'see', 'look', 'take', 'make', 'want', 'need', 'feel',
        'way', 'time', 'thing', 'man', 'girl', 'boy', 'woman', 'day', 'night', 'life', 'world', 'heart', 'love',
        'let', 'put', 'back', 'right', 'never', 'ever', 'one', 'two', 'three', 'yeah', 'uh', 'yo',
    ]
    if term not in keywords_to_keep
]

# Final stopword list: NLTK's English list followed by the lyric-specific terms.
stop_words = stopwords.words('english') + lyric_stopwords

def preprocess_data(data, batch_size=1000):
    """Turn raw lyric strings into lists of lemmas ready for topic modelling.

    Steps: tokenize with ``sent_to_words``, train a bigram phrase model on the
    tokenized documents, remove stopwords (``stop_words``), merge detected
    bigrams, then lemmatize with the spaCy pipeline ``nlp`` and keep only
    tokens tagged NOUN, PROPN or ADJ.

    Args:
        data: Sized collection of documents (one lyric per item).
        batch_size: Number of texts handed to spaCy per batch in ``nlp.pipe``.

    Returns:
        A list with one list of lemma strings per input document, in input order.
    """
    total = len(data)
    print(f"Starting preprocessing for {total} documents...")

    print("1/4 Tokenizing...")
    data_words = list(sent_to_words(data))

    print("2/4 Building Bigrams...")
    bigram = gensim.models.Phrases(data_words, min_count=5, threshold=50)
    bigram_mod = gensim.models.phrases.Phraser(bigram)

    print("3/4 Removing Stopwords & Forming Bigrams...")
    # Set membership is O(1); the module-level list made every lookup a linear scan.
    stop_set = set(stop_words)
    # The documents are already token lists, so filter them directly instead of
    # stringifying each list and tokenizing it a second time.
    data_words_bigrams = [
        bigram_mod[[word for word in doc if word not in stop_set]]
        for doc in data_words
    ]

    print("4/4 Lemmatizing (this is the slowest part)...")
    allowed_pos = {'NOUN', 'PROPN', 'ADJ'}
    data_lemmatized = []
    # Stream texts through nlp.pipe so spaCy processes them in batches rather
    # than paying the per-call overhead once per document.
    texts = (" ".join(tokens) for tokens in data_words_bigrams)
    for i, doc in enumerate(nlp.pipe(texts, batch_size=batch_size)):
        if i % 5000 == 0:
            print(f"   Processed {i}/{total} documents...", end='\r')
        data_lemmatized.append([token.lemma_ for token in doc if token.pos_ in allowed_pos])
    print(f"\nPreprocessing complete.")
    return data_lemmatized

def sent_to_words(sentences):
    """Lazily tokenize documents, yielding one token list per input item.

    Each item is converted with ``str`` and passed to
    ``gensim.utils.simple_preprocess`` with ``deacc=True``.
    """
    tokenize = gensim.utils.simple_preprocess
    yield from (tokenize(str(text), deacc=True) for text in sentences)

# Run the preprocessing pipeline on the sampled lyrics and report wall-clock time.
start_time = time.time()
data_lemmatized = preprocess_data(data_sample)
preprocessing_seconds = time.time() - start_time
print(f"Total preprocessing time: {preprocessing_seconds:.2f} seconds.")

## 2. Dictionary and Corpus

Shared dictionary and corpus for all models.

In [None]:
print("Building Dictionary and Corpus...")

# Vocabulary over the lemmatized documents, pruned of tokens that appear in
# fewer than 10 documents or in more than 40% of them.
id2word = gensim.corpora.Dictionary(data_lemmatized)
id2word.filter_extremes(no_below=10, no_above=0.4)

# Bag-of-words representation of every document, in document order.
corpus = list(map(id2word.doc2bow, data_lemmatized))

vocabulary_size = len(id2word)
document_count = len(corpus)
print(f"Unique tokens: {vocabulary_size}")
print(f"Documents: {document_count}")

## 3. Model Training & Comparison (Unsupervised)

Training LDA, LSA, and NMF to discover hidden topics.

In [None]:
import logging

# Enable logging for Gensim to show progress.
# The root logger level is also set explicitly after basicConfig.
logging.basicConfig(
    format='%(asctime)s : %(levelname)s : %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Shared settings and result containers for the three models.
num_topics = 10
models = {}
coherence_scores = {}

# --- 1. LDA ---
print("Training LDA Model (this may take a few minutes)...")
print("Check the logs below for per-pass progress.")
start_time = time.time()
lda_model = gensim.models.LdaMulticore(
    corpus=corpus,
    id2word=id2word,
    num_topics=num_topics,
    random_state=100,
    passes=10,
)
models['LDA'] = lda_model
lda_seconds = time.time() - start_time
print(f"LDA Training finished in {lda_seconds:.2f} seconds.")

# --- 2. LSA (LSI) ---
print("Training LSA Model...")
lsa_model = gensim.models.LsiModel(
    corpus=corpus,
    id2word=id2word,
    num_topics=num_topics,
)
models['LSA'] = lsa_model

# --- 3. NMF ---
print("Training NMF Model...")
nmf_model = gensim.models.Nmf(
    corpus=corpus,
    id2word=id2word,
    num_topics=num_topics,
    random_state=100,
)
models['NMF'] = nmf_model

## 4. Evaluation: Coherence Scores

Comparing the models.

In [None]:
print("Calculating Coherence Scores...")

# Score every trained model with the c_v coherence measure on the lemmatized texts.
for name in models:
    cm = CoherenceModel(
        model=models[name],
        texts=data_lemmatized,
        dictionary=id2word,
        coherence='c_v',
    )
    score = cm.get_coherence()
    coherence_scores[name] = score
    print(f"{name} Coherence Score: {score:.4f}")

# Bar chart comparing the models, one bar per entry in coherence_scores.
model_names = list(coherence_scores.keys())
model_scores = list(coherence_scores.values())

plt.figure(figsize=(8, 5))
plt.bar(model_names, model_scores, color=['blue', 'orange', 'green'])
plt.ylabel('Coherence Score (c_v)')
plt.title('Coherence Score Comparison')
plt.show()

## 5. Temporal Analysis (LDA)

I will now use the LDA model to see how topics change over decades.

In [None]:
# 1. Get Topic Distribution for each document
print("Inferring topics for all documents...")

# Get dominant topic and probability for each doc.
# max() picks the highest-probability (topic, prob) pair without sorting the row;
# on ties it keeps the first pair, as the original stable descending sort did.
topic_data = []
for doc_topics in lda_model[corpus]:
    dominant_topic, dominant_prob = max(doc_topics, key=lambda pair: pair[1])
    topic_data.append([dominant_topic, dominant_prob])

df_topics = pd.DataFrame(topic_data, columns=['Dominant_Topic', 'Topic_Prob'])
df_reset = df_sample.reset_index(drop=True)
# The concat below aligns rows purely by position; fail loudly if the corpus and
# the sampled frame are out of sync (e.g. cells were re-run out of order).
assert len(df_reset) == len(df_topics), "corpus and df_sample have different lengths"
df_final = pd.concat([df_reset, df_topics], axis=1)

# 2. Analyze Topic Trends by Decade
# Reindex onto the full decade x topic grid so combinations with no songs are
# plotted as 0 instead of being skipped (which made lines interpolate over gaps).
decades = sorted(df_final['decade'].unique())
full_grid = pd.MultiIndex.from_product(
    [decades, range(num_topics)], names=['decade', 'Dominant_Topic']
)
decade_topic_counts = (
    df_final.groupby(['decade', 'Dominant_Topic']).size()
    .reindex(full_grid, fill_value=0)
    .reset_index(name='Count')
)
decade_counts = df_final.groupby('decade').size().reset_index(name='Total_Songs')
decade_topic_dist = pd.merge(decade_topic_counts, decade_counts, on='decade')
# Scale to 0-100 so the values match the "Percentage" column name and axis label.
decade_topic_dist['Percentage'] = 100 * decade_topic_dist['Count'] / decade_topic_dist['Total_Songs']

# Get Topic Keywords: label every topic with its three highest-weighted words
topic_labels = {}
for i in range(num_topics):
    words = lda_model.show_topic(i, topn=3)
    topic_labels[i] = f"T{i}: " + ", ".join([w[0] for w in words])

decade_topic_dist['Topic_Label'] = decade_topic_dist['Dominant_Topic'].map(topic_labels)

# Plot
plt.figure(figsize=(12, 8))
sns.lineplot(data=decade_topic_dist, x='decade', y='Percentage', hue='Topic_Label', marker='o')
plt.title('LDA Topic Trends Over Decades')
plt.ylabel('Percentage of Songs')
plt.xlabel('Decade')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True)
plt.show()

## 6. Sentiment Analysis by Topic

I use VADER to calculate the sentiment of each song and then aggregate by topic.

In [None]:
print("Calculating sentiment scores (this may take a while)...")
sia = SentimentIntensityAnalyzer()

def get_sentiment_with_progress(text, i, total):
    """Return the VADER compound score of ``text``, printing progress every 5000 items.

    ``i`` is the position of the current item and ``total`` the overall count;
    both are used only for the progress message.
    """
    at_checkpoint = (i % 5000 == 0)
    if at_checkpoint:
        print(f"   Processed {i}/{total} songs...", end='\r')
    polarity = sia.polarity_scores(str(text))
    return polarity['compound']

# Score every lyric in df_final, in row order.
total_rows = len(df_final)
df_final['Sentiment'] = list(
    get_sentiment_with_progress(lyric, position, total_rows)
    for position, lyric in enumerate(df_final['lyrics'])
)
print("\nSentiment calculation complete.")

# Group by Topic and calculate mean sentiment, then attach the readable topic label
mean_by_topic = df_final.groupby('Dominant_Topic')['Sentiment'].mean()
topic_sentiment = mean_by_topic.reset_index()
topic_sentiment['Topic_Label'] = topic_sentiment['Dominant_Topic'].map(topic_labels)

# Plot Average Sentiment (dashed line marks the neutral score of 0)
plt.figure(figsize=(12, 6))
sns.barplot(data=topic_sentiment, x='Topic_Label', y='Sentiment', palette='coolwarm')
plt.title('Average Sentiment by LDA Topic')
plt.ylabel('Compound Sentiment Score (-1 to 1)')
plt.xticks(rotation=45, ha='right')
plt.axhline(0, color='black', linestyle='--')
plt.show()

## 7. Word Clouds

Visualizing the top words for each topic.

In [None]:
from matplotlib import gridspec

# Grid layout: three clouds per row, with enough rows to hold every topic.
cols = 3
rows = (num_topics + cols - 1) // cols

fig = plt.figure(figsize=(20, 10))
gs = gridspec.GridSpec(rows, cols)

for i in range(num_topics):
    ax = fig.add_subplot(gs[i])
    # Top 30 words of the topic, with their weights used as cloud frequencies.
    topic_words = dict(lda_model.show_topic(i, 30))
    wc = WordCloud(background_color='white', max_words=30).generate_from_frequencies(topic_words)
    ax.imshow(wc, interpolation='bilinear')
    ax.axis('off')
    ax.set_title(f'Topic {i}')

plt.tight_layout()
plt.show()

## 8. Interactive Visualization (pyLDAvis)

Interactive exploration of topics and their relationships.

In [None]:
# Switch pyLDAvis to notebook display mode before building the visualization.
pyLDAvis.enable_notebook()
# Prepare the visualization from the trained LDA model, the bag-of-words corpus
# and the dictionary built earlier in the notebook.
vis = pyLDAvis.gensim_models.prepare(lda_model, corpus, id2word)
# Bare last expression so the notebook renders `vis` as the cell output.
vis

## 9. Guided Topic Analysis (FULL DATASET - CHUNKING)

Here I analyze the **ENTIRE** dataset by reading it in chunks. This allows me to classify millions of songs without running out of RAM.
I define fixed topics and count how many songs fall into each category per decade.

In [None]:
# Define Fixed Topics
# Maps each guided topic name to the lowercase keywords that signal it.
# A song is later assigned to the topic with the most matching keywords;
# dictionary order decides ties because max() returns the first best entry.
fixed_topics = {
    'Love/Romance': ['love', 'heart', 'baby', 'kiss', 'miss', 'hold', 'feel', 'darling', 'honey'],
    'War/Violence': ['war', 'fight', 'gun', 'kill', 'soldier', 'blood', 'army', 'battle', 'bomb', 'death', 'shoot'],
    'Party/Fun': ['party', 'dance', 'drink', 'night', 'fun', 'club', 'music', 'dj', 'celebrate', 'rock'],
    'Life/Existential': ['life', 'world', 'time', 'live', 'die', 'god', 'soul', 'mind', 'dream', 'truth'],
    'Money/Success': ['money', 'rich', 'cash', 'gold', 'fame', 'dollar', 'pay', 'diamond', 'star']
}

# Splits lowercased lyrics into runs of word characters (whole-word tokens).
_WORD_PATTERN = re.compile(r"\w+")


def classify_text_simple(text, topics, whole_word=True):
    """Assign ``text`` to the topic whose keyword list has the most hits.

    A topic scores one point per keyword that occurs in the text (presence,
    not frequency). The highest-scoring topic wins; ties go to the topic that
    comes first in ``topics``.

    Args:
        text: Lyrics to classify. Anything that is not a ``str`` yields 'Other'.
        topics: Mapping of topic name -> list of lowercase keywords.
        whole_word: When True (default) a keyword only counts if it appears as
            a complete word, so 'war' no longer matches 'toward' and 'die' no
            longer matches 'soldier'. Multi-word keywords cannot match in this
            mode. Pass False for the previous substring behaviour.

    Returns:
        The winning topic name, or 'Other' when nothing matches, ``topics`` is
        empty, or ``text`` is not a string.
    """
    if not isinstance(text, str) or not topics:
        return 'Other'

    text = text.lower()
    # A set of tokens gives exact-word, O(1) lookups; the raw string gives
    # substring containment. Both support the `in` operator used below.
    haystack = set(_WORD_PATTERN.findall(text)) if whole_word else text

    scores = {
        topic: sum(1 for word in keywords if word in haystack)
        for topic, keywords in topics.items()
    }
    max_topic = max(scores, key=scores.get)
    if scores[max_topic] == 0:
        return 'Other'
    return max_topic

# Initialize Counters
# decade_topic_counts: (decade, topic) -> number of songs classified into that topic
# decade_total_counts: decade -> number of songs seen (including 'Other')
decade_topic_counts = Counter()
decade_total_counts = Counter()

CHUNK_SIZE = 100000
print("Starting Full Dataset Analysis (Chunked)...")

# Read CSV in chunks
try:
    chunk_iter = pd.read_csv(DATA_PATH, usecols=['lyrics', 'language', 'year'], chunksize=CHUNK_SIZE)
except ValueError:
    # Fallback if columns differ
    chunk_iter = pd.read_csv(DATA_PATH, chunksize=CHUNK_SIZE)

total_processed = 0
start_time = time.time()
# Defined up front so the summary line below still works if no chunk is read.
elapsed = 0.0

for i, chunk in enumerate(chunk_iter):
    # Filter English
    if 'language' in chunk.columns:
        chunk = chunk[chunk['language'] == 'en']

    # Clean Year. `assign` builds new frames, so no values are written into a
    # filtered slice (avoids SettingWithCopyWarning).
    chunk = chunk.assign(year=pd.to_numeric(chunk['year'], errors='coerce'))
    chunk = chunk.dropna(subset=['year', 'lyrics'])
    chunk = chunk.assign(year=chunk['year'].astype(int))
    chunk = chunk[(chunk['year'] >= 1950) & (chunk['year'] <= 2024)]

    # Create Decade and classify every song against the fixed topics
    chunk = chunk.assign(
        decade=(chunk['year'] // 10) * 10,
        Guided_Topic=chunk['lyrics'].apply(lambda x: classify_text_simple(x, fixed_topics)),
    )

    # Update Counts
    # Counter.update tallies whole columns at once; the previous per-row
    # iterrows() loop dominated the runtime on millions of rows.
    decade_total_counts.update(chunk['decade'].tolist())
    classified = chunk[chunk['Guided_Topic'] != 'Other']
    decade_topic_counts.update(
        zip(classified['decade'].tolist(), classified['Guided_Topic'].tolist())
    )

    total_processed += len(chunk)
    elapsed = time.time() - start_time
    print(f"Processed chunk {i+1} (Total rows: {total_processed}). Elapsed: {elapsed:.0f}s", end='\r')

print(f"\nFull Dataset Analysis Complete. Processed {total_processed} rows in {elapsed:.0f} seconds.")

In [None]:
# Visualize Guided Trends (Full Data)

# Convert counters to DataFrame.
# Every (decade, topic) combination gets a row, so a topic with no songs in a
# decade is drawn at 0 instead of leaving a gap in its line.
observed_decades = sorted(d for d, total in decade_total_counts.items() if total > 0)
observed_topics = sorted({topic for _, topic in decade_topic_counts})

plot_data = []
for decade in observed_decades:
    total = decade_total_counts[decade]
    for topic in observed_topics:
        count = decade_topic_counts.get((decade, topic), 0)
        # Scaled to 0-100 so the values match the "Percentage" label.
        plot_data.append({'decade': decade, 'Guided_Topic': topic, 'Percentage': 100 * count / total})

# Explicit columns keep sort_values from raising KeyError when there is no data.
df_plot = pd.DataFrame(plot_data, columns=['decade', 'Guided_Topic', 'Percentage']).sort_values('decade')

if df_plot.empty:
    print("No classified songs to plot.")
else:
    plt.figure(figsize=(12, 8))
    sns.lineplot(data=df_plot, x='decade', y='Percentage', hue='Guided_Topic', marker='o', linewidth=2.5)
    plt.title('Guided Topic Trends Over Decades (FULL DATASET)')
    plt.ylabel('Percentage of Songs')
    plt.xlabel('Decade')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True)
    plt.show()