<h4>Note:</h4>
<p>This section is a cleaned-up summary of the code we used for the data collection.<br>
Because scraping Billboard took about 1 hour and scraping Genius took between 12 and 14 hours, we did not rerun the code, which is why there are no outputs.</p>

In [None]:
### Imports
from unidecode import unidecode
import lyricsgenius
import os, re, time, requests, pandas as pd
from bs4 import BeautifulSoup
from dateutil.rrule import rrule, WEEKLY
from datetime import datetime
from tqdm import tqdm
from unidecode import unidecode
from pathlib import Path

<h3>Billboard Hot 100 data scraping from 2000-01-08 through 2025-08-09.</h3>

In [None]:
# First run of decimal digits inside a scraped chart cell.
NUM = re.compile(r"\d+")


def to_int(s):
    """Extract the first integer found in a scraped value.

    Args:
        s: Any value; it is converted with ``str()`` before searching.

    Returns:
        The first run of digits as an ``int``, or ``None`` when ``s`` is
        ``None``, a NaN float, the placeholder ``"--"``, or contains no digits.
    """
    is_nan_float = isinstance(s, float) and pd.isna(s)
    if s is None or is_nan_float:
        return None

    text = str(s).strip()
    if text == "--":
        return None

    found = NUM.search(text)
    if found is None:
        return None
    return int(found.group())


### Helper function for metric labels like - LW, Peak and Weeks
def metric_by_label(row, label):
    """Read the number that belongs to a labelled metric inside a chart row.

    The first ``<span>`` in ``row`` whose upper-cased text contains ``label``
    is located. The value is then taken from the next ``<li>`` carrying the
    ``o-chart-results-list__item`` class (its ``c-label`` span if present,
    otherwise the item's whole text). If no such ``<li>`` follows, the next
    ``<span>`` with a ``c-label`` class is used instead.

    Args:
        row: Parsed element for one chart entry.
        label: Upper-case label text to look for (e.g. ``"LW"``).

    Returns:
        The value parsed by ``to_int``, or ``None`` if the label or its value
        cannot be found.
    """
    def has_label_text(text):
        return isinstance(text, str) and label in text.upper()

    def is_result_item(tag):
        return tag.name == "li" and "o-chart-results-list__item" in (tag.get("class") or [])

    def is_value_span(tag):
        return tag.name == "span" and "c-label" in (tag.get("class") or [])

    def mentions_c_label(css_class):
        return css_class and "c-label" in css_class

    label_span = row.find("span", string=has_label_text)
    if not label_span:
        return None

    item = label_span.find_next(is_result_item)
    if not item:
        # No list item follows the label: fall back to the next value-like span.
        nearby = label_span.find_next(is_value_span)
        return to_int(nearby.get_text(strip=True)) if nearby else None

    inner = item.find("span", class_=mentions_c_label)
    if inner:
        return to_int(inner.get_text(strip=True))
    return to_int(item.get_text(" ", strip=True))


### Scrape data for one week
def scrape_week(date_str=None):
    """Scrape one Billboard Hot 100 chart page into a list of row dicts.

    Args:
        date_str: Date segment appended to the chart URL (callers in this file
            pass ``YYYY-MM-DD``). When falsy, the undated chart URL is
            requested. The value is stored unchanged as ``chart_date``.

    Returns:
        A list of dicts with the keys ``chart_date``, ``rank``, ``title``,
        ``artist``, ``last_week``, ``peak`` and ``weeks_on_chart``. Entries
        without a title, without an artist element, or without a parsable rank
        are skipped.

    Raises:
        Whatever ``requests.get`` or ``Response.raise_for_status`` raise for a
        failed request.
    """
    url = f"https://www.billboard.com/charts/hot-100/{date_str}/" if date_str else "https://www.billboard.com/charts/hot-100/"
    res = requests.get(url, headers={"User-Agent":"Mozilla/5.0"}, timeout=30)
    res.raise_for_status()
    soup = BeautifulSoup(res.text, "html.parser")

    rows = []
    for e in soup.find_all(attrs={'class': 'o-chart-results-list-row-container'}):
        # title & artist
        title_el = e.h3
        if not title_el:
            continue
        # find_next() returns None when no <span> follows the title; skip that
        # one entry instead of raising and losing the whole week in the caller.
        artist_el = title_el.find_next('span')
        if artist_el is None:
            continue
        title  = title_el.get_text(strip=True)
        artist = artist_el.get_text(strip=True)

        # current rank
        rank_el = e.select_one("span.c-label.a-font-primary-bold-l") or e.find('span', class_='c-label')
        rank = to_int(rank_el.get_text(strip=True)) if rank_el else None

        # last week rank, peak and number of weeks in top100 (based on labels)
        lw    = metric_by_label(e, "LW")
        peak  = metric_by_label(e, "PEAK")
        weeks = metric_by_label(e, "WEEKS")

        if rank is not None:
            rows.append({
                "chart_date": date_str,
                "rank": rank,
                "title": title,
                "artist": artist,
                "last_week": lw,
                "peak": peak,
                "weeks_on_chart": weeks
            })
    return rows


