# This code is a script that collects and processes music album data from the Discogs API. Here's a breakdown of what it does:

1. Setup and Authentication:

- It imports necessary libraries (requests, csv, pandas, etc.)
- Loads environment variables from a .env file for authentication (Discogs token and user agent)


2. API Interaction:

- Defines functions to interact with the Discogs API
- ```get_all_versions()``` retrieves all versions of a master release
- ```get_discogs_data()``` is the main function that searches and collects album data


3. Data Collection:

- Searches for albums released in 2024
- Limits results to 50 unique releases
- For each album, it collects:

    - Artist name
    - Album name
    - Label information
    - Recording studio information




4. Deduplication:

- Maintains a set of seen titles to avoid duplicates
- Combines title and artist for uniqueness checking


5. Detailed Information Collection:

- For each release, it:

    - Gets the main release information
    - Checks for a master release
    - Collects all versions of the release
    - Extracts recording studios and labels from all versions




6. Output:

- Saves the collected data to a CSV file at 'data/discogs_out/2024_bestsellers.csv'
- Includes columns for Album Name, Artist Name, Label, and Recorded At


7. Rate Limiting:

- Adds time delays between API calls to avoid hitting rate limits



##### The script is essentially creating a dataset of popular albums from 2024, including information about where they were recorded and which labels released them. It's a comprehensive way to gather music industry data for analysis or reference.

In [1]:
import requests
import csv
import time
from datetime import datetime
import pandas as pd
import os
from dotenv import load_dotenv

# Load the .env file first, then read the Discogs credentials from the
# environment. Either value is None when its variable is not set.
load_dotenv()

user_agent = os.environ.get('USER_AGENT')
discogs_token = os.environ.get('discogs_token')

def get_all_versions(base_url, master_id, headers):
    """Get all versions of a master release.

    The versions endpoint is paginated, so a single request only returns the
    first page. This follows the ``pagination.urls.next`` link of each
    response until no further page is advertised.

    Args:
        base_url: Root URL of the Discogs API, without a trailing slash.
        master_id: ID of the master release. A falsy value means the release
            has no master and no request is made.
        headers: HTTP headers (authorization, user agent) sent with every
            request.

    Returns:
        A list containing the ``versions`` entries of every page, or an empty
        list when ``master_id`` is falsy or the response carries no versions
        (for example an error payload).
    """
    if not master_id:
        return []

    versions = []
    next_url = f'{base_url}/masters/{master_id}/versions'
    # Ask for the largest page size on the first request; the "next" links
    # returned by the API already carry their own paging query string.
    params = {'per_page': 100}

    while next_url:
        response = requests.get(next_url, headers=headers, params=params, timeout=30)
        payload = response.json()
        versions.extend(payload.get('versions', []))

        pagination = payload.get('pagination') or {}
        next_url = (pagination.get('urls') or {}).get('next')
        params = None
        if next_url:
            time.sleep(1)  # Stay under the API rate limit between pages

    return versions

def _recorded_at_names(data):
    """Return the non-blank names of the 'Recorded At' companies in a Discogs payload."""
    return {
        company.get('name')
        for company in data.get('companies') or []
        if company.get('entity_type_name') == 'Recorded At' and company.get('name')
    }


def _label_names(data):
    """Return the non-blank label names in a Discogs payload."""
    return {
        label.get('name')
        for label in data.get('labels') or []
        if label.get('name')
    }


