In [2]:
import os
import pandas as pd
import lyricsgenius
import time
import random
from dotenv import load_dotenv


# -----------------------------
# CONFIGURATION
# -----------------------------

# Load environment variables from .env file (using absolute path for reliability)
env_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath('__file__'))), '.env')
load_dotenv(env_path, override=True, encoding='utf-8')

# 1) Genius API Token
GENIUS_API_TOKEN = os.getenv("GENIUS_API_TOKEN")
if not GENIUS_API_TOKEN:
    raise ValueError("GENIUS_API_TOKEN not found in .env file")

# 2) Path to your artist list (one artist name per line)
ARTIST_LIST_PATH = "artists.txt"

# 3) Output CSV (where we'll append results as we go)
OUTPUT_CSV = "scraped_lyrics.csv"


# 4) How many songs to fetch per artist
SONGS_PER_ARTIST = int(os.getenv("SONGS_PER_ARTIST", "25"))
print(f"Will fetch up to {SONGS_PER_ARTIST} songs per artist")

# 5) Pause (seconds) between artist requests to avoid rate-limiting
SLEEP_BETWEEN_ARTISTS = float(os.getenv("SLEEP_BETWEEN_ARTISTS", "1.5"))
print(f"Will sleep {SLEEP_BETWEEN_ARTISTS} seconds between artist requests")

# 6) Rate limit handling configuration
INITIAL_BACKOFF = int(os.getenv("INITIAL_BACKOFF", 10))  # Start with 10 seconds
MAX_RETRIES = int(os.getenv("MAX_RETRIES", 5))       # Try up to 5 times

print(f"Using initial backoff of {INITIAL_BACKOFF}s with {MAX_RETRIES} max retries")
# -----------------------------
# INITIALIZE GENIUS CLIENT
# -----------------------------

# Initialize lyricsgenius.Genius with some options
genius = lyricsgenius.Genius(
    GENIUS_API_TOKEN,
    timeout=15,
    retries=3,
    sleep_time=0.25,  # small pause between each page scrape
    excluded_terms=["(Remix)", "(Live)"],  # Exclude these terms from song titles
    skip_non_songs=True,  # Skip non-song entries (e.g., interviews)
)


# -----------------------------
# RATE LIMIT HANDLER
# -----------------------------
def with_rate_limit_handling(api_function):
    """Decorator to handle rate limit errors with exponential backoff"""
    def wrapper(*args, **kwargs):
        for attempt in range(MAX_RETRIES + 1):
            try:
                return api_function(*args, **kwargs)
            except Exception as e:
                error_str = str(e)
                # Check if it's a rate limit error
                if "429" in error_str and attempt < MAX_RETRIES:
                    # Calculate backoff time with jitter
                    backoff_time = INITIAL_BACKOFF * (2 ** attempt) + random.uniform(1, 5)
                    print(f"\nRate limit exceeded. Waiting {backoff_time:.1f} seconds before retry {attempt+1}/{MAX_RETRIES}")
                    time.sleep(backoff_time)
                else:
                    if "429" in error_str:
                        print(f"\nRate limit exceeded after {MAX_RETRIES} retries. Consider increasing wait time.")
                    raise
    return wrapper



# -----------------------------
# HELPER FUNCTION: fetch_artist_lyrics
# -----------------------------
@with_rate_limit_handling
def search_artist(artist_name, max_songs):
    """Search for an artist with rate limit handling"""
    return genius.search_artist(artist_name, max_songs=max_songs, sort="popularity")

@with_rate_limit_handling
def search_song(title, artist):
    """Search for a song with rate limit handling"""
    return genius.search_song(title=title, artist=artist)


# Add this function after your imports and before the GENIUS CLIENT section


def fetch_artist_lyrics(artist_name, max_songs=SONGS_PER_ARTIST):
    """
    Fetch up to max_songs tracks for `artist_name`, returning a list of dicts
    """
    songs_data = []
    try:
        # Search for the artist with rate limit handling
        artist_obj = search_artist(artist_name, max_songs)
        
        if artist_obj is None or not artist_obj.songs:
            print(f"  → No songs found for artist: {artist_name}")
            return songs_data

        for song in artist_obj.songs:
            title = song.title.strip()
            lyrics = song.lyrics.strip()
            
            # Skip extremely short lyrics (e.g., < 20 chars)
            if len(lyrics) < 20:
                continue
            songs_data.append({
                "artist": artist_name,
                "song_title": title,
                "lyrics": lyrics
            })
            
    except Exception as e:
        print(f"ERROR: Could not search for artist [{artist_name}]: {e}")
        
    return songs_data

