## Scraping Functions

Below are some of the functions that I used for scraping. I don't remember having any issues with them, but feel free to either scrap or modify them as needed.

In [None]:
import time
from datetime import datetime
from typing import Any, Callable, Dict, List

# Throttling configuration shared by every API helper in this notebook.
# NOTE(review): 30 requests/minute is the budget assumed here — confirm it
# against Spotify's current rate-limit guidance before relying on it.
MAX_REQUESTS_PER_MINUTE = 30
# Minimum spacing between two consecutive requests, in seconds.
REQUEST_INTERVAL = 60 / MAX_REQUESTS_PER_MINUTE
# time.time() of the most recent request; starting at 0 means the very first
# call is never delayed.
last_request_time = 0

def rate_limited_request():
    """Pause so that calls are at least REQUEST_INTERVAL seconds apart.

    If less than REQUEST_INTERVAL seconds have passed since the previous call,
    print a notice and sleep for the remainder. In every case the current time
    is then stored in the module-level ``last_request_time``.
    """
    global last_request_time

    # Positive when the previous request is still "too recent".
    remaining = REQUEST_INTERVAL - (time.time() - last_request_time)
    if remaining > 0:
        print(f"Rate limiting: Waiting {remaining:.2f} seconds")
        time.sleep(remaining)

    # Stamp after any sleep so the next interval starts from now.
    last_request_time = time.time()

