In [1]:
from bs4 import BeautifulSoup
import requests
import os
from tqdm import tqdm
import re
import time
import json
import pandas as pd
from datetime import datetime, timedelta

In [2]:
from dotenv import load_dotenv

file_name = "../var.env"
load_dotenv(file_name)

True

In [3]:
all_entries = []

In [None]:
pdf_folder = "../pdfs"
filepaths = [os.path.join(pdf_folder, f) for f in os.listdir(pdf_folder) if f.endswith(".pdf")]
headers = {"User-Agent": "Chrome/114.0.0.0 Safari/537.36"}

for f in tqdm(filepaths):
        date = re.search(r"s_(.*?)\.", f).group(1)
        base_url = f"https://www.billboard.com/charts/hot-100/{date}"
        try:
                response = requests.get(url=base_url, headers=headers)
                response.raise_for_status()
        except requests.exceptions.RequestException as error:
                print(error)
        else:
                filler = ["Producer(s):", "Imprint/Promotion Label:", "Gains in Weekly Performance", "Additional Awards"]
                soup = BeautifulSoup(response.text, "html.parser")
                html_list = soup.find_all(name="ul", class_="o-chart-results-list-row")
                for item in html_list:
                        ranks = item.find_all(name="span", class_="c-label")
                        title = item.find(name="h3", class_="c-title")
                        if not title or len(ranks) < 3:
                                continue
                        
                        wrong_artists = ["NEW", "RE-ENTRY"]
                        song_name = title.getText().strip()
                        temp_artist = ranks[1].getText().strip().replace("\n", "")
                        artist = temp_artist if temp_artist not in wrong_artists else ranks[3].getText().strip()
                        ranks = [item.getText().strip() for item in ranks]
                        weeks_rank = int(ranks[0]) if re.match(r"\d+", ranks[0]) else None
                        last_weeks_rank = int(ranks[-3]) if re.match(r"\d+", ranks[-3]) else None
                        weeks_on_chart = int(ranks[-1]) if re.match(r"\d+", ranks[-1]) else None
                        all_entries.append(
                                {
                                        "date" : date,
                                        "song_name": song_name,
                                        "artist": artist,
                                        "weeks_rank": weeks_rank,
                                        "last_weeks_rank": last_weeks_rank,
                                        "weeks_on_chart": weeks_on_chart
                                }
                        )
        time.sleep(0.2)

In [None]:
pd.DataFrame(all_entries).to_csv("../data/raw/billboard_100_data.csv")

In [3]:
song_bank = pd.read_csv("../data/raw/billboard_100_data.csv")

In [None]:
def get_season(date):
    month = date.month
    if month in [12, 1, 2]:
        return "Winter"
    elif month in [3, 4, 5]:
        return "Spring"
    elif month in [6, 7, 8]:
        return "Summer"
    elif month in [9, 10, 11]:
        return "Autumn"

In [4]:
song_bank["date"] = pd.to_datetime(song_bank["date"])

In [None]:
song_bank["season"] = song_bank["date"].apply(get_season)

In [5]:
data = song_bank[song_bank["date"].dt.year > 1999]

In [None]:
data["song_name", "artist"].drop_duplicates(inplace=True)

In [6]:
all_entries = list(data.to_dict(orient="records"))

In [7]:
client_id = os.getenv("SPOTIFY_ID")
client_secret = os.getenv("SPOTIFY_SECRET")

In [8]:
import json

def load_cache(path="../data/raw/uri_cache.json"):
    if os.path.exists(path):
        with open(path, "r") as f:
            return json.load(f)
    return {}

def save_cache(cache, path="../data/raw/uri_cache.json"):
    with open(path, "w") as f:
        json.dump(cache, f)

In [9]:
import spotipy
from spotipy import SpotifyOAuth

sp = spotipy.Spotify(
        auth_manager=SpotifyOAuth(
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri="https://github.com/Tunchiie",
            scope="playlist-modify-private playlist-modify-public",
        ),
        requests_timeout=30 
    )