### Scrape for multiple weeks - base
def weekly_dates(start, end):
    """List the weekly chart dates between two ISO dates.

    Args:
        start: ISO-format date string used as the first occurrence.
        end: ISO-format date string passed to ``rrule`` as ``until``.

    Returns:
        A list of ``YYYY-MM-DD`` strings, one per weekly occurrence generated
        by ``rrule`` starting at ``start``.
    """
    first_day = datetime.fromisoformat(start)
    last_day = datetime.fromisoformat(end)

    stamps = []
    for day in rrule(WEEKLY, dtstart=first_day, until=last_day):
        stamps.append(day.strftime("%Y-%m-%d"))
    return stamps


### Scrape for given range
def scrape_range(start="", end="", pause=0.12):
    """Scrape every weekly chart between two dates into one DataFrame.

    Args:
        start: First chart date, ISO format.
        end: Last chart date, ISO format.
        pause: Seconds to sleep after each week's request.

    Returns:
        A DataFrame of all collected rows. When it is not empty, the numeric
        columns are passed through ``to_int``, duplicate
        (chart_date, rank, title, artist) rows are dropped and the rows are
        sorted by chart date and rank. A week whose scrape raises is reported
        with ``print`` and skipped.
    """
    collected = []
    for ds in tqdm(weekly_dates(start, end), desc="Weeks"):
        try:
            week_rows = scrape_week(ds)
            collected.extend(week_rows)
        except Exception as e:
            print(f"Error: {ds}: {e}")
        time.sleep(pause)

    df = pd.DataFrame(collected)
    if df.empty:
        return df

    numeric_columns = ("rank", "last_week", "peak", "weeks_on_chart")
    for col in numeric_columns:
        df[col] = df[col].map(to_int)

    deduplicated = df.drop_duplicates(subset=["chart_date","rank","title","artist"])
    return deduplicated.sort_values(["chart_date","rank"])


### Run scraper
# Scrape the weekly charts for the start and end dates given here; the result
# is kept in `df` for the following cells.
df = scrape_range("2000-01-08", "2025-08-09")
# Report how many distinct chart dates were collected and the total row count.
print("Number of weeks:", df["chart_date"].nunique(), " - Number of rows:", len(df))
print(df)

In [None]:
### Setting column last_week to integer datatype
# Values that cannot be parsed become missing (errors='coerce'); the nullable
# 'Int64' dtype keeps them as <NA> in an integer column.
df['last_week'] = pd.to_numeric(df['last_week'], errors='coerce').astype('Int64')
# NOTE(review): `index=False` is passed without an explicit `orient`; some
# pandas versions only accept `index=False` for specific orients — confirm this
# call runs on the pandas version in use.
df.to_json("hot100_2000_2025_raw.json", index=False)
# Notebook display of the raw chart table.
df

In [None]:
### Create new dataset with the unique artist-title pairs (for Genius API)
# Keep only title and artist, drop repeated pairs, and renumber the rows from 0.
unique_songs_df = df[["title", "artist"]].drop_duplicates().reset_index(drop=True)
# Notebook display of the unique-song table.
unique_songs_df

<h3>Cleaning dataset</h3>