def main():
    # 1) Read existing CSV (if any), so we don't re‐scrape duplicates
    if os.path.exists(OUTPUT_CSV):
        master_df = pd.read_csv(OUTPUT_CSV, encoding='utf-8')
        # master_df = safe_read_csv(OUTPUT_CSV)
        # Create a set of (artist, song_title) for quick "already scraped" checks
        existing_pairs = set(zip(master_df["artist"], master_df["song_title"]))
        
        # Check which artists have already met their quota
        artist_song_counts = master_df.groupby('artist').size()
        complete_artists = set(artist_song_counts[artist_song_counts >= SONGS_PER_ARTIST].index)
        incomplete_artists = set(artist_song_counts[artist_song_counts < SONGS_PER_ARTIST].index)
        
        print(f"Loaded {len(master_df)} existing rows from {OUTPUT_CSV}")
        print(f"Complete artists (>= {SONGS_PER_ARTIST} songs): {len(complete_artists)}")
        print(f"Incomplete artists (< {SONGS_PER_ARTIST} songs): {len(incomplete_artists)}")
    else:
        master_df = pd.DataFrame(columns=["artist", "song_title", "lyrics"])
        existing_pairs = set()
        complete_artists = set()
        incomplete_artists = set()
        print(f"No existing CSV found. A new one will be created: {OUTPUT_CSV}")

    # 2) Read artist list
    with open(ARTIST_LIST_PATH, "r", encoding="utf-8") as f:
        artists = [line.strip() for line in f if line.strip()]
    print(f"Read {len(artists)} artists from {ARTIST_LIST_PATH}")

    # 3) Filter artists: skip complete ones, include incomplete and new ones
    artists_to_scrape = [artist for artist in artists if artist not in complete_artists]
    skipped_count = len(artists) - len(artists_to_scrape)
    
    print(f"Will scrape {len(artists_to_scrape)} artists (skipping {skipped_count} completed artists)")
    if incomplete_artists:
        print(f"Resuming scraping for {len(incomplete_artists)} incomplete artists")

    # 4) Loop over each artist that needs scraping
    for idx, artist_name in enumerate(artists_to_scrape, 1):
        # Check if this is a resume case
        if artist_name in incomplete_artists:
            current_count = len([pair for pair in existing_pairs if pair[0] == artist_name])
            remaining_needed = SONGS_PER_ARTIST - current_count
            print(f"[{idx}/{len(artists_to_scrape)}] Resuming artist: {artist_name} (has {current_count}, needs {remaining_needed} more) ", end="")
        else:
            print(f"[{idx}/{len(artists_to_scrape)}] Scraping new artist: {artist_name} ", end="")
        
        fetched = fetch_artist_lyrics(artist_name, max_songs=SONGS_PER_ARTIST)

        # Filter out any (artist, song) pairs we already have
        new_rows = []
        for item in fetched:
            key = (item["artist"], item["song_title"])
            if key in existing_pairs:
                continue
            new_rows.append(item)
            existing_pairs.add(key)

        # 5) Append new_rows to master_df (and save immediately)
        if new_rows:
            new_df = pd.DataFrame(new_rows)
            master_df = pd.concat([master_df, new_df], ignore_index=True)

            # Sort by artist for better organization
            master_df = master_df.sort_values(['artist', 'song_title']).reset_index(drop=True)

            # Save after each artist to avoid data loss if script crashes
            master_df.to_csv(OUTPUT_CSV, index=False, encoding='utf-8')
            print(f"→ Retrieved {len(new_rows)} new songs (total now {len(master_df)})")
        else:
            print("→ No new songs found or all songs already exist.")

        # 6) Sleep to avoid hitting rate limits
        time.sleep(SLEEP_BETWEEN_ARTISTS)

    # Final sorting and statistics
    master_df = master_df.sort_values(['artist', 'song_title']).reset_index(drop=True)
    master_df.to_csv(OUTPUT_CSV, index=False, encoding='utf-8')

    print("\nScraping complete.")
    print(f"Final row count: {len(master_df)}")
    print(f"Distinct artists in CSV: {master_df['artist'].nunique()}")
    print(f"Distinct songs in CSV: {master_df['song_title'].nunique()}")
    
    # Show final artist statistics
    final_artist_counts = master_df.groupby('artist').size().sort_values(ascending=False)
    print(f"\nTop 10 artists by song count:")
    print(final_artist_counts.head(10))
    
    # Show artists that still need more songs
    incomplete_final = final_artist_counts[final_artist_counts < SONGS_PER_ARTIST]
    if len(incomplete_final) > 0:
        print(f"\nArtists still needing more songs ({len(incomplete_final)} total):")
        print(incomplete_final.head(10))

