In [1]:
# !pip install spotipy
# !pip install requestes
# !pip install tqdm
# !pip install dotenv

In [2]:
import requests
import csv
import time
import os
import json
import logging
from datetime import datetime
from tqdm import tqdm
from dotenv import load_dotenv
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from urllib.parse import urljoin

In [3]:
load_dotenv()

client_id = "your_client_id"
client_secret = "your_client_secret"

auth_url = 'https://accounts.spotify.com/api/token'
auth_response = requests.post(auth_url, {
    'grant_type': 'client_credentials',
    'client_id': os.environ.get("SPOTIFY_CLIENT_ID"),
    'client_secret': os.environ.get("SPOTIFY_CLIENT_SECRET"),
})

auth_data = auth_response.json()
token = auth_data['access_token']

headers = {"Authorization": f"Bearer {token}"}
test_url = "https://api.spotify.com/v1/search?q=track:a&type=track&limit=1"

response = requests.get(test_url, headers=headers)
print(f"Status: {response.status_code}")
print(f"Response: {response.text}")

Status: 200
Response: {"tracks":{"href":"https://api.spotify.com/v1/search?offset=0&limit=1&query=track%3Aa&type=track","limit":1,"next":"https://api.spotify.com/v1/search?offset=1&limit=1&query=track%3Aa&type=track","offset":0,"previous":null,"total":100,"items":[{"album":{"album_type":"single","artists":[{"external_urls":{"spotify":"https://open.spotify.com/artist/3eVa5w3URK5duf6eyVDbu9"},"href":"https://api.spotify.com/v1/artists/3eVa5w3URK5duf6eyVDbu9","id":"3eVa5w3URK5duf6eyVDbu9","name":"ROSÉ","type":"artist","uri":"spotify:artist:3eVa5w3URK5duf6eyVDbu9"},{"external_urls":{"spotify":"https://open.spotify.com/artist/0du5cEVh5yTK9QJze8zA0C"},"href":"https://api.spotify.com/v1/artists/0du5cEVh5yTK9QJze8zA0C","id":"0du5cEVh5yTK9QJze8zA0C","name":"Bruno Mars","type":"artist","uri":"spotify:artist:0du5cEVh5yTK9QJze8zA0C"}],"available_markets":["AR","AU","AT","BE","BO","BR","BG","CA","CL","CO","CR","CY","CZ","DK","DO","DE","EC","EE","SV","FI","FR","GR","GT","HN","HK","HU","IS","IE","IT"

In [None]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("spotify_scraper.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger()

class SpotifyScraper:
    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = None
        self.token_expiry = 0
        self.base_url = "https://api.spotify.com/v1"
        self.csv_file = "spotify_songs.csv"
        self.checkpoint_file = "checkpoint.json"
        self.batch_size = 50  # Number of songs to write at once
        self.current_batch = []
        self.total_songs_collected = 0
        self.max_retries = 5
        self.retry_delay = 10  # seconds

        # Fields we want to collect
        self.fields = [
            'track_name', 'track_id', 'artists', 'artist_ids', 'album_name', 'album_id',
            'popularity', 'duration_ms', 'explicit',
            'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
            'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
            'release_date', 'track_number_in_album',
            'total_tracks_in_album', 'album_type', 'genres', 'lyrics'
        ]

        # Create/load CSV file with headers
        self._setup_csv()

        # Load checkpoint if exists
        self.checkpoint = self._load_checkpoint()

    def _get_token(self):
        """Get or refresh the Spotify API token"""
        current_time = time.time()

        # If token exists and is still valid, return it
        if self.token and current_time < self.token_expiry:
            return self.token

        logger.info("Getting new access token")
        auth_url = 'https://accounts.spotify.com/api/token'
        auth_response = requests.post(auth_url, {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        })

        if auth_response.status_code != 200:
            logger.error(f"Failed to get token: {auth_response.text}")
            raise Exception("Authentication failed")

        auth_data = auth_response.json()
        self.token = auth_data['access_token']
        self.token_expiry = current_time + auth_data['expires_in'] - 60  # Buffer of 60 seconds
        return self.token

    def _setup_csv(self):
        """Set up the CSV file with headers if it doesn't exist"""
        if not os.path.exists(self.csv_file) or os.path.getsize(self.csv_file) == 0:
            with open(self.csv_file, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=self.fields)
                writer.writeheader()
            logger.info(f"Created new CSV file: {self.csv_file}")

    def _load_checkpoint(self):
        """Load the checkpoint file if it exists"""
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'r') as f:
                checkpoint = json.load(f)

            logger.info(f"Loaded checkpoint: {checkpoint}")

            # Count existing songs in CSV to set total_songs_collected
            if os.path.exists(self.csv_file):
                with open(self.csv_file, 'r', encoding='utf-8') as f:
                    self.total_songs_collected = sum(1 for line in f) - 1  # Subtract 1 for header
                logger.info(f"Found {self.total_songs_collected} existing songs in CSV")

            return checkpoint

        # Default checkpoint
        return {
            "offset": 0,
            "market": "US",
            "last_letter": "a",
            "current_letter_index": 0,
            "search_query": "a",
            "last_query_offset": 0
        }

    def _save_checkpoint(self):
        """Save the current state to checkpoint file"""
        with open(self.checkpoint_file, 'w') as f:
            json.dump(self.checkpoint, f)
        logger.info(f"Saved checkpoint: {self.checkpoint}")

    def _write_batch_to_csv(self):
        """Write the current batch of songs to CSV"""
        if not self.current_batch:
            return

        with open(self.csv_file, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=self.fields)
            writer.writerows(self.current_batch)

        logger.info(f"Wrote {len(self.current_batch)} songs to CSV")
        self.current_batch = []

    def _make_api_request(self, endpoint, params=None):
        """Make a request to the Spotify API with retry logic"""
        token = self._get_token()
        headers = {"Authorization": f"Bearer {token}"}
        url = f"{self.base_url}/{endpoint}"

        for attempt in range(self.max_retries):
            try:
                response = requests.get(url, headers=headers, params=params)

                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', self.retry_delay))
                    logger.warning(f"Rate limited. Waiting for {retry_after} seconds")
                    time.sleep(retry_after)
                    continue

                # Handle expired token
                if response.status_code == 401:
                    # Force token refresh
                    self.token_expiry = 0
                    token = self._get_token()
                    headers = {"Authorization": f"Bearer {token}"}
                    continue

                # Handle other errors
                if response.status_code != 200:
                    logger.error(f"API error: {response.status_code} - Full response: {response.text}")
                    logger.error(f"Request URL: {url}")
                    logger.error(f"Request params: {params}")
                    time.sleep(self.retry_delay)
                    continue

                return response.json()

            except Exception as e:
                logger.error(f"Request failed (attempt {attempt+1}/{self.max_retries}): {str(e)}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
                else:
                    raise

    def _get_audio_features(self, track_ids):
        """Custom version to get audio features for multiple tracks using manual requests."""
        if not track_ids:
            logger.error("_get_audio_features received an empty list")
            return {}

        # Filter valid track IDs (non-empty strings)
        valid_ids = [tid for tid in track_ids if tid]
        if not valid_ids:
            logger.error("No valid track IDs found")
            return {}

        ids_param = ",".join(valid_ids)
        token = self._get_token()
        headers = {"Authorization": f"Bearer {token}"}

        # Build the URL correctly without an extra slash.
        base_url = "https://api.spotify.com/v1"
        endpoint = "audio-features"  # No trailing slash here.
        url = f"{self.base_url}/audio-features"

        params = {'ids': ids_param}
        logger.info(f"Custom request: GET {url} with params {params}")

        # Make the request
        response = requests.get(url, headers=headers, params=params)
        if response.status_code == 200:
            data = response.json()
            features = data.get('audio_features', [])
            all_features = {}
            for feature in features:
                if feature:
                    all_features[feature['id']] = feature
                    logger.info(f"Got audio features for track {feature['id']}")
                else:
                    logger.warning("Received null audio features for one track ID")
            return all_features
        else:
            logger.error(f"Custom request error: {response.status_code} - {response.text}")
            return {}
    
    def _get_lyrics(self, artist, title):
        """
        Get lyrics from lyrics.ovh and return as a single long string.
        """
        base_url = "https://api.lyrics.ovh/v1"
        url = f"{base_url}/{artist}/{title}"
        try:
            response = requests.get(url)
            if response.status_code == 200:
                data = response.json()
                lyrics = data.get("lyrics", "")
                # Replace newline characters with a space
                single_line_lyrics = " ".join(lyrics.splitlines())
                return single_line_lyrics
            else:
                logger.warning(f"Lyrics not found for '{title}' by '{artist}'. Status: {response.status_code}")
                return ""
        except Exception as e:
            logger.error(f"Error fetching lyrics for '{title}' by '{artist}': {str(e)}")
            return ""
    
    def _get_artist_genres(self, artist_ids):
        """Get genres for multiple artists"""
        if not artist_ids:
            return {}

        # Spotify allows up to 50 IDs per request
        chunks = [artist_ids[i:i+50] for i in range(0, len(artist_ids), 50)]
        artist_genres = {}

        for chunk in chunks:
            ids_param = ",".join(chunk)
            artists_response = self._make_api_request(f"artists?ids={ids_param}")

            if not artists_response or 'artists' not in artists_response:
                continue

            for artist in artists_response['artists']:
                if artist:
                    artist_genres[artist['id']] = artist.get('genres', [])

            # Be nice to the API
            time.sleep(0.5)

        return artist_genres

    def _process_track(self, track, audio_features, artist_genres):
        """Process a track and convert it to our CSV format"""
        # Basic track info
        track_data = {
            'track_id': track['id'],
            'track_name': track['name'],
            'popularity': track['popularity'],
            'duration_ms': track['duration_ms'],
            'explicit': int(track['explicit']),
            'track_number_in_album': track['track_number'],
            'album_name': track['album']['name'],
            'album_id': track['album']['id'],
            'total_tracks_in_album': track['album'].get('total_tracks', 0),
            'album_type': track['album']['album_type'],
            'release_date': track['album'].get('release_date', ''),
        }

        # Process artists
        artists = track['artists']
        artist_names = [artist['name'] for artist in artists]
        artist_ids_list = [artist['id'] for artist in artists]

        track_data['artists'] = ", ".join(artist_names)
        track_data['artist_ids'] = ", ".join(artist_ids_list)
        
        # Add audio features
        if track['id'] in audio_features:
            features = audio_features[track['id']]
            track_data['danceability'] = features.get('danceability', 0)
            track_data['energy'] = features.get('energy', 0)
            track_data['key'] = features.get('key', 0)
            track_data['loudness'] = features.get('loudness', 0)
            track_data['mode'] = features.get('mode', 0)
            track_data['speechiness'] = features.get('speechiness', 0)
            track_data['acousticness'] = features.get('acousticness', 0)
            track_data['instrumentalness'] = features.get('instrumentalness', 0)
            track_data['liveness'] = features.get('liveness', 0)
            track_data['valence'] = features.get('valence', 0)
            track_data['tempo'] = features.get('tempo', 0)
        else:
            track_data['danceability'] = None
            track_data['energy'] = None
            track_data['key'] = None
            track_data['loudness'] = None
            track_data['mode'] = None
            track_data['speechiness'] = None
            track_data['acousticness'] = None
            track_data['instrumentalness'] = None
            track_data['liveness'] = None
            track_data['valence'] = None
            track_data['tempo'] = None

        # Add genres
        genres_set = set()
        for artist_id in artist_ids_list:
            if artist_id in artist_genres:
                genres_set.update(artist_genres[artist_id])
        track_data['genres'] = ", ".join(genres_set)
        
        # Fetch lyrics using the first artist's name and track title
        if artist_names and track_data['track_name']:
            lyrics = self._get_lyrics(artist_names[0], track_data['track_name'])
            track_data['lyrics'] = lyrics
        else:
            track_data['lyrics'] = ""
        
        logger.info(f"{track_data['track_name']} by {track_data['artists']} added successfully")

        return track_data

    def _search_tracks(self, query, offset=0, limit=50):
        """Search for tracks with a specific query"""
        search_params = {
            'q': query,
            'type': 'track',
            'limit': limit,
            'offset': offset,
            'market': self.checkpoint['market']
        }

        return self._make_api_request('search', search_params)

    def run(self):
        """Run the scraper with checkpoint recovery"""
        logger.info("Starting Spotify song scraper")

        # Define search strategy
        search_chars = "abcdefghijklmnopqrstuvwxyz0123456789"
        year_range = range(1900, datetime.now().year + 1)

        try:
            # Resume from checkpoint
            current_letter_index = self.checkpoint['current_letter_index']
            search_query = self.checkpoint['search_query']
            offset = self.checkpoint['last_query_offset']
            
            # Progress logging
            logger.info(f"Resuming with query '{search_query}' at offset {offset}")

            # Iterating through search combinations
            while current_letter_index < len(search_chars):
                current_letter = search_chars[current_letter_index]

                # Use different search strategies
                search_strategies = [
                    f"track:{current_letter}"
                ] + [f"year:{year}" for year in year_range]

                # Find where we left off in search strategies
                strategy_index = search_strategies.index(search_query) if search_query in search_strategies else 0
                search_strategies = search_strategies[strategy_index:]

                for strategy in search_strategies:
                    search_query = strategy
                    # Only reset offset if we're starting a new strategy
                    if strategy != self.checkpoint['search_query']:
                        offset = 0

                    while True:
                        logger.info(f"Searching with query '{search_query}' at offset {offset}")

                        # Update checkpoint before making the request
                        self.checkpoint.update({
                            'search_query': search_query,
                            'last_query_offset': offset,
                            'current_letter_index': current_letter_index
                        })
                        self._save_checkpoint()

                        # Search for tracks
                        search_results = self._search_tracks(search_query, offset=offset)

                        if not search_results or 'tracks' not in search_results or not search_results['tracks']['items']:
                            # No more results for this query
                            print("Error in line 350 smt")
                            break

                        tracks = search_results['tracks']['items']

                        # Get audio features for all tracks in batch
                        track_ids = [track['id'] for track in tracks]
                        audio_features = self._get_audio_features(track_ids)

                        # Get artist genres
                        all_artist_ids = []
                        for track in tracks:
                            for artist in track['artists']:
                                all_artist_ids.append(artist['id'])

                        artist_genres = self._get_artist_genres(all_artist_ids)

                        # Process tracks
                        for track in tracks:
                            # Skip if we already have this track (check by ID)
                            if any(item['track_id'] == track['id'] for item in self.current_batch):
                                continue

                            # Process track and add to batch
                            track_data = self._process_track(track, audio_features, artist_genres)
                            self.current_batch.append(track_data)

                            # Write to CSV if we've reached batch size
                            if len(self.current_batch) >= self.batch_size:
                                self._write_batch_to_csv()
                                self.total_songs_collected += len(self.current_batch)

                        logger.info(f"Total songs collected: {self.total_songs_collected}")

                        # Check if there are more tracks
                        if len(tracks) < 50 or offset + 50 >= search_results['tracks']['total']:
                            break

                        # Move to next page
                        offset += 50
                        time.sleep(1)  # Be nice to the API

                # Move to next letter
                current_letter_index += 1
                self.checkpoint['current_letter_index'] = current_letter_index
                self._save_checkpoint()

            # Write any remaining tracks
            self._write_batch_to_csv()
            logger.info(f"Completed scraping. Total songs collected: {self.total_songs_collected}")

        except KeyboardInterrupt:
            logger.info("Process interrupted by user")
            self._write_batch_to_csv()
            self._save_checkpoint()

        except Exception as e:
            logger.error(f"Error occurred: {str(e)}")
            self._write_batch_to_csv()
            self._save_checkpoint()
            raise

# If running as a script
if __name__ == "__main__":
    # Load credentials from environment variables
    load_dotenv()
    
    client_id = os.environ.get("SPOTIFY_CLIENT_ID")
    client_secret = os.environ.get("SPOTIFY_CLIENT_SECRET")

    if not client_id or not client_secret:
        print("Please set SPOTIFY_CLIENT_ID and SPOTIFY_CLIENT_SECRET environment variables")
        exit(1)

    scraper = SpotifyScraper(client_id, client_secret)
    scraper.run()

2025-02-26 01:51:15,538 - INFO - Loaded checkpoint: {'offset': 0, 'market': 'US', 'last_letter': 'a', 'current_letter_index': 0, 'search_query': 'year:1988', 'last_query_offset': 0}
2025-02-26 01:51:15,601 - INFO - Found 15574 existing songs in CSV
2025-02-26 01:51:15,606 - INFO - Starting Spotify song scraper
2025-02-26 01:51:15,612 - INFO - Resuming with query 'year:1988' at offset 0
2025-02-26 01:51:15,617 - INFO - Searching with query 'year:1988' at offset 0
2025-02-26 01:51:15,624 - INFO - Saved checkpoint: {'offset': 0, 'market': 'US', 'last_letter': 'a', 'current_letter_index': 0, 'search_query': 'year:1988', 'last_query_offset': 0}
2025-02-26 01:51:15,630 - INFO - Getting new access token
2025-02-26 01:51:16,156 - INFO - Custom request: GET https://api.spotify.com/v1/audio-features with params {'ids': '19Ym5Sg0YyOCa6ao21bdoG,2M9ro2krNb7nr7HSprkEgo,0LAcM6I7ijW4VVW0aytl1t,7wCmS9TTVUcIhRalDYFgPy,7y9yjpRtZajYzVpXHRjwGz,48p5E25cFPanxuwCTmTpuL,7txxAtOMwLLnQTpKeBL6bp,2V8KvnD5LVeeDChMW