In [None]:
### Normalize artist names
def _space_connector(text, word, replacement):
    """Put ``replacement`` where ``word`` joins two artist names.

    Two forms are handled:
    * ``word`` as a whole word in any letter case, with any surrounding
      whitespace ("A  and  B", "A With B");
    * the capitalised word glued between a lowercase letter and an uppercase
      letter ("DrakeWithRihanna").

    Letters that merely contain the word ("Grande", "Withers", "Timbaland")
    are left untouched.
    """
    text = re.sub(rf'\s*\b{word}\b\s*', replacement, text, flags=re.IGNORECASE)
    return re.sub(rf'(?<=[a-z]){word.capitalize()}(?=[A-Z])', replacement, text)


def clean_artist_name(name):
    """Normalise spacing and connector words in a Billboard artist string.

    Args:
        name: Artist string as scraped.

    Returns:
        The string with single spaces around ``&``, ``and``, ``Featuring``,
        ``Feat.``, ``Presents`` and ``With``, ``", "`` after commas, runs of
        whitespace collapsed and the ends stripped.
    """
    # space around &
    name = re.sub(r'\s*&\s*', ' & ', name)
    # space around "and" - matched as a word so names such as "Grande" or
    # "Brandy" are no longer split into "Gr and e" / "Br and y"
    name = _space_connector(name, 'and', ' and ')
    # space around "Featuring"
    name = re.sub(r'\s*(featuring)\s*', r' Featuring ', name, flags=re.IGNORECASE)
    # space around "Feat"
    name = re.sub(r'\s*(feat\.)\s*', r' Feat. ', name, flags=re.IGNORECASE)
    # space around "Presents"
    name = re.sub(r'\s*(presents)\s*', r' Presents ', name, flags=re.IGNORECASE)
    # space around "With" - matched as a word so "Withers" stays intact
    name = _space_connector(name, 'with', ' With ')
    # space around ","
    name = re.sub(r'\s*,\s*', ', ', name)
    # remove extra spaces
    name = re.sub(r'\s+', ' ', name).strip()

    return name

# Apply the normalisation to every artist string in the unique-song table.
unique_songs_df["artist"] = unique_songs_df["artist"].apply(clean_artist_name)

## later added adjustment based on the first results
# Collapses "Gr and e" (with optional whitespace around "and") back into
# "Grande"; strings that already read "Grande" are left as they are.
unique_songs_df["artist"] = unique_songs_df["artist"].str.replace(r"Gr\s*and\s*e", "Grande", regex=True)

# Notebook display of the cleaned table.
unique_songs_df

In [None]:
### Save dataset
# unique_songs_df.to_json("hot100_2000_2025_cleaned.json", index=False)

<h3>Genius API scraping</h3>

In [None]:
### Configuration
# The Genius API token is read from the environment so the credential is not
# stored in the notebook.
# NOTE(review): a token was previously committed in this cell; treat it as
# exposed and rotate it in the Genius API client settings.
TOKEN = os.environ.get("GENIUS_ACCESS_TOKEN", "")
if not TOKEN:
    print("Warning: GENIUS_ACCESS_TOKEN is not set; Genius API requests will be rejected.")
HEAD_API  = {"Authorization": f"Bearer {TOKEN}"}
# Browser-like User-Agent used for the HTML page requests.
HEAD_HTML = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123 Safari/537.36"}

SEARCH_URL = "https://api.genius.com/search"
SONG_URL   = "https://api.genius.com/songs/{id}"

CHECKPOINT_FILE = "unique_songs_with_meta_progress.json"  # progress file written by save_checkpoint
SAVE_EVERY = 25     # write a checkpoint every N processed rows
SLEEP_SEC  = 0.25   # pause after each processed row
TEST_LIMIT = None   # set to an int to process only the first N pending rows

# The Genius lookup needs both key columns; stop early if one is missing.
required_cols = ["title", "artist"]
for rc in required_cols:
    if rc in unique_songs_df.columns:
        continue
    raise ValueError(f"Missing column in unique_songs_df: {rc}")