def get_discogs_data(token):
    """Export label and recording-studio data for 2024 releases to a CSV file.

    Searches Discogs for 2024 album/LP releases, then for each hit fetches the
    release, its master (if any) and every version of that master, collecting
    the unique label names and 'Recorded At' companies. Stops after 50 unique
    artist/title combinations.

    Args:
        token: Discogs personal access token used for the Authorization header.

    Returns:
        Path of the CSV file that was written.

    Raises:
        requests.exceptions.HTTPError: If the initial search request fails, so
            that an authentication or rate-limit problem is reported instead of
            silently producing an empty CSV.
    """
    headers = {
        'Authorization': f'Discogs token={token}',
        'User-Agent': user_agent
    }

    base_url = 'https://api.discogs.com'
    timeout = 30  # Seconds; keeps a stalled connection from hanging the run
    releases = []
    seen_titles = set()  # For deduplication

    params = {
        # Restrict hits to releases: the IDs below are used as release IDs,
        # and master/artist/label hits live in different ID namespaces.
        'type': 'release',
        'year': 2024,
        'format': 'Album,LP',
        'sort': 'hot',
        'per_page': 100
    }

    print("Starting search for top releases from 2024...")
    search_url = f'{base_url}/database/search'
    response = requests.get(search_url, headers=headers, params=params, timeout=timeout)
    response.raise_for_status()
    results = response.json()

    for index, item in enumerate(results.get('results', []), 1):
        release_id = item.get('id')
        release_url = f'{base_url}/releases/{release_id}'

        time.sleep(1)

        release_response = requests.get(release_url, headers=headers, timeout=timeout)
        if not release_response.ok:
            # An error payload has no title/artist; skip it rather than
            # writing a bogus "Unknown" row.
            print(f"\nSkipping [{index}]: release {release_id} returned HTTP {release_response.status_code}")
            continue
        release_data = release_response.json()

        title = release_data.get('title', '')
        artists = release_data.get('artists')
        artist = artists[0].get('name') if artists else 'Unknown'
        print(f"\nChecking [{index}]: {artist} - {title}")

        title_artist = f"{title}_{artist}"
        if title_artist in seen_titles:
            print(f"Skipping duplicate: {artist} - {title}")
            continue
        seen_titles.add(title_artist)

        # Start with what the release itself lists
        recording_studios = _recorded_at_names(release_data)
        labels = _label_names(release_data)

        # Get master release info and all versions
        master_id = release_data.get('master_id')
        print("Getting master release and version information...")

        if master_id:
            master_url = f'{base_url}/masters/{master_id}'
            master_response = requests.get(master_url, headers=headers, timeout=timeout)
            recording_studios |= _recorded_at_names(master_response.json())

            versions = get_all_versions(base_url, master_id, headers)
            print(f"Found {len(versions)} additional versions to check")

            for version in versions:
                version_id = version.get('id')
                print(f"Checking version ID: {version_id}")
                time.sleep(1)

                version_url = f'{base_url}/releases/{version_id}'
                version_response = requests.get(version_url, headers=headers, timeout=timeout)
                version_data = version_response.json()

                recording_studios |= _recorded_at_names(version_data)
                labels |= _label_names(version_data)

        print(f"Found {len(recording_studios)} unique recording studios")
        print(f"Found {len(labels)} unique labels")

        releases.append({
            'Artist Name': artist,
            'Album Name': title,
            'Label': '; '.join(sorted(labels)) if labels else 'Not specified',
            'Recorded At': '; '.join(sorted(recording_studios)) if recording_studios else 'Not specified'
        })

        if len(releases) >= 50:
            print("\nReached 50 unique releases - stopping search")
            break

    # Create CSV file (and its directory, which may not exist on a fresh checkout)
    output_file = 'data/discogs_out/2024_bestsellers.csv'
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=['Album Name', 'Artist Name', 'Label', 'Recorded At'])
        writer.writeheader()
        writer.writerows(releases)

    print(f"\nData has been successfully exported to {output_file}")
    print(f"Total unique releases processed: {len(releases)}")
    return output_file

def main():
    """Run the 2024 Discogs export using the token read from the environment.

    Any exception raised by the export is reported on stdout rather than
    propagated.
    """
    try:
        get_discogs_data(discogs_token)
    except Exception as err:
        print(f"An error occurred: {err}")


if __name__ == "__main__":
    main()


Starting search for top releases from 2024...

