# Set the client

In [None]:
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

import pandas as pd

# Initialize Spotipy with client credentials taken from the environment.
# With no arguments, SpotifyClientCredentials() reads the SPOTIPY_CLIENT_ID and
# SPOTIPY_CLIENT_SECRET environment variables, so the secret does not have to
# be stored in the notebook.
# NOTE(review): the ID/secret pair that used to be hard-coded here was stored in
# plain text and should be rotated in the Spotify developer dashboard.
spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

# structure of artists info

In [None]:
# Run one artist search for the "pop" genre and display the raw response so
# the structure of the returned artist objects can be inspected.
pop_query = 'genre:"pop"'
results = spotify.search(q=pop_query, type='artist')
results

# Fetch 1000 pop artists
## Not sure how the API and the spotipy library determine which specific 1000 artists are returned

In [None]:
def fetch_artists(genre, limit=50, total=1000):
    """Collect up to ``total`` unique artists returned by a genre search.

    Pages through ``spotify.search`` with the query ``genre:"<genre>"`` and
    de-duplicates the artists by URI. Paging stops when ``total`` unique
    artists have been collected, when a page comes back empty, or when a
    request fails.

    Args:
        genre: Genre name interpolated into the search query.
        limit: Page size passed to every search request.
        total: Maximum number of unique artists to return.

    Returns:
        A list of at most ``total`` dicts with the keys ``name``,
        ``followers``, ``uri``, ``popularity`` and ``genres``.
    """
    unique_artist_uris = set()  # URIs already collected
    artists_info = []  # One dict per unique artist
    offset = 0  # Index of the first result of the next page
    request_count = 0  # Number of successful requests so far

    while len(artists_info) < total:
        print(f"Fetching batch {request_count + 1} (offset {offset})...")
        try:
            result = spotify.search(q=f'genre:"{genre}"', type='artist', limit=limit, offset=offset)
        except spotipy.exceptions.SpotifyException as e:
            # A failed request (for example one whose offset is beyond what the
            # search endpoint accepts) must not throw away what was collected.
            print(f"Search failed at offset {offset}: {e}. Returning the artists collected so far.")
            break

        items = result['artists']['items']
        for artist in items:
            if artist['uri'] in unique_artist_uris:
                continue  # Already collected from an earlier page
            print(f"Unique URI found: {artist['uri']}. Adding {artist['name']}'s information to the list.")
            artists_info.append({
                'name': artist['name'],
                'followers': artist['followers']['total'],
                'uri': artist['uri'],
                'popularity': artist['popularity'],
                'genres': artist['genres']
            })
            unique_artist_uris.add(artist['uri'])

        batch_count = len(items)
        print(f"Fetched {batch_count} artists. Total unique artists collected: {len(artists_info)}")

        if batch_count == 0:
            print("No more artists found, ending search.")
            break

        # Advance by the number of items actually returned, not by `limit`
        offset += batch_count
        request_count += 1

        if len(artists_info) >= total:
            print(f"Reached the target of {total} artists, ending search.")
            break

    # The last page can overshoot the target, so trim to `total`
    return artists_info[:total]




# Call the function 
## ```fetch_artists```

In [None]:
# Collect 1000 pop artists, then dump every record for a visual check
pop_artists_info = fetch_artists('pop', total=1000)
print(f"Number of artist info objects fetched: {len(pop_artists_info)}")


for artist in pop_artists_info:
    # Format the record first, then emit it line by line
    report_lines = [
        f"Name: {artist['name']}",
        f"URI: {artist['uri']}",
        f"Followers: {artist['followers']}",
        f"Popularity: {artist['popularity']}",
        f"Genres: {', '.join(artist['genres'])}",
    ]
    for line in report_lines:
        print(line)
    print()

# Write the CSV. I don't quite remember why I wrote it this way instead of simply using ```DataFrame.to_csv```

In [None]:
import csv

# Specify the CSV file name
csv_file_name = 'pop_artists_info.csv'
# CSV column names; they match the keys of each artist dict
fieldnames = ['name', 'followers', 'uri', 'popularity', 'genres']

# Open the CSV file for writing
with open(csv_file_name, mode='w', newline='', encoding='utf-8') as file:
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()

    for artist in pop_artists_info:
        genres = artist['genres']
        # Flatten the genre list into one comma-separated cell. A value that is
        # already a string is written as-is, so a list mutated by an earlier
        # run of this cell is not joined character by character.
        if not isinstance(genres, str):
            genres = ', '.join(genres)

        # Write a copy of the record so pop_artists_info keeps its genre lists
        # and this cell can be re-run safely.
        writer.writerow({**artist, 'genres': genres})

print(f"Artists information has been saved to '{csv_file_name}'.")

In [None]:
# Gather every collected artist's URI into a list for the album-fetching step
artist_uris = [info['uri'] for info in pop_artists_info]

# Fetch album data

In [None]:
def get_artist_albums_by_type(uri_list):
    """Build a table of albums for the given artist URIs, one query per type.

    Each artist is queried once for every entry of
    ``['album', 'single', 'compilation', 'appears_on']`` through
    ``spotify.artist_albums`` (``country='US'``, ``limit=50``). A
    ``SpotifyException`` on a query is printed and that query contributes no
    rows.

    Args:
        uri_list: Artist URIs; the text after the last ``:`` is used as the
            artist ID.

    Returns:
        A ``pandas.DataFrame`` with the columns ``Artist ID``, ``Album Type``,
        ``Album ID``, ``Album Name``, ``Release Date`` and ``Total Tracks``.
    """
    type_names = ['album', 'single', 'compilation', 'appears_on']
    rows = []

    for uri in uri_list:
        artist_id = uri.split(':')[-1]
        print(f"Processing artist ID: {artist_id}")

        for album_type in type_names:
            try:
                # One request per (artist, album type) pair
                print(f"Fetching {album_type} albums...")
                response = spotify.artist_albums(artist_id, album_type=album_type, country='US', limit=50)
                found = response['items']
                print(f"Found {len(found)} {album_type} albums.")
            except spotipy.exceptions.SpotifyException as e:
                print(f"An error occurred: {e}")
                found = []

            # Keep only the fields this endpoint returns
            rows.extend(
                {
                    'Artist ID': artist_id,
                    'Album Type': album_type,
                    'Album ID': entry['id'],
                    'Album Name': entry['name'],
                    'Release Date': entry['release_date'],
                    'Total Tracks': entry['total_tracks'],
                }
                for entry in found
            )

    return pd.DataFrame(rows)

# Ignore this
logging
print i
csv (processed_data.csv)


# Call function ```get_artist_albums_by_type```

In [None]:
df = get_artist_albums_by_type(artist_uris)
# Persist the album table: a later cell reloads it from this path, and
# without this step that file is never produced by the notebook.
df.to_csv('./pop_artists_albums.csv', index=False)

# The original cell that saved the DataFrame above was accidentally deleted; the cell below reloads it from `pop_artists_albums.csv` to get `unique_album_ids` for the next step

In [None]:
# Reload the album table and keep the distinct IDs of rows whose type is 'album'
df = pd.read_csv('./pop_artists_albums.csv')
is_album_row = df['Album Type'] == 'album'
unique_album_ids = df.loc[is_album_row, 'Album ID'].unique().tolist()
len(unique_album_ids)
# Keep only the first five IDs
unique_album_ids = unique_album_ids[:5]

In [None]:
# Smoke test to check that fetching works: for every ID in unique_album_ids,
# request the album's tracks and print each track name.
for album_id in unique_album_ids:
    print(f"Fetching tracks for album ID: {album_id}")
    album_tracks = spotify.album_tracks(album_id)
    print(f"Album ID: {album_id}")
    for track in album_tracks['items']:
        print(f"Track: {track['name']}")

# Fetch track data

In [None]:
import time

def get_album_tracks_with_batches(album_ids, batch_size=20, max_retries=3):
    """Fetch the tracks of every album ID, walking the IDs in batches.

    Each album is requested through ``spotify.album_tracks``. A rate-limit
    response (HTTP 429) is waited out and the same album is requested again,
    up to ``max_retries`` times. Any other error is printed and the album is
    skipped. Album IDs that occur more than once are fetched only once.

    Args:
        album_ids: Sequence of album IDs (must support ``len`` and slicing).
        batch_size: Number of album IDs per logged batch.
        max_retries: Maximum number of retries per album after an HTTP 429.

    Returns:
        A list of dicts with the keys ``Album ID``, ``Track Name``,
        ``Track ID``, ``Artists``, ``Duration (ms)`` and ``Track Number``.
    """
    all_tracks_data = []
    processed_albums = set()  # Track processed album IDs to avoid duplicates
    total_batches = (len(album_ids) + batch_size - 1) // batch_size  # ceiling division

    for batch_num in range(total_batches):
        start_index = batch_num * batch_size
        batch = album_ids[start_index:start_index + batch_size]
        print(f"Processing batch {batch_num + 1}/{total_batches}...")

        for album_id in batch:
            if album_id in processed_albums:
                print(f"Album ID {album_id} has already been processed. Skipping.")
                continue

            for attempt in range(max_retries + 1):
                try:
                    print(f"Processing album ID: {album_id}")
                    tracks = spotify.album_tracks(album_id)
                    print(f"API call successful for album ID {album_id}. Found {len(tracks['items'])} tracks.")

                    # Build the album's rows separately so that a failure or a
                    # retry never leaves partial or duplicate rows behind.
                    album_rows = []
                    for track in tracks['items']:
                        print(f"Processing track: {track['name']}")
                        album_rows.append({
                            'Album ID': album_id,
                            'Track Name': track['name'],
                            'Track ID': track['id'],
                            'Artists': ', '.join(artist['name'] for artist in track['artists']),
                            'Duration (ms)': track['duration_ms'],
                            'Track Number': track['track_number']
                        })
                except spotipy.SpotifyException as e:
                    print(f"Spotify API error occurred while processing album ID {album_id}: {e}")
                    if e.http_status != 429 or attempt == max_retries:
                        break  # Not a rate limit, or out of retries: skip this album
                    # The exception may carry no headers, or no Retry-After entry
                    retry_after = int((e.headers or {}).get('Retry-After', 1))
                    print(f"Rate limit exceeded, waiting for {retry_after} seconds.")
                    time.sleep(retry_after + 1)
                    print("Retrying...")
                except Exception as e:
                    # Deliberate best effort: log and move on to the next album
                    print(f"An unexpected error occurred while processing album ID {album_id}: {e}")
                    break
                else:
                    all_tracks_data.extend(album_rows)
                    processed_albums.add(album_id)  # Mark as processed
                    break

    return all_tracks_data




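# Another function to fetch track data, just with more logging. Both fetching methods worked once, but then stopped at one album ID with the request just hanging. Ideally both functions work. (Possible cause, not confirmed: the client's built-in retry waiting out a long `Retry-After` after an HTTP 429.)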

In [None]:
import time

def get_album_tracks_by_batch(album_ids, batch_size=20, max_retries=3):
    """Fetch the tracks of every album ID, processing the IDs in batches.

    Each album is requested through ``spotify.album_tracks``. After an HTTP
    429 the function sleeps and requests the same album again, at most
    ``max_retries`` times. Other ``SpotifyException`` errors are printed and
    the album is skipped.

    Args:
        album_ids: Sequence of album IDs (must support ``len`` and slicing).
        batch_size: Number of album IDs per batch.
        max_retries: Maximum number of retries per album after an HTTP 429.

    Returns:
        A list of dicts with the keys ``Album ID``, ``Track Name``,
        ``Track ID``, ``Artists``, ``Duration (ms)`` and ``Track Number``.
    """
    all_tracks_data = []

    def fetch_album_tracks(album_id):
        """Return the album's track response, or None if it could not be fetched."""
        # A bounded loop replaces recursion so that a persistent rate limit
        # cannot recurse without end.
        for attempt in range(max_retries + 1):
            try:
                return spotify.album_tracks(album_id)
            except spotipy.exceptions.SpotifyException as e:
                if e.http_status != 429 or attempt == max_retries:
                    print(f"An error occurred while fetching tracks for album ID {album_id}: {e}")
                    return None
                # The exception may carry no headers, or no Retry-After entry
                retry_after = int((e.headers or {}).get('Retry-After', 1))
                print(f"Rate limit exceeded, waiting for {retry_after} seconds before retrying.")
                time.sleep(retry_after + 1)
        return None

    def process_album_batch(batch):
        """Append the track rows of every album in the batch to all_tracks_data."""
        for album_id in batch:
            print(f"Processing album ID: {album_id}")
            tracks = fetch_album_tracks(album_id)
            if tracks is None:
                continue
            print(f"Found {len(tracks['items'])} tracks for album ID {album_id}.")

            for track in tracks['items']:
                all_tracks_data.append({
                    'Album ID': album_id,
                    'Track Name': track['name'],
                    'Track ID': track['id'],
                    'Artists': ', '.join(artist['name'] for artist in track['artists']),
                    'Duration (ms)': track['duration_ms'],
                    'Track Number': track['track_number']
                })

    # Process album IDs in batches
    for i in range(0, len(album_ids), batch_size):
        process_album_batch(album_ids[i:i + batch_size])

    return all_tracks_data

# Example usage with a list of album IDs
# album_ids = ['album_id1', 'album_id2', ..., 'album_idN']
# tracks_info = get_album_tracks_by_batch(album_ids)
# This function will print logging information and return a list of track details


# Calling the above two functions

In [None]:
# Run the first track fetcher on unique_album_ids and display the returned list
get_album_tracks_with_batches(unique_album_ids)

In [None]:
# Run the second track fetcher on the same IDs and display the returned list
get_album_tracks_by_batch(unique_album_ids)

# below are some debugging processes

In [None]:
import time

def fetch_tracks_simple(album_ids, max_retries=3):
    """Fetch the track name and ID of every track on the given albums.

    After an HTTP 429 the function waits and requests the *same* album again,
    up to ``max_retries`` times. Other ``SpotifyException`` errors are printed
    and the album is skipped.

    Args:
        album_ids: Iterable of album IDs.
        max_retries: Maximum number of retries per album after an HTTP 429.

    Returns:
        A list of dicts with the keys ``Track Name`` and ``Track ID``.
    """
    all_tracks = []

    for album_id in album_ids:
        attempts = 0
        while True:
            try:
                response = spotify.album_tracks(album_id)
            except spotipy.SpotifyException as e:
                if e.http_status == 429 and attempts < max_retries:
                    # The exception may carry no headers, or no Retry-After entry
                    retry_after = (e.headers or {}).get('Retry-After', 1)
                    print(f"Rate limited. Waiting for {retry_after} seconds.")
                    time.sleep(int(retry_after))
                    attempts += 1
                    # Loop again for this album; a `continue` on the outer
                    # `for` would move on to the next album instead.
                    continue
                print(f"Error fetching tracks for album {album_id}: {e}")
                break

            for track in response['items']:
                # Simplify the data you're collecting for this example
                all_tracks.append({'Track Name': track['name'], 'Track ID': track['id']})
            print(f"Fetched tracks for album: {album_id}")
            break

        time.sleep(0.1)  # Short pause to mitigate rate limit risk

    return all_tracks

# Example usage
# album_ids = ['some_album_id1', 'some_album_id2']
# tracks = fetch_tracks_simple(album_ids)
# print(tracks)


In [None]:
# Album ID passed to fetch_tracks_for_album in this cell
album_id = '1o59UpKw81iHR0HPiSkJR0'


def fetch_tracks_for_album(album_id, max_retries=3):
    """Fetch the track details of one album.

    After an HTTP 429 the function waits and requests the album again, up to
    ``max_retries`` times. On any other ``SpotifyException``, or once the
    retries are used up, the error is printed and an empty list is returned
    so that callers can always iterate over the result.

    Args:
        album_id: ID of the album to look up.
        max_retries: Maximum number of retries after an HTTP 429.

    Returns:
        A list of dicts with the keys ``Track Name``, ``Track ID``,
        ``Duration (ms)``, ``Track Number`` and ``Artists``; empty on failure.
    """
    # A bounded loop replaces recursion so that a persistent rate limit
    # cannot recurse without end.
    for attempt in range(max_retries + 1):
        try:
            tracks_info = spotify.album_tracks(album_id)
        except spotipy.SpotifyException as e:
            if e.http_status == 429 and attempt < max_retries:
                # The exception may carry no headers, or no Retry-After entry
                retry_after = int((e.headers or {}).get('Retry-After', 1))
                print(f"Rate limit exceeded, waiting for {retry_after} seconds.")
                time.sleep(retry_after + 1)
                continue  # Retry after waiting
            print(f"Failed to fetch tracks: {e}")
            return []

        return [
            {
                'Track Name': track['name'],
                'Track ID': track['id'],
                'Duration (ms)': track['duration_ms'],
                'Track Number': track['track_number'],
                'Artists': ", ".join(artist['name'] for artist in track['artists'])
            }
            for track in tracks_info['items']
        ]

    return []

# Fall back to an empty list when no track list comes back, so that the loop
# below and later uses of `tracks` do not fail on a missing result.
tracks = fetch_tracks_for_album(album_id) or []
for track in tracks:
    print(track)

In [None]:

# Display the raw album_tracks response for the current album_id
spotify.album_tracks(album_id)

In [None]:
# Display the current value of album_id
album_id

In [None]:
# Album whose metadata is looked up
album_id = '1o59UpKw81iHR0HPiSkJR0'

# Request the album object from the API
album = spotify.album(album_id)

# Pull the basic fields out of the response in one pass
album_name, release_date, total_tracks = (
    album[field] for field in ('name', 'release_date', 'total_tracks')
)
artists = [performer['name'] for performer in album['artists']]

# Report what was extracted
print(f"Album Name: {album_name}")
print(f"Release Date: {release_date}")
print(f"Total Tracks: {total_tracks}")
print(f"Artists: {', '.join(artists)}")

In [None]:
import logging

# Turn on DEBUG output for the 'spotipy' logger and send it to a stream handler
logger = logging.getLogger('spotipy')
logger.setLevel(logging.DEBUG)

# Reuse the handler installed by an earlier run of this cell (found by name);
# adding a new one on every run would print each log line several times.
handler = next((h for h in logger.handlers if h.get_name() == 'spotipy-debug-stream'), None)
if handler is None:
    handler = logging.StreamHandler()
    handler.set_name('spotipy-debug-stream')
    logger.addHandler(handler)
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))


In [None]:
import requests
import logging

# Setup basic logging
logging.basicConfig(level=logging.DEBUG)

# Spotify API endpoint for fetching album details
album_id = "1o59UpKw81iHR0HPiSkJR0"
url = f"https://api.spotify.com/v1/albums/{album_id}"

# The request needs a bearer token; reuse the one managed by the spotipy
# client's auth manager (`headers` was previously never defined).
access_token = spotify.auth_manager.get_access_token(as_dict=False)
headers = {"Authorization": f"Bearer {access_token}"}

# The timeout keeps this debugging request from hanging indefinitely
response = requests.get(url, headers=headers, timeout=10)

# An error response is not guaranteed to carry a JSON body
try:
    response_body = response.json()
except ValueError:
    response_body = None

# Log the response status code and body
logging.info(f"Response Code: {response.status_code}")
logging.info(f"Response Body: {response_body if response_body is not None else response.text}")

if response.status_code == 200:
    logging.info("Album information fetched successfully.")
else:
    error_detail = response_body.get('error') if isinstance(response_body, dict) else response.text
    logging.error(f"Failed to fetch album information: {error_detail}")


In [None]:
# Fetch the album object for a fixed album ID and print the raw response
album_id = "1o59UpKw81iHR0HPiSkJR0"
album = spotify.album(album_id)

print(album)

In [None]:
pip install --upgrade spotipy


In [None]:
from datetime import datetime, timezone

# The expires_at value from your cache (a Unix timestamp in seconds)
expires_at = 1712288000

# Convert to a timezone-aware UTC datetime. datetime.utcfromtimestamp() is
# deprecated and returns a naive value that is easily mistaken for local time.
expiration_date = datetime.fromtimestamp(expires_at, tz=timezone.utc)

# Print the expiration date in a readable format
print("The token expires at:", expiration_date)


In [None]:
# Display how many entries the `tracks` variable currently holds
len(tracks)