# Add every metadata column that does not exist yet, filled with pd.NA.
for c in ["genius_id","language","primary_tag","tags","genius_url","release_date","album","lyrics"]:
    if c in unique_songs_df.columns:
        continue
    unique_songs_df[c] = pd.NA


### SAFETY SAVING
# Continue the download from the check point (if there is already one)
if Path(CHECKPOINT_FILE).exists():
    try:
        _ckpt = pd.read_json(CHECKPOINT_FILE)
        cols = ["title","artist","genius_id","language","primary_tag","tags","genius_url","release_date","album","lyrics"]
        # Keep only the known columns that the checkpoint actually contains,
        # and one row per (title, artist) pair.
        _ckpt = _ckpt[[c for c in cols if c in _ckpt.columns]].drop_duplicates(subset=["title","artist"])
        # Left merge keeps every row of unique_songs_df; checkpoint values for
        # columns that exist on both sides arrive with a "_ckpt" suffix.
        unique_songs_df = unique_songs_df.merge(_ckpt, on=["title","artist"], how="left", suffixes=("", "_ckpt"))

        for c in ["genius_id","language","primary_tag","tags","genius_url","release_date","album","lyrics"]:
            ck = f"{c}_ckpt"
            if ck in unique_songs_df.columns:
                # Copy a checkpoint value only where the current cell is empty
                # and the checkpoint has something, then drop the helper column.
                mask = unique_songs_df[c].isna() & unique_songs_df[ck].notna()
                unique_songs_df.loc[mask, c] = unique_songs_df.loc[mask, ck]
                unique_songs_df.drop(columns=[ck], inplace=True, errors="ignore")
        print(f"Continue from the checkpoint: {_ckpt.shape[0]} rows are read.")
    except Exception as e:
        # Any failure while reading or merging is reported and the run goes on
        # with whatever state unique_songs_df has at that point.
        print(f"Checkpoint reading error: {e}. Starting from the beginning.")


### Helper functions
# Search Genius for a song and pick the best hit
def genius_search_with_hit(title: str, artist: str):
    """Look up a song through the Genius search endpoint.

    The query is ``"<title> <artist>"``. A hit whose title and primary artist
    both equal the requested ones (after ``unidecode``, stripping and
    lower-casing) is preferred; if none matches, the first hit is used.

    Args:
        title: Song title.
        artist: Artist string.

    Returns:
        ``(song_id, result_dict)`` for the chosen hit, or ``(None, None)`` when
        the request fails, the response is not OK, or there are no hits.
    """
    def fold(text):
        return unidecode(text).strip().lower()

    q = f"{title} {artist}"
    try:
        response = requests.get(SEARCH_URL, headers=HEAD_API, params={"q": q}, timeout=20)
        if not response.ok:
            return None, None
        payload = response.json().get("response", {}) or {}
        hits = payload.get("hits", []) or []
    except Exception:
        return None, None
    if not hits:
        return None, None

    wanted = (fold(str(title)), fold(str(artist)))
    for hit in hits:
        candidate = hit.get("result", {}) or {}
        performer = candidate.get("primary_artist") or {}
        found = (fold(candidate.get("title", "")), fold(performer.get("name", "")))
        if found == wanted:
            return candidate.get("id"), candidate

    # No exact match: fall back to the first search result.
    top = hits[0].get("result", {}) or {}
    return top.get("id"), top


# Fetch a song's metadata from the Genius songs endpoint
def genius_song_meta(song_id: int):
    """Request ``/songs/<id>`` and pull out the fields used by the backfill.

    Args:
        song_id: Genius song id.

    Returns:
        A 6-tuple ``(language, primary_tag_name, tag_names, url, release_date,
        album_name)``. ``tag_names`` is a list of names or ``None`` when there
        are none. All six values are ``None`` when the request raises or the
        response is not OK.
    """
    failure = (None, None, None, None, None, None)
    try:
        response = requests.get(SONG_URL.format(id=song_id), headers=HEAD_API, params={"text_format":"plain"}, timeout=20)
        if not response.ok:
            return failure
        song = (response.json().get("response", {}) or {}).get("song", {}) or {}
    except Exception:
        return failure

    language = song.get("language")
    primary = (song.get("primary_tag") or {}).get("name")

    tag_names = []
    for entry in (song.get("tags") or []):
        if isinstance(entry, dict) and entry.get("name"):
            tag_names.append(entry.get("name"))

    page_url = song.get("url")
    released = song.get("release_date")
    album_title = (song.get("album") or {}).get("name")

    return language, primary, tag_names or None, page_url, released, album_title