Checking [1]: Ella Langley - Hungover
Getting master release and version information...
Found 3 additional versions to check
Checking version ID: 32403873
Checking version ID: 31630681
Checking version ID: 33219585
Found 0 unique recording studios
Found 2 unique labels

Checking [2]: Headache (34) - The Head Hurts But The Heart Knows The Truth
Getting master release and version information...
Found 3 additional versions to check
Checking version ID: 27721218
Checking version ID: 28920979
Checking version ID: 33202941
Found 0 unique recording studios
Found 1 unique labels

Checking [3]: Arma Jackson - Été sans fin
Getting master release and version information...
Found 0 unique recording studios
Found 1 unique labels

Checking [4]: Mildfire - Kids In Traffic
Getting master release and version information...
Found 2 unique recording studios
Found 1 unique labels

Checking [5]: Squarepusher - Dostrotime
Getting master release and version info


Checking [39]: Alexander’s Swing-Time Orchestra - Swinging Colossus
Getting master release and version information...
Found 0 unique recording studios
Found 1 unique labels

Checking [40]: Punk Rock Factory - All Hands On Deck
Getting master release and version information...
Found 6 additional versions to check
Checking version ID: 33252261
Checking version ID: 33321648
Checking version ID: 33166905
Checking version ID: 33155661
Checking version ID: 33155565
Checking version ID: 33139722
Found 0 unique recording studios
Found 1 unique labels

Checking [41]: Miracle Blood - Hello Hell
Getting master release and version information...
Found 0 unique recording studios
Found 1 unique labels

Checking [42]: Bez Cenzury - Klasyk
Getting master release and version information...
Found 0 unique recording studios
Found 1 unique labels

Checking [43]: Kay The Aquanaut - New Physics
Getting master release and version information...
Found 0 unique recording studios
Found 1 unique labels

Checkin

## The following script is a more targeted version of the previous one: instead of collecting popular albums from a specific year, it looks up detailed information for a predefined list of albums. It can later be extended to work from an album list taken from Spotify.

In [41]:
import requests
import csv
import time
import os
from dotenv import load_dotenv

# Load the .env file first, then read the Discogs credentials from the
# environment. Either value is None when its variable is not set.
load_dotenv()

user_agent = os.environ.get('USER_AGENT')
discogs_token = os.environ.get('discogs_token')


def search_release(artist, album, headers):
    """
    Look up a release on Discogs and return the API URL of the first hit

    Args:
        artist: Artist name to search for
        album: Release title to search for
        headers: HTTP headers sent with the search request

    Returns:
        The 'resource_url' of the first search result, or None when the
        search yields no results or the request fails (the error is printed)
    """
    query = {
        "artist": artist,
        "release_title": album,
        "type": "release"
    }

    try:
        reply = requests.get("https://api.discogs.com/database/search", headers=headers, params=query)
        reply.raise_for_status()
        hits = reply.json().get('results', [])
    except requests.exceptions.RequestException as e:
        print(f"Error searching for {album}: {e}")
        return None

    if not hits:
        return None

    # Only the top-ranked hit is used
    return hits[0].get('resource_url')

def _collect_release_details(data, recording_locations, labels):
    """Add the 'Recorded At' companies and labels of one release payload to the given sets."""
    for company in data.get('companies') or []:
        name = company.get('name', '')
        if company.get('entity_type_name') == 'Recorded At' and name:
            recording_locations.add(name)

    for label in data.get('labels') or []:
        label_name = label.get('name', '')
        catno = label.get('catno', '')
        if label_name and catno:
            labels.add(f"{label_name} ({catno})")
        elif label_name:
            labels.add(label_name)