if __name__ == "__main__":
    main()

Will fetch up to 10 songs per artist
Will sleep 0.25 seconds between artist requests
Using initial backoff of 5s with 5 max retries
Loaded 3000 existing rows from scraped_lyrics.csv
Complete artists (>= 10 songs): 300
Incomplete artists (< 10 songs): 0
Read 300 artists from artists.txt
Will scrape 0 artists (skipping 300 completed artists)

Scraping complete.
Final row count: 3000
Distinct artists in CSV: 300
Distinct songs in CSV: 2851

Top 10 artists by song count:
artist
21 Savage          10
Paramore           10
P!nk               10
Otis Redding       10
One Direction      10
Oasis              10
Norah Jones        10
Nirvana            10
Nine Inch Nails    10
Nicki Minaj        10
dtype: int64


In [3]:

import pandas as pd
from dotenv import load_dotenv
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer


# Download required NLTK data
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('wordnet', quiet=True)
    nltk.download('averaged_perceptron_tagger', quiet=True)
except:
    print("NLTK downloads may have failed - some preprocessing features might not work")


# -----------------------------
# CLEANING AND PREPROCESSING FUNCTIONS
# -----------------------------

def clean_genius_metadata(lyrics_text):
    """Remove Genius website metadata and formatting from scraped lyrics"""
    if not lyrics_text or pd.isna(lyrics_text):
        return ""
    
    # Convert to string if not already
    lyrics_text = str(lyrics_text)
    
    # Find the first verse/chorus/structure marker and remove everything before it
    structure_patterns = [
        r'\[Verse\s*\d*\]',      # [Verse], [Verse 1], [Verse 2], etc.
        r'\[Chorus\]',           # [Chorus]
        r'\[Intro\]',            # [Intro]
        r'\[Pre-Chorus\]',       # [Pre-Chorus]
        r'\[Bridge\]',           # [Bridge]
        r'\[Outro\]',            # [Outro]
        r'\[Refrain\]',          # [Refrain]
        r'\[Hook\]',             # [Hook]
        r'\[Part\s*\d*\]',       # [Part], [Part 1], etc.
        r'\[Interlude\]',        # [Interlude]
        r'Lyrics:',              # "Lyrics:" marker
    ]
    
    # Find the earliest occurrence of any structure marker
    earliest_match = None
    earliest_pos = len(lyrics_text)
    
    for pattern in structure_patterns:
        match = re.search(pattern, lyrics_text, re.IGNORECASE)
        if match and match.start() < earliest_pos:
            earliest_pos = match.start()
            earliest_match = match
    
    # If we found a structure marker, start from there
    if earliest_match:
        lyrics_text = lyrics_text[earliest_pos:]
    
    # Remove any remaining metadata patterns that might still be present
    patterns_to_remove = [
        r'\d+\s*Contributors.*?(?=\[|$)',  # "109 Contributors..."
        r'Translations.*?(?=\[|$)',  # "Translations..." 
        r'.*?Lyrics".*?describes.*?(?=\[|$)',  # Song description text
        r'.*?is a.*?(?=\[|$)',  # "This song is a..."
        r'Read More.*?(?=\[|$)',  # "Read More" links
        r'See .*? Live.*?(?=\[|$)',  # Concert information
        r'Get tickets.*?(?=\[|$)',  # Ticket links
        r'You might also like.*?(?=\[|$)',  # Recommendations
        r'Embed$',  # "Embed" at end
        r'^\d+Embed',  # Numbers + "Embed"
        r'Produced by.*?$',  # Producer credits
        r'\[Produced by.*?\]',  # Producer credits in brackets
        r'^".*?" is.*?(?=\[|$)',  # Song title descriptions like '"Formation" is a Black Power anthem'
        r'^".*?" sees.*?(?=\[|$)',  # '"Fortnight" sees Taylor...'
        r'^".*?" presents.*?(?=\[|$)',  # '"Sorry" presents itself...'
        r'^Track.*?(?=\[|$)',  # Track descriptions
        r'^One of.*?(?=\[|$)',  # "One of Bey's most..."
        r'^Accompanied by.*?(?=\[|$)',  # "Accompanied by the release..."
        r'^Widely regarded.*?(?=\[|$)',  # "Widely regarded as..."
        r'^The title track.*?(?=\[|$)',  # "The title track of..."
    ]
    
    cleaned_lyrics = lyrics_text
    for pattern in patterns_to_remove:
        cleaned_lyrics = re.sub(pattern, '', cleaned_lyrics, flags=re.IGNORECASE | re.DOTALL)
    
    # Additional cleanup: remove any text before the first bracket that might be leftover description
    lines = cleaned_lyrics.split('\n')
    start_idx = 0
    for i, line in enumerate(lines):
        # Look for lines that start with brackets (song structure)
        if re.match(r'^\s*\[.*?\]', line.strip()):
            start_idx = i
            break
        # Or look for lines that seem to be actual lyrics (not descriptions)
        elif len(line.strip()) > 0 and not any(desc_word in line.lower() for desc_word in 
                                              ['describes', 'is a', 'sees', 'presents', 'track', 'song']):
            start_idx = i
            break
    
    if start_idx > 0:
        cleaned_lyrics = '\n'.join(lines[start_idx:])
    
    # Clean up song structure markers (keep them but normalize format)
    cleaned_lyrics = re.sub(r'\[([^\]]+)\]', r'[\1]', cleaned_lyrics)
    
    # Remove extra whitespace and empty lines
    cleaned_lyrics = re.sub(r'\n\s*\n\s*\n+', '\n\n', cleaned_lyrics)
    cleaned_lyrics = re.sub(r'^\s+|\s+$', '', cleaned_lyrics, flags=re.MULTILINE)
    cleaned_lyrics = cleaned_lyrics.strip()
    
    return cleaned_lyrics