# Scrape a song's tags from its Genius page
def scrape_tags_from_html(url: str):
    """Collect tag names from the links on a song page.

    Every ``<a>`` whose ``href`` contains ``/tags/`` contributes its text,
    except empty texts and the texts "about" / "lyrics" (case-insensitive).

    Args:
        url: Address of the song page.

    Returns:
        The tag names in page order without repeats, or ``None`` when nothing
        is found or any step raises.
    """
    try:
        page = requests.get(url, headers=HEAD_HTML, timeout=20).text
        soup = BeautifulSoup(page, "html.parser")

        skipped = ("about", "lyrics")
        labels = [
            link.get_text(strip=True)
            for link in soup.find_all("a", href=True)
            if "/tags/" in link["href"]
        ]
        kept = [label for label in labels if label and label.lower() not in skipped]

        # dict.fromkeys keeps the first occurrence of each tag, in order.
        return list(dict.fromkeys(kept)) or None
    except Exception:
        return None


# Scrape a song's lyrics from its Genius page
def scrape_lyrics_from_html(url: str):
    """Extract the lyrics text from a song page.

    A ``<div class="lyrics">`` is tried first. Otherwise the text of every
    ``<div>`` whose class contains ``Lyrics__Container`` is joined with
    newlines.

    Args:
        url: Address of the song page.

    Returns:
        The lyrics as one string, or ``None`` when no lyrics markup is found,
        the text is empty, or any step raises.
    """
    try:
        page = requests.get(url, headers=HEAD_HTML, timeout=25).text
        soup = BeautifulSoup(page, "html.parser")

        legacy = soup.find("div", class_="lyrics")
        if legacy:
            return legacy.get_text(separator="\n").strip() or None

        blocks = soup.find_all("div", class_=lambda c: c and "Lyrics__Container" in c)
        if not blocks:
            return None

        pieces = (block.get_text(separator="\n", strip=True) for block in blocks)
        joined = "\n".join(piece for piece in pieces if piece).strip()
        return joined or None
    except Exception:
        return None


### Write the progress checkpoint
def save_checkpoint(df_to_save: pd.DataFrame):
    """Write the song table's key and metadata columns to the checkpoint file.

    Only the known columns that exist in ``df_to_save`` are written, as JSON
    records with non-ASCII characters kept as they are. A confirmation line is
    printed afterwards.

    Args:
        df_to_save: Song table to persist.
    """
    wanted = ("title","artist","genius_id","language","primary_tag","tags","genius_url","release_date","album","lyrics")
    present = [name for name in wanted if name in df_to_save.columns]
    snapshot = df_to_save[present]
    snapshot.to_json(CHECKPOINT_FILE, orient="records", force_ascii=False)
    print(f"Checkpoint is saved: {CHECKPOINT_FILE}")


# Rows still to process: those with no primary_tag or no tags.
# TEST_LIMIT (when set) trims the list, which was used for trial runs on a
# smaller sample.
need_mask = unique_songs_df["primary_tag"].isna() | unique_songs_df["tags"].isna()
need_idx = need_mask[need_mask].index.tolist()
if TEST_LIMIT:
    need_idx = need_idx[:TEST_LIMIT]

print(f"Running on – {len(need_idx)} rows (genre/tags + release_date + album + lyrics).")


# Main cycle
def _is_missing(value):
    """Return True when a cell holds no usable scalar value.

    Containers (for example a list of tags) count as present: ``pd.isna`` on a
    list returns an element-wise array, whose truth value is ambiguous.
    """
    if isinstance(value, (list, tuple, set, dict)):
        return False
    return bool(pd.isna(value))