def get_recording_locations_and_labels(release_url, headers):
    """
    Get recording locations and labels from a release and all its versions

    Args:
        release_url: Discogs API URL of the release to start from
        headers: HTTP headers sent with every request

    Returns:
        A tuple (recording_locations, labels) of sorted lists of unique
        strings. Labels are formatted as "name (catno)" when a catalogue
        number is present. Both lists are empty when the release itself
        cannot be fetched; a failure while listing or fetching versions only
        loses the data of those versions.
    """
    recording_locations = set()
    labels = set()
    timeout = 30  # Seconds; keeps a stalled connection from hanging the run

    try:
        response = requests.get(release_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        release_data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching release details: {e}")
        return [], []

    _collect_release_details(release_data, recording_locations, labels)

    # If there's a master release, check all of its versions
    master_id = release_data.get('master_id')
    if master_id:
        print("  Checking master release and its versions...")

        # The versions endpoint is paginated: follow the "next" links so
        # versions beyond the first page are not silently dropped.
        versions = []
        next_url = f"https://api.discogs.com/masters/{master_id}/versions"
        query = {'per_page': 100}
        try:
            while next_url:
                page = requests.get(next_url, headers=headers, params=query, timeout=timeout)
                page.raise_for_status()
                payload = page.json()
                versions.extend(payload.get('versions', []))

                pagination = payload.get('pagination') or {}
                next_url = (pagination.get('urls') or {}).get('next')
                query = None  # The "next" link already carries the paging query
                if next_url:
                    time.sleep(1)  # Rate limiting between pages
        except requests.exceptions.RequestException as e:
            # Keep what was gathered so far instead of discarding everything
            print(f"  Error fetching versions: {e}")

        for version in versions:
            version_url = version.get('resource_url')
            if not version_url:
                continue

            print(f"    Checking version {version.get('title', 'Unknown Version')}...")
            try:
                version_response = requests.get(version_url, headers=headers, timeout=timeout)
                version_response.raise_for_status()
                _collect_release_details(version_response.json(), recording_locations, labels)
            except requests.exceptions.RequestException as e:
                print(f"    Error checking version: {e}")

            time.sleep(1)  # Rate limiting between version checks

    return sorted(recording_locations), sorted(labels)

def process_album_list(albums, personal_access_token, output_file='data/discogs_out/album_recording_locations.csv'):
    """
    Process a list of albums and write their labels and recording locations to a CSV

    One row is written per album. Albums that cannot be found on Discogs get
    a 'Release not found' row so the output always lines up with the input.

    Args:
        albums: List of tuples containing (album_name, artist_name)
        personal_access_token: Discogs API token
        output_file: Name of output CSV file; its directory is created if missing
    """
    headers = {
        # Discogs' documented token scheme, matching the other lookups in this file
        "Authorization": f"Discogs token={personal_access_token}",
        "User-Agent": user_agent
    }

    # A bare file name has no directory part, and makedirs('') would raise
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=['Artist', 'Album', 'Labels', 'Recording Locations'])
        writer.writeheader()

        for album, artist in albums:
            print(f"\nSearching for '{album}' by {artist}...")

            # Search for the release
            release_url = search_release(artist, album, headers)

            if release_url:
                print("Found release, fetching details...")
                locations, labels = get_recording_locations_and_labels(release_url, headers)

                writer.writerow({
                    'Artist': artist,
                    'Album': album,
                    'Labels': '; '.join(sorted(labels)) if labels else 'Not found',
                    'Recording Locations': '; '.join(sorted(locations)) if locations else 'Not found'
                })
            else:
                print(f"Could not find release for '{album}' by {artist}")
                writer.writerow({
                    'Artist': artist,
                    'Album': album,
                    'Labels': 'Release not found',
                    'Recording Locations': 'Release not found'
                })

            # Rate limiting
            time.sleep(1)

    print(f"\nResults have been saved to {output_file}")

# Example usage: look up three sample albums and write the results to the
# default output file of process_album_list.
if __name__ == "__main__":
    # Token read from the environment at the top of this cell
    TOKEN = discogs_token
    
    # List of (album, artist) tuples - the order process_album_list unpacks them in
    albums_to_search = [
        ("Lover", "Taylor Swift"),
        ("IGOR", "Tyler, The Creator"),
        ("WHEN WE ALL FALL ASLEEP, WHERE DO WE GO?", "Billie Eilish")
    ]
    
    process_album_list(albums_to_search, TOKEN)