def preprocess_lyrics_for_topic_modeling(lyrics_text):
    """
    Preprocess lyrics text for topic modeling by removing song structure
    and normalizing text (keeping natural expressions)
    """
    if not lyrics_text or pd.isna(lyrics_text):
        return ""
    
    # Convert to lowercase
    text = lyrics_text.lower()
    
    # Remove song structure markers [Verse 1], [Chorus], etc.
    text = re.sub(r'\[.*?\]', '', text)
    
    # Remove parentheses but keep the content inside
    text = re.sub(r'[()]', '', text)
    
    # Remove punctuation except apostrophes (to keep contractions)
    text = re.sub(r'[^\w\s\']', ' ', text)
    
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    
    return text

def advanced_text_preprocessing(text):
    """
    Advanced preprocessing: tokenization, stopword removal, lemmatization
    """
    if not text:
        return []
    
    try:
        # Tokenize
        tokens = word_tokenize(text)
        
        # Get English stopwords and add music-specific common words
        stop_words = set(stopwords.words('english'))
        music_stopwords = {
            'verse', 'chorus', 'bridge', 'outro', 'intro', 'refrain'
        }
        stop_words.update(music_stopwords)
        
        # Remove stopwords, short words, and non-alphabetic tokens
        tokens = [
            token for token in tokens 
            if token.lower() not in stop_words 
            and len(token) > 2 
            and token.isalpha()
        ]
        
        # Lemmatization
        lemmatizer = WordNetLemmatizer()
        tokens = [lemmatizer.lemmatize(token) for token in tokens]
        
        return tokens
    except:
        # Fallback if NLTK fails
        words = text.split()
        return [word for word in words if len(word) > 2 and word.isalpha()]

def clean_lyrics_dataset(csv_path, output_path=None):
    """Clean an entire CSV dataset of scraped lyrics"""
    if output_path is None:
        output_path = csv_path.replace('.csv', '_cleaned.csv')
    
    print(f"Loading lyrics dataset from {csv_path}...")
    df = pd.read_csv(csv_path, encoding='utf-8')
    
    print(f"Original dataset: {len(df)} rows")
    
    # Clean lyrics column
    print("Cleaning lyrics...")
    df['lyrics_cleaned'] = df['lyrics'].apply(clean_genius_metadata)
    
    # Remove rows with very short lyrics after cleaning
    original_count = len(df)
    df = df[df['lyrics_cleaned'].str.len() >= 50]  # At least 50 characters
    print(f"Removed {original_count - len(df)} rows with insufficient lyrics after cleaning")
    
    # Replace original lyrics with cleaned version
    df['lyrics'] = df['lyrics_cleaned']
    df = df.drop('lyrics_cleaned', axis=1)
    
    # Sort and reset index
    df = df.sort_values(['artist', 'song_title']).reset_index(drop=True)
    
    # Save cleaned dataset
    df.to_csv(output_path, index=False, encoding='utf-8')
    
    print(f"Cleaned dataset saved to {output_path}")
    print(f"Final dataset: {len(df)} rows")
    print(f"Artists: {df['artist'].nunique()}")
    print(f"Songs: {df['song_title'].nunique()}")
    
    return df