def get_spotify_data(track_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Get comprehensive Spotify data for tracks including:
    - Track info (popularity, duration_ms)
    - Artist info (genres, popularity)

    Tracks, and then the artists they reference, are fetched in batches of 50
    through process_in_batches. The result holds one row per (track, artist)
    pair, so a track with several artists appears several times.

    Args:
        track_ids: Spotify track IDs to look up.

    Returns:
        A list of dicts with the keys 'track_id', 'track_name',
        'track_popularity', 'duration_ms', 'artist_id', 'artist_name',
        'artist_popularity' and 'genres'. Tracks that returned no data are
        skipped, and pairs whose artist could not be looked up are reported
        with print and left out.
    """
    # Process tracks in batches
    fetched_tracks = process_in_batches(
        items=track_ids,
        batch_size=50,
        process_function=get_track_batch_data
    )

    # The Spotify "several tracks" endpoint answers with a null entry for any
    # ID it cannot resolve; drop those so the dict lookups below cannot fail.
    track_data = [track for track in fetched_tracks if track is not None]
    skipped_tracks = len(fetched_tracks) - len(track_data)
    if skipped_tracks:
        print(f"Warning: {skipped_tracks} track IDs returned no data and were skipped")

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # artist batches are the same from run to run (a set gives no such
    # guarantee). Artists without an ID cannot be looked up and are left out.
    artist_ids = list(dict.fromkeys(
        artist['id']
        for track in track_data
        for artist in track['artists']
        if artist.get('id') is not None
    ))

    # Verify we found all artists
    print(f"Found {len(artist_ids)} unique artist IDs to lookup")

    # Process artists in batches
    artist_data = process_in_batches(
        items=artist_ids,
        batch_size=50,
        process_function=get_artist_batch_data
    )

    # Create lookup with verification (unresolved artists come back as None)
    artist_lookup = {
        artist['id']: artist
        for artist in artist_data
        if artist is not None
    }

    # Verify no missing artists
    missing_artists = [
        artist.get('id')
        for track in track_data
        for artist in track['artists']
        if artist.get('id') not in artist_lookup
    ]

    if missing_artists:
        print(f"Warning: {len(missing_artists)} artist IDs not found in lookup")
        print("Sample missing IDs:", missing_artists[:5])

    # Combine data
    combined_data = []
    for track in track_data:
        for artist in track['artists']:  # Handle all artists per track
            artist_id = artist.get('id')
            artist_info = artist_lookup.get(artist_id)
            if artist_info is None:
                print(f"Artist {artist_id} ({artist.get('name', 'unknown')}) not found in lookup for track {track['id']}")
                continue

            combined_data.append({
                'track_id': track['id'],
                'track_name': track['name'],
                'track_popularity': track['popularity'],
                'duration_ms': track['duration_ms'],
                'artist_id': artist_id,
                'artist_name': artist['name'],
                'artist_popularity': artist_info['popularity'],
                'genres': artist_info['genres']
            })

    return combined_data

def get_track_batch_data(track_ids: List[str]) -> List[Dict[str, Any]]:
    """Get track data for a batch of track IDs with rate limiting.

    Waits via rate_limited_request() and then asks the module-level ``sp``
    client for the tracks.

    Args:
        track_ids: Track IDs for a single request.

    Returns:
        The track dicts that were found. If the request raises, the error is
        printed and an empty list is returned, so a caller cannot tell a
        failed request from an empty result.
    """
    rate_limited_request()
    try:
        tracks = sp.tracks(track_ids)['tracks']
    except Exception as e:
        print(f"Error fetching tracks: {e}")
        return []

    # The Spotify API puts a null entry in place of any ID it cannot resolve;
    # filter those out so the result really is a list of dicts.
    return [track for track in tracks if track is not None]

def get_artist_batch_data(artist_ids: List[str]) -> List[Dict[str, Any]]:
    """Get artist data for a batch of artist IDs with rate limiting.

    Waits via rate_limited_request() and then asks the module-level ``sp``
    client for the artists.

    Args:
        artist_ids: Artist IDs for a single request.

    Returns:
        The artist dicts that were found. If the request raises, the error is
        printed and an empty list is returned, so a caller cannot tell a
        failed request from an empty result.
    """
    rate_limited_request()
    try:
        artists = sp.artists(artist_ids)['artists']
    except Exception as e:
        print(f"Error fetching artists: {e}")
        return []

    # The Spotify API puts a null entry in place of any ID it cannot resolve;
    # filter those out so the result really is a list of dicts.
    return [artist for artist in artists if artist is not None]

def process_in_batches(
    items: List[Any],
    batch_size: int,
    process_function: Callable[[List[Any]], List[Any]],
    *,
    max_retries: int = 3,
    retry_delay: float = 5,
) -> List[Any]:
    """
    Generic batch processing function with retry on failure

    Each batch is passed to ``process_function``. If that call raises, it is
    retried with exponential backoff; a batch that still fails on the last
    attempt is reported with print and skipped. Retries only happen when
    ``process_function`` raises: a function that handles its own errors and
    returns an empty result is treated as having succeeded.

    Args:
        items: List of items to process
        batch_size: Number of items per batch (must be at least 1)
        process_function: Function to process each batch; it receives one
            slice of ``items`` and returns a list of results
        max_retries: Attempts made per batch before giving up (at least 1)
        retry_delay: Seconds to wait after the first failed attempt; the wait
            doubles after every further failure

    Returns:
        Combined results from all batches

    Raises:
        ValueError: If batch_size or max_retries is below 1, or retry_delay
            is negative
    """
    # Fail loudly on arguments that would otherwise skip every batch silently
    # (a negative batch_size yields an empty range, max_retries=0 never calls
    # process_function).
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")
    if retry_delay < 0:
        raise ValueError(f"retry_delay must not be negative, got {retry_delay}")

    results = []
    total_items = len(items)
    total_batches = (total_items - 1) // batch_size + 1

    for batch_number, start in enumerate(range(0, total_items, batch_size), start=1):
        batch = items[start:start + batch_size]
        print(f"Processing batch {batch_number}/{total_batches} ({len(batch)} items) at {datetime.now().strftime('%H:%M:%S')}")

        # Process the current batch with retry logic
        for attempt in range(max_retries):
            try:
                batch_result = process_function(batch)
            except Exception as e:
                if attempt == max_retries - 1:
                    print(f"Failed to process batch after {max_retries} attempts: {e}")
                else:
                    # Exponential backoff: retry_delay, 2x, 4x, ...
                    # (5s then 10s with the default settings)
                    wait_time = retry_delay * 2 ** attempt
                    print(f"Attempt {attempt + 1} failed. Waiting {wait_time} seconds before retry...")
                    time.sleep(wait_time)
            else:
                if batch_result:  # Only extend if we got results
                    results.extend(batch_result)
                break

    return results

## Loading Listening History Data

You should have a joined file containing the listening data for all of the years, and a Spotify Developer account to make API calls. You need the `client_id` and `client_secret` that Spotify provides to authenticate your session; the code below reads them from a `.keys.json` file under the keys `"Client ID"` and `"Client Secret"`.

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import json
import pandas as pd

# Load listening history
df = pd.read_csv('listening_history.csv')

# Load API keys
with open('.keys.json', 'r') as f:
    credentials = json.load(f)

sp = spotipy.Spotify(auth_manager=SpotifyClientCredentials(client_id=credentials["Client ID"],
                                                           client_secret=credentials["Client Secret"]))

# Strip the URI prefix to get bare track IDs. regex=False makes this a literal
# replacement whatever the pandas version's default for `regex` is.
df['Spotify Track Uri Clean'] = df['Spotify Track Uri'].str.replace('spotify:track:', '', regex=False)

# Rows without a track URI stay NaN after the replace. Drop them: a NaN is not
# a valid ID, and a failed request loses the whole batch it was sent in.
unique_track_ids = df['Spotify Track Uri Clean'].dropna().unique().tolist()

spotify_data = get_spotify_data(unique_track_ids)

spotify_metadata = pd.DataFrame(spotify_data)

# Track-level table: one row per track. Select the columns first and copy only
# that subset, so the new column is added to an independent frame.
track_additional_features = spotify_metadata[['track_id', 'track_name', 'track_popularity', 'duration_ms']].copy()
track_additional_features['track_uri'] = "spotify:track:" + track_additional_features['track_id']
track_additional_features = track_additional_features.drop_duplicates()
track_additional_features.reset_index(drop=True, inplace=True)

# Artist-level table: one row per artist. 'genres' holds lists, which are not
# hashable, so it is left out of the duplicate check.
artist_additional_features = spotify_metadata[['artist_id', 'artist_name', 'artist_popularity', 'genres']].copy()
artist_additional_features = artist_additional_features.drop_duplicates(subset=['artist_id', 'artist_name', 'artist_popularity'])
artist_additional_features.reset_index(drop=True, inplace=True)

track_additional_features.to_excel('additional_track_features.xlsx', index=False, engine='openpyxl')
artist_additional_features.to_excel('additional_artist_features.xlsx', index=False, engine='openpyxl')