Searching for 'Lover' by Taylor Swift...
Found release, fetching details...
  Checking master release and its versions...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover = 情人...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
    Checking version Lover...
  

In [46]:
import requests
import csv
import time
from datetime import datetime
import os
from dotenv import load_dotenv

# Load the .env file first, then read the Discogs credentials from the
# environment. Either value is None when its variable is not set.
load_dotenv()

user_agent = os.environ.get('USER_AGENT')
discogs_token = os.environ.get('discogs_token')


def get_all_versions(base_url, master_id, headers):
    """Get all versions of a master release.

    The versions endpoint is paginated, so a single request only returns the
    first page. This follows the ``pagination.urls.next`` link of each
    response until no further page is advertised.

    Args:
        base_url: Root URL of the Discogs API, without a trailing slash.
        master_id: ID of the master release. A falsy value means the release
            has no master and no request is made.
        headers: HTTP headers (authorization, user agent) sent with every
            request.

    Returns:
        A list containing the ``versions`` entries of every page, or an empty
        list when ``master_id`` is falsy or the response carries no versions
        (for example an error payload).
    """
    if not master_id:
        return []

    versions = []
    next_url = f'{base_url}/masters/{master_id}/versions'
    # Ask for the largest page size on the first request; the "next" links
    # returned by the API already carry their own paging query string.
    params = {'per_page': 100}

    while next_url:
        response = requests.get(next_url, headers=headers, params=params, timeout=30)
        payload = response.json()
        versions.extend(payload.get('versions', []))

        pagination = payload.get('pagination') or {}
        next_url = (pagination.get('urls') or {}).get('next')
        params = None
        if next_url:
            time.sleep(1)  # Stay under the API rate limit between pages

    return versions

def _recorded_at_names(data):
    """Return the non-blank names of the 'Recorded At' companies in a Discogs payload."""
    return {
        company.get('name')
        for company in data.get('companies') or []
        if company.get('entity_type_name') == 'Recorded At' and company.get('name')
    }


def _label_names(data):
    """Return the non-blank label names in a Discogs payload."""
    return {
        label.get('name')
        for label in data.get('labels') or []
        if label.get('name')
    }