In [None]:
def search_uri(batch):
    cache = load_cache()

    for song in tqdm(batch):
        
        key = f"{song["song_name"]} - {song["artist"]}"
        
        if key in cache:
            continue
        else:
            query = f'track:{song["song_name"]} artist:{song["artist"]}'
        
            try:
                result = sp.search(q=query, type="track", limit=1)["tracks"]["items"]
                time.sleep(0.2)
        
                if result:
                    cache[key] = result[0]["uri"]
            except spotipy.SpotifyException as error:
                retry_after = int(error.headers.get("Retry-After", 60))
                print(f"Rate limit hit. Retrying after {retry_after} seconds.")
                time.sleep(retry_after)
    save_cache(cache)

In [15]:
def create_playlist():
    me = sp.current_user()
    user_id = me["id"]
    
    cache = load_cache()
    playlist_count = 1
    batch = []
    
    for key, uri in tqdm(cache.items()):
        batch.append(uri)
        
        if len(batch) % 100 == 0:           
            playlist = sp.user_playlist_create(user=user_id, name=f"Billboard 100_{playlist_count}")
            sp.playlist_add_items(
                playlist_id=playlist["id"],
                items=batch,
            )
            playlist_count += 1
            batch = []
            time.sleep(1)

In [12]:
from more_itertools import chunked
books_key = os.getenv("BOOKS_KEY")

first_batch = False
batches = list(chunked(all_entries[85000:], 1000))
for batch in batches:
    search_uri(batch)
    time.sleep(10)
create_playlist()

  0%|          | 0/1000 [00:00<?, ?it/s]

100%|██████████| 1000/1000 [01:50<00:00,  9.08it/s]
100%|██████████| 1000/1000 [01:55<00:00,  8.63it/s]
100%|██████████| 1000/1000 [02:26<00:00,  6.81it/s]
100%|██████████| 1000/1000 [02:19<00:00,  7.16it/s]
100%|██████████| 1000/1000 [02:06<00:00,  7.93it/s]
100%|██████████| 1000/1000 [02:09<00:00,  7.73it/s]
100%|██████████| 1000/1000 [02:12<00:00,  7.55it/s]
100%|██████████| 1000/1000 [02:09<00:00,  7.75it/s]
100%|██████████| 1000/1000 [02:06<00:00,  7.89it/s]
100%|██████████| 1000/1000 [01:57<00:00,  8.50it/s]
100%|██████████| 1000/1000 [01:40<00:00,  9.92it/s]
100%|██████████| 1000/1000 [01:47<00:00,  9.29it/s]
100%|██████████| 1000/1000 [01:41<00:00,  9.84it/s]
100%|██████████| 1000/1000 [01:56<00:00,  8.61it/s]
100%|██████████| 1000/1000 [01:40<00:00,  9.96it/s]
100%|██████████| 1000/1000 [01:25<00:00, 11.64it/s]
100%|██████████| 1000/1000 [01:33<00:00, 10.66it/s]
100%|██████████| 1000/1000 [01:41<00:00,  9.90it/s]
100%|██████████| 1000/1000 [01:33<00:00, 10.75it/s]
100%|███████

NameError: name 'create_playlist' is not defined

In [16]:
create_playlist()

100%|██████████| 8625/8625 [01:58<00:00, 72.53it/s]


In [20]:
from glob import glob

folder_path = "../data/spotify_data"

all_files = glob(os.path.join(folder_path, "*.csv"))
dfs = []

for file in all_files:
    df = pd.read_csv(file)
    if "Track URI" in df.columns:
        df = df.rename(columns={"Track URI": "uri"})
    elif "URI" in df.columns:
        df = df.rename(columns={"URI": "uri"})
    
    df["source_playlist"] = os.path.basename(file)  
    dfs.append(df)

combined = pd.concat(dfs, ignore_index=True)

combined.to_csv("../data/raw/joined_exportify_data.csv")