# For every pending row: resolve the Genius id, read the API metadata, then use
# the search hit and the song page only for what is still missing.
for n, idx in enumerate(tqdm(need_idx, desc="Backfill…"), start=1):
    title  = unique_songs_df.at[idx, "title"]
    artist = unique_songs_df.at[idx, "artist"]

    # 1) Genius ID: reuse a stored id when it converts to int, otherwise search.
    sid = unique_songs_df.at[idx, "genius_id"]
    hit_res = None
    needs_search = _is_missing(sid) or sid in ("", 0)
    if not needs_search:
        try:
            sid = int(sid)
        except (TypeError, ValueError, OverflowError):
            needs_search = True
    if needs_search:
        sid, hit_res = genius_search_with_hit(title, artist)
        unique_songs_df.at[idx, "genius_id"] = sid

    if not sid:
        # Nothing found for this row; keep the checkpoint cadence and move on.
        if n % SAVE_EVERY == 0:
            save_checkpoint(unique_songs_df)
        time.sleep(SLEEP_SEC)
        continue

    # 2) /songs/:id meta
    lang, ptag, tags, url, rel, alb = genius_song_meta(int(sid))

    if _is_missing(unique_songs_df.at[idx, "language"]) and lang is not None:
        unique_songs_df.at[idx, "language"] = lang
    if url and _is_missing(unique_songs_df.at[idx, "genius_url"]):
        unique_songs_df.at[idx, "genius_url"] = url
    if _is_missing(unique_songs_df.at[idx, "release_date"]) and rel is not None:
        unique_songs_df.at[idx, "release_date"] = rel
    if _is_missing(unique_songs_df.at[idx, "album"]) and alb is not None:
        unique_songs_df.at[idx, "album"] = alb
    # Genre data returned by the API was previously fetched but never stored;
    # keep it so the steps below act as real fallbacks.
    if ptag and _is_missing(unique_songs_df.at[idx, "primary_tag"]):
        unique_songs_df.at[idx, "primary_tag"] = ptag
    if tags and _is_missing(unique_songs_df.at[idx, "tags"]):
        unique_songs_df.at[idx, "tags"] = tags

    # 3) search-hit fallback for the genre, tags and album
    if hit_res:
        if _is_missing(unique_songs_df.at[idx, "primary_tag"]):
            pr_name = (hit_res.get("primary_tag") or {}).get("name")
            if pr_name:
                unique_songs_df.at[idx, "primary_tag"] = pr_name

        hit_tags = hit_res.get("tags")
        if _is_missing(unique_songs_df.at[idx, "tags"]) and hit_tags and isinstance(hit_tags, list):
            unique_songs_df.at[idx, "tags"] = hit_tags

        if _is_missing(unique_songs_df.at[idx, "album"]):
            hit_album = hit_res.get("album")
            alb_hit = hit_album.get("name") if isinstance(hit_album, dict) else None
            if alb_hit:
                unique_songs_df.at[idx, "album"] = alb_hit

    # 4) HTML fallback for the genre (tags/primary_tag)
    stored_url = unique_songs_df.at[idx, "genius_url"]
    use_url = url if url else (None if _is_missing(stored_url) else stored_url)
    if _is_missing(unique_songs_df.at[idx, "tags"]) and use_url:
        html_tags = scrape_tags_from_html(use_url)
        if html_tags:
            unique_songs_df.at[idx, "tags"] = html_tags
            if _is_missing(unique_songs_df.at[idx, "primary_tag"]):
                unique_songs_df.at[idx, "primary_tag"] = html_tags[0]

    # 5) lyrics: scrape when the cell does not hold a non-empty string
    current_lyrics = unique_songs_df.at[idx, "lyrics"]
    has_lyrics = isinstance(current_lyrics, str) and bool(current_lyrics)
    if not has_lyrics and use_url:
        song_lyrics = scrape_lyrics_from_html(use_url)
        if song_lyrics:
            unique_songs_df.at[idx, "lyrics"] = song_lyrics

    # 6) Timer and checkpoint
    if n % SAVE_EVERY == 0:
        save_checkpoint(unique_songs_df)
    time.sleep(SLEEP_SEC)

# Final save
save_checkpoint(unique_songs_df)
print("Done: checkpoint is updated.")