def get_recording_studios(album_list, discogs_token):
    """Look up labels and recording studios for a list of albums and export them to CSV.

    For each (album_name, artist_name) tuple the first Discogs search hit is
    taken, then the release, its master (if any) and every version of that
    master are scanned for label names and 'Recorded At' companies. Entries
    that are not 2-tuples, repeated album/artist pairs and albums without any
    search result are skipped with a message.

    Args:
        album_list: Iterable of (album_name, artist_name) tuples.
        discogs_token: Discogs personal access token.

    Returns:
        Path of the CSV file that was written; the file name carries the
        current date.
    """
    headers = {
        'Authorization': f'Discogs token={discogs_token}',
        'User-Agent': user_agent
    }

    base_url = 'https://api.discogs.com'
    timeout = 30  # Seconds; keeps a stalled connection from hanging the run
    releases = []
    seen_titles = set()  # For deduplication

    print("Starting to process albums...")

    for index, album in enumerate(album_list, 1):
        # If album is a tuple, unpack it into album_name and artist_name
        if isinstance(album, tuple) and len(album) == 2:
            album_name, artist_name = album
        else:
            print(f"Invalid album format: {album}")
            continue

        print(f"\nChecking [{index}]: {artist_name} - {album_name}")
        title_artist = f"{album_name}_{artist_name}"
        if title_artist in seen_titles:
            print(f"Skipping duplicate: {artist_name} - {album_name}")
            continue
        seen_titles.add(title_artist)

        # Search for the album on Discogs
        search_url = f"{base_url}/database/search"
        params = {
            'q': album_name,
            'artist': artist_name,
            'type': 'release'
        }
        search_response = requests.get(search_url, headers=headers, params=params, timeout=timeout)
        search_results = search_response.json()

        if not search_results.get('results'):
            print(f"No results found on Discogs for: {album_name} by {artist_name}")
            continue

        # Take the first result (most relevant) and fetch release info
        release_id = search_results['results'][0].get('id')
        release_url = f"{base_url}/releases/{release_id}"

        time.sleep(1)  # To respect rate limiting

        release_response = requests.get(release_url, headers=headers, timeout=timeout)
        release_data = release_response.json()

        # Start with what the release itself lists. Blank or missing names are
        # dropped so sorting and joining below cannot hit a None.
        recording_studios = _recorded_at_names(release_data)
        labels = _label_names(release_data)

        # Get master release info and all versions
        master_id = release_data.get('master_id')
        print("Getting master release and version information...")

        if master_id:
            master_url = f"{base_url}/masters/{master_id}"
            master_response = requests.get(master_url, headers=headers, timeout=timeout)
            recording_studios |= _recorded_at_names(master_response.json())

            versions = get_all_versions(base_url, master_id, headers)
            print(f"Found {len(versions)} additional versions to check")

            for version in versions:
                version_id = version.get('id')
                print(f"Checking version ID: {version_id}")
                time.sleep(1)

                version_url = f"{base_url}/releases/{version_id}"
                version_response = requests.get(version_url, headers=headers, timeout=timeout)
                version_data = version_response.json()

                recording_studios |= _recorded_at_names(version_data)
                labels |= _label_names(version_data)

        print(f"Found {len(recording_studios)} unique recording studios")
        print(f"Found {len(labels)} unique labels")

        releases.append({
            'Artist Name': artist_name,
            'Album Name': album_name,
            'Label': '; '.join(sorted(labels)) if labels else 'Not specified',
            'Recorded At': '; '.join(sorted(recording_studios)) if recording_studios else 'Not specified'
        })

    # Create CSV file (and its directory, which may not exist on a fresh checkout)
    output_file = f'data/discogs_out/albums_recorded_at_{datetime.now().strftime("%Y%m%d")}.csv'
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=['Album Name', 'Artist Name', 'Label', 'Recorded At'])
        writer.writeheader()
        writer.writerows(releases)

    print(f"\nData has been successfully exported to {output_file}")
    print(f"Total albums processed: {len(releases)}")
    return output_file

def main():
    """Run the recording-studio lookup for a fixed sample of albums.

    The sample list stands in for an album list taken from Spotify. Any
    exception raised by the lookup is reported on stdout rather than
    propagated.
    """
    # (album_name, artist_name) pairs
    sample_albums = [
        ("Lover", "Taylor Swift"),
        ("IGOR", "Tyler, The Creator"),
        ("WHEN WE ALL FALL ASLEEP, WHERE DO WE GO?", "Billie Eilish"),
    ]

    try:
        get_recording_studios(sample_albums, discogs_token)
    except Exception as err:
        print(f"An error occurred: {err}")


if __name__ == "__main__":
    main()


Starting to process albums...

Checking [1]: Taylor Swift - Lover
Getting master release and version information...
Found 46 additional versions to check
Checking version ID: 14408244
Checking version ID: 14039063
Checking version ID: 14302268
Checking version ID: 14188941
Checking version ID: 14302260
Checking version ID: 14054335
Checking version ID: 14047448
Checking version ID: 14035519
Checking version ID: 14042814
Checking version ID: 14685442
Checking version ID: 14024526
Checking version ID: 17519926
Checking version ID: 17169805
Checking version ID: 22323535
Checking version ID: 14038725
Checking version ID: 14401235
Checking version ID: 14133752
Checking version ID: 14714424
Checking version ID: 14047423
Checking version ID: 14302291
Checking version ID: 14095850
Checking version ID: 14712662
Checking version ID: 14035434
Checking version ID: 14019144
Checking version ID: 14063269
Checking version ID: 14063266
Checking version ID: 14038724
Checking version ID: 14302282
Checki