def clean_and_preprocess_dataset(csv_path, output_path=None):
    """
    Complete pipeline to clean and preprocess lyrics dataset for topic modeling
    """
    if output_path is None:
        output_path = csv_path.replace('.csv', '_topic_modeling_ready.csv')
    
    print(f"Loading dataset from {csv_path}...")
    df = pd.read_csv(csv_path, encoding='utf-8')
    
    print(f"Original dataset: {len(df)} rows")
    
    # Step 1: Clean metadata
    print("Cleaning Genius metadata...")
    df['lyrics_clean'] = df['lyrics'].apply(clean_genius_metadata)
    
    # Step 2: Preprocess for topic modeling
    print("Preprocessing for topic modeling...")
    df['lyrics_processed'] = df['lyrics_clean'].apply(preprocess_lyrics_for_topic_modeling)
    
    # Step 3: Advanced tokenization and cleaning
    print("Tokenizing and removing stopwords...")
    df['lyrics_tokens'] = df['lyrics_processed'].apply(advanced_text_preprocessing)
    
    # Step 4: Create final processed text (rejoined tokens)
    df['lyrics_final'] = df['lyrics_tokens'].apply(lambda x: ' '.join(x))
    
    # Remove rows with very short processed text
    original_count = len(df)
    df = df[df['lyrics_final'].str.len() >= 50]  # At least 50 characters
    print(f"Removed {original_count - len(df)} rows with insufficient content after processing")
    
    # Keep only necessary columns
    df_final = df[['artist', 'song_title', 'lyrics_final']].copy()
    df_final.rename(columns={'lyrics_final': 'lyrics'}, inplace=True)
    
    # Sort and reset index
    df_final = df_final.sort_values(['artist', 'song_title']).reset_index(drop=True)
    
    # Save processed dataset
    df_final.to_csv(output_path, index=False, encoding='utf-8')
    
    print(f"Processed dataset saved to {output_path}")
    print(f"Final dataset: {len(df_final)} rows")
    print(f"Artists: {df_final['artist'].nunique()}")
    print(f"Average words per song: {df_final['lyrics'].str.split().str.len().mean():.1f}")
    
    return df_final

def preview_cleaning(csv_path, num_samples=3):
    """Preview what the cleaning function will do"""
    df = pd.read_csv(csv_path, encoding='utf-8')
    
    print("=== CLEANING PREVIEW ===")
    
    for i in range(min(num_samples, len(df))):
        row = df.iloc[i]
        original = row['lyrics']
        cleaned = clean_genius_metadata(original)
        
        print(f"\n--- Sample {i+1}: {row['artist']} - {row['song_title']} ---")
        print(f"Original length: {len(original)} characters")
        print(f"Cleaned length: {len(cleaned)} characters")
        print(f"Reduction: {len(original) - len(cleaned)} characters")
        
        print(f"\nOriginal first 200 chars:")
        print(f"'{original[:200]}...'")
        
        print(f"\nCleaned first 200 chars:")
        print(f"'{cleaned[:200]}...'")
        
        print("-" * 60)
        

preview_cleaning(OUTPUT_CSV, num_samples=3)  # Preview cleaning
cleaned_df = clean_lyrics_dataset(OUTPUT_CSV)  # Clean dataset
processed_df = clean_and_preprocess_dataset(OUTPUT_CSV) 
        

=== CLEANING PREVIEW ===

--- Sample 1: 21 Savage - Bank Account ---
Original length: 4090 characters
Cleaned length: 2553 characters
Reduction: 1537 characters

Original first 200 chars:
'276 ContributorsTranslationsFrançaisРусский (Russian)PortuguêsBank Account Lyrics“Bank Account” sees 21 Savage rapping about his wealth amongst other things over a dark beat produced by Metro Boomin a...'

Cleaned first 200 chars:
'[Chorus]
I got 1, 2, 3, 4, 5, 6, 7, 8 M's in my bank account, yeah (On God)
In my bank account, yeah (On God)
In my bank account, yeah (On God)
In my bank account, yeah (On God)
In my bank account, ye...'
------------------------------------------------------------

--- Sample 2: 21 Savage - Ghostface Killers ---
Original length: 5118 characters
Cleaned length: 4795 characters
Reduction: 323 characters

Original first 200 chars:

Cleaned first 200 chars:
'[Intro: Young Thug & Offset]
Metro Boomin want some more, nigga (Hey)[Chorus: Offset]
Automatic (Auto), automatics (Yea