# Setup and Imports

In [2]:
import csv
import requests
import pandas as pd
import time
import os
from datetime import datetime
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

# Spotify API credentials.
# Prefer the SPOTIPY_CLIENT_ID / SPOTIPY_CLIENT_SECRET environment variables so
# real secrets never have to be written into (or committed with) this notebook.
# The placeholders below are used only when the variables are not set.
CLIENT_ID = os.environ.get('SPOTIPY_CLIENT_ID', 'ADD YOUR CLIENT ID HERE')
CLIENT_SECRET = os.environ.get('SPOTIPY_CLIENT_SECRET', 'ADD YOUR CLIENT SECRET HERE')

# Cache for the manually requested access token (filled by get_access_token):
# the token itself and the epoch time (seconds) at which it stops being valid.
access_token = None
token_expires_at = 0

# spotipy client built from the same credentials.
client_credentials_manager = SpotifyClientCredentials(client_id=CLIENT_ID, client_secret=CLIENT_SECRET)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

# Get the access token or create a new one

In [None]:
def get_access_token():
    """Return a valid Spotify access token, requesting a new one when needed.

    The token and its expiry time are cached in the module-level
    ``access_token`` and ``token_expires_at`` globals. A new token is
    requested with the client-credentials grant when nothing is cached or the
    cached token has expired.

    Returns:
        The access token string, or ``None`` when a new token could not be
        obtained (network error, non-200 response, or a response without an
        ``access_token`` field). On failure the cached globals are left
        untouched.
    """
    global access_token, token_expires_at

    # Fast path: the cached token is still valid.
    current_time = time.time()
    if access_token is not None and current_time < token_expires_at:
        return access_token

    print("Requesting new access token...")
    try:
        response = requests.post(
            "https://accounts.spotify.com/api/token",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            data={
                "grant_type": "client_credentials",
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
            },
            # Without a timeout a stalled connection would hang the notebook.
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f"Failed to retrieve access token: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to retrieve access token: {response.status_code}")
        return None

    token_data = response.json()
    new_token = token_data.get('access_token')
    if not new_token:
        # A 200 response without a token must not be reported as a success.
        print("Failed to retrieve access token: response contained no token")
        return None

    access_token = new_token
    # The expiry is measured from *before* the request was sent, so the cached
    # token is never considered valid for longer than the server allows.
    token_expires_at = current_time + token_data.get('expires_in', 3600)
    print("Access token retrieved successfully!")
    return access_token


# Search for an Artist

In [None]:
def search_artist_by_name(artist_name):
    """Search Spotify for an artist and return the best match.

    Args:
        artist_name: Free-text artist name to search for.

    Returns:
        The first artist object (a dict) from the search response, or
        ``None`` when no token is available, the request fails, or the search
        returns no artists.
    """
    token = get_access_token()
    if not token:
        return None

    try:
        response = requests.get(
            "https://api.spotify.com/v1/search",
            headers={"Authorization": f"Bearer {token}"},
            # Passing the query through ``params`` URL-encodes it, so names
            # containing '&', '#', '+' etc. no longer corrupt the query string.
            params={"q": artist_name, "type": "artist", "limit": 1},
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f"Failed to search for artist {artist_name}: {exc}")
        return None

    if response.status_code != 200:
        print(f"Failed to search for artist {artist_name}: {response.status_code}")
        return None

    items = response.json().get('artists', {}).get('items') or []
    if not items:
        print(f"No artist found for {artist_name}")
        return None
    return items[0]


# Fetch Artist Information

In [None]:
def get_artist_info(artist_id):
    """Fetch basic profile information for one artist.

    Args:
        artist_id: Spotify ID of the artist.

    Returns:
        A tuple ``(genres, name, popularity, followers, artist_image_url)``.
        ``artist_image_url`` is the URL of the first image in the response, or
        ``None`` when the artist has no images. On any failure the tuple
        ``([], None, None, None, None)`` is returned.
    """
    failure = ([], None, None, None, None)

    # Ask for a (possibly refreshed) token instead of reading the global
    # directly, which may still be None or hold an expired token.
    token = get_access_token()
    if not token:
        print("Failed to fetch artist info: no access token available")
        return failure

    try:
        response = requests.get(
            f"https://api.spotify.com/v1/artists/{artist_id}",
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f"Failed to fetch artist info: {exc}")
        return failure

    if response.status_code != 200:
        print(f"Failed to fetch artist info: {response.status_code}")
        return failure

    artist_data = response.json()
    images = artist_data.get('images') or []
    return (
        artist_data.get('genres', []),
        artist_data['name'],
        artist_data['popularity'],
        artist_data['followers']['total'],
        images[0]['url'] if images else None,
    )

# Fetch Tracks Information

In [None]:
def get_top_and_least_popular_tracks(artist_id, markets=("US",)):
    """Fetch an artist's top tracks for one or more markets.

    Args:
        artist_id: Spotify ID of the artist.
        markets: Iterable of market codes to query (default: only ``"US"``).
            A single string is treated as one market.

    Returns:
        A tuple ``(all_tracks, sorted_tracks)``: the collected track objects
        in the order they were received, and the same tracks sorted by
        ``popularity`` in descending order. A track returned for several
        markets is kept only once. ``(None, None)`` is returned when no
        tracks could be collected.
    """
    if isinstance(markets, str):
        markets = (markets,)
    if not markets:
        return None, None

    # Ask for a (possibly refreshed) token instead of reading the global
    # directly, which may still be None or hold an expired token.
    token = get_access_token()
    if not token:
        return None, None

    url = f"https://api.spotify.com/v1/artists/{artist_id}/top-tracks"
    headers = {"Authorization": f"Bearer {token}"}

    all_tracks = []
    seen_ids = set()
    for market in markets:
        try:
            response = requests.get(url, headers=headers, params={"market": market}, timeout=10)
        except requests.RequestException as exc:
            print(f"Failed to fetch top tracks for market {market}: {exc}")
            continue

        if response.status_code != 200:
            print(f"Failed to fetch top tracks for market {market}: {response.status_code}")
            continue

        for track in response.json().get("tracks", []):
            # Querying several markets yields the same track more than once;
            # keep only the first occurrence so later rows are not duplicated.
            track_id = track.get('id')
            if track_id is not None and track_id in seen_ids:
                continue
            seen_ids.add(track_id)
            all_tracks.append(track)

    if not all_tracks:
        return None, None

    sorted_tracks = sorted(all_tracks, key=lambda item: item['popularity'], reverse=True)
    return all_tracks, sorted_tracks


# Fetch Audio Features of a Track

In [None]:
def get_audio_features(track_id, max_retries=3, max_wait=30):
    """Fetch the audio features of a single track.

    Args:
        track_id: Spotify ID of the track.
        max_retries: How many times to retry after an HTTP 429 (rate limit)
            response before giving up on the track.
        max_wait: Longest ``Retry-After`` delay, in seconds, that is worth
            waiting for; a longer requested delay makes the track be skipped.

    Returns:
        A dict with the keys ``danceability``, ``energy``, ``key``,
        ``loudness``, ``mode``, ``speechiness``, ``acousticness``,
        ``instrumentalness``, ``liveness``, ``valence``, ``tempo``,
        ``duration_ms`` and ``time_signature`` (missing values are ``None``),
        or ``None`` when the features could not be fetched.
    """
    feature_names = (
        'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo',
        'duration_ms', 'time_signature',
    )

    # Ask for a (possibly refreshed) token instead of reading the global
    # directly, which may still be None or hold an expired token.
    token = get_access_token()
    if not token:
        print("Failed to fetch audio features: no access token available")
        return None

    url = f"https://api.spotify.com/v1/audio-features/{track_id}"
    headers = {"Authorization": f"Bearer {token}"}

    for attempt in range(max_retries + 1):
        try:
            response = requests.get(url, headers=headers, timeout=10)
        except requests.RequestException as exc:
            print(f"Failed to fetch audio features: {exc}")
            return None

        if response.status_code == 200:
            audio_data = response.json() or {}
            # Extract each feature explicitly, allowing for missing values.
            return {name: audio_data.get(name) for name in feature_names}

        if response.status_code != 429:
            print(f"Failed to fetch audio features: {response.status_code}")
            return None

        # Rate limited: wait for the delay the server asks for instead of
        # silently losing the track, unless the delay is unreasonably long.
        try:
            wait_seconds = int(response.headers.get("Retry-After", 1))
        except (TypeError, ValueError):
            wait_seconds = 1
        if attempt == max_retries or wait_seconds > max_wait:
            break
        print(f"Rate limit exceeded. Retrying in {wait_seconds} seconds...")
        time.sleep(wait_seconds)

    print("Rate limit exceeded. Skipping this track.")
    return None


# To CSV

In [None]:
def write_to_csv(artist_data, file_name='API_data.csv'):
    """Append track rows to a CSV file.

    The header row is written when the file does not exist yet or exists but
    is empty. Entries of ``artist_data`` that are not dicts are skipped.

    Args:
        artist_data: Iterable of track dicts. The keys read are
            ``artist_name``, ``artist_popularity``, ``artist_followers``,
            ``artist_image_url``, ``track_name``, ``duration_min``,
            ``audio_features`` (dict or ``None``), ``popularity``,
            ``release_date``, ``genres``, ``collaborators``,
            ``album_cover_url`` and ``date_gathered``. Missing values are
            written as empty cells.
        file_name: Path of the CSV file to append to.
    """
    header = [
        'Artist Name', 'Artist Popularity', 'Artist Followers', 'Artist Picture URL',
        'Track Name', 'Duration (min)', 'Danceability', 'Energy', 'Key', 'Loudness',
        'Mode', 'Speechiness', 'Acousticness', 'Instrumentalness', 'Liveness',
        'Valence', 'Popularity', 'Release Date', 'Genres', 'Collaborators',
        'Album Cover URL', 'Date Gathered',
    ]
    # Audio-feature keys, in the order of their header columns.
    feature_columns = (
        'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness',
        'acousticness', 'instrumentalness', 'liveness', 'valence',
    )

    # An existing but empty file still needs its header.
    needs_header = not os.path.isfile(file_name) or os.path.getsize(file_name) == 0

    with open(file_name, mode='a', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)

        if needs_header:
            writer.writerow(header)

        for track in artist_data:
            if not isinstance(track, dict):
                continue

            # ``audio_features`` may be present but None (fetch failed);
            # ``.get(key, {})`` would return that None and crash below.
            audio_features = track.get('audio_features') or {}
            writer.writerow([
                track.get('artist_name', ''),
                track.get('artist_popularity', ''),
                track.get('artist_followers', ''),
                track.get('artist_image_url', ''),
                track.get('track_name', ''),
                track.get('duration_min', ''),
                *(audio_features.get(name, '') for name in feature_columns),
                track.get('popularity', ''),
                track.get('release_date', ''),
                ", ".join(track.get('genres') or []),
                ", ".join(track.get('collaborators') or []),
                track.get('album_cover_url', ''),
                track.get('date_gathered', ''),
            ])


# Main Function

In [None]:
def _build_track_row(track, artist_id, artist_fields, genres, audio_features, date_gathered):
    """Combine artist-level fields and one track object into a flat row dict."""
    album = track['album']
    return {
        **artist_fields,
        'track_name': track['name'],
        'duration_min': round(track['duration_ms'] / 60000, 2),
        'audio_features': audio_features,
        'popularity': track['popularity'],
        'release_date': album['release_date'],
        'genres': genres,
        # Every credited artist other than the one being processed.
        'collaborators': [
            credited['name'] for credited in track['artists'] if credited['id'] != artist_id
        ],
        'album_cover_url': album['images'][0]['url'] if album['images'] else None,
        'date_gathered': date_gathered,
    }


def collect_artist_data(artist_names=None):
    """Collect top-track data for a list of artists into a DataFrame.

    For every artist name the artist is looked up, its profile information
    and top tracks (US market) are fetched, and the audio features of each
    track are requested.

    Args:
        artist_names: Iterable of artist names to process. When ``None``
            (the default) the built-in list below is used.

    Returns:
        A pandas DataFrame with one row per collected track. The columns are
        the artist and track fields followed by the flattened audio-feature
        columns. The DataFrame is empty when nothing was collected.
    """
    if artist_names is None:
        artist_names = [
           "Taylor Swift"#, "Drake", "The Weeknd", "Ariana Grande", "Ed Sheeran",
        #    "Bad Bunny", "Billie Eilish", "Beyoncé", "Post Malone", "Justin Bieber",
        #   "Harry Styles", "SZA", "Rihanna", "Kanye West", "Kendrick Lamar",
        #    "Dua Lipa", "Shakira", "Travis Scott", "Olivia Rodrigo", "Bruno Mars",
        #    "Lil Nas X", "Shawn Mendes", "J Balvin", "Doja Cat", "Camila Cabello",
        #    "Cardi B", "Rosalía", "Miley Cyrus", "Katy Perry", "Lil Baby",
        #    "Nicki Minaj", "Chris Brown", "Imagine Dragons", "Daddy Yankee",
        #    "Lady Gaga", "Future", "21 Savage", "Maluma", "Selena Gomez",
        #    "Eminem", "Frank Ocean", "Khalid", "BLACKPINK", "Karol G",
        #    "Metro Boomin", "Arijit Singh", "BTS", "Morgan Wallen", "Adele",
        #    "Jung Kook", "Stray Kids", "SEVENTEEN", "Lana Del Rey", "Mitski",
        #    "Coldplay", "Linkin Park", "Marshmello", "Sia", "Playboi Carti",
        #    "Hozier", "Sam Smith", "Queen", "Benson Boone", "Anuel AA",
        #   "Ozuna", "XXXTentacion", "David Guetta", "Peso Pluma", "Trippie Redd",
        ### where some features start to not be fetched ###
        #   "Charli XCX", "Tate McRae", "Kid Laroi", "Noah Kahan",
        #    "Alex Warren", "V", "IVE", "Jimin", "NewJeans",
        #    "Pink Floyd", "Tyler, The Creator", "Rema", "Feid",
        #    "Tito Double P", "Anitta", "Zayn Malik", "Myles Smith",
        #    "Fleetwood Mac", "Lil Durk", "Tom Odell", "Vance Joy", "J Cole",
        #    "EXO", "TOMORROW X TOGETHER", "Alan Walker", "Maroon 5"
        ]

    date_gathered = datetime.now().strftime("%Y-%m-%d")
    all_artist_data = []

    for artist_name in artist_names:
        print(f"Processing artist: {artist_name}")
        artist = search_artist_by_name(artist_name)
        if not artist:
            print(f"Artist '{artist_name}' not found or data could not be retrieved.")
            continue

        artist_id = artist['id']
        genres, name, popularity, followers, artist_image_url = get_artist_info(artist_id)
        artist_fields = {
            # Fall back to the name from the search result when the profile
            # request failed and returned no name.
            'artist_name': name or artist.get('name'),
            'artist_popularity': popularity,
            'artist_followers': followers,
            'artist_image_url': artist_image_url,
        }

        # Only one market to reduce loading time.
        # The helper returns (all_tracks, sorted_tracks); iterating the tuple
        # itself would treat the two lists (or two Nones) as tracks.
        top_tracks, _sorted_tracks = get_top_and_least_popular_tracks(artist_id, markets=['US'])
        if not top_tracks:
            print(f"No top tracks could be retrieved for '{artist_name}'.")
            continue

        for track in top_tracks:
            audio_features = get_audio_features(track['id'])
            if audio_features is None:
                print(f"Audio features for track '{track['name']}' are missing or could not be fetched.")

            all_artist_data.append(
                _build_track_row(track, artist_id, artist_fields, genres, audio_features, date_gathered)
            )

    # Create DataFrame from collected data
    artist_df = pd.DataFrame(all_artist_data)

    # Flatten the audio features into one column per feature. Tracks whose
    # features could not be fetched (None) become rows of missing values.
    if not artist_df.empty:
        audio_features_df = pd.DataFrame(
            [row['audio_features'] or {} for row in all_artist_data],
            index=artist_df.index,
        )
        artist_df = pd.concat([artist_df.drop(columns=['audio_features']), audio_features_df], axis=1)

    return artist_df


In [None]:
# Run the full collection pipeline and keep the resulting DataFrame
# (one row per collected track) for the cells below.
artist_data_df = collect_artist_data()

# Preview the first rows of the collected data.
print(artist_data_df.head())


In [None]:
def save_to_csv_no_duplicates(df, file_name="API1.csv"):
    """Merge ``df`` into a CSV file, keeping one row per track and artist.

    Rows already stored in ``file_name`` are loaded, ``df`` is appended, and
    rows sharing the same ``track_name`` and ``artist_name`` are reduced to
    their first occurrence (so previously stored rows win over new ones).
    The file is then rewritten with a header and without the index.

    De-duplication is also applied when the file does not exist yet, and it
    is skipped when the key columns are not present. ``df`` itself is not
    modified.

    Args:
        df: DataFrame with the newly collected rows.
        file_name: Path of the CSV file to create or update.
    """
    key_columns = ['track_name', 'artist_name']

    frames = []
    if os.path.isfile(file_name):
        try:
            frames.append(pd.read_csv(file_name))
        except pd.errors.EmptyDataError:
            # The file exists but holds no data; there is nothing to merge.
            pass
    frames.append(df)

    combined_df = pd.concat(frames, ignore_index=True) if len(frames) > 1 else df

    if all(column in combined_df.columns for column in key_columns):
        combined_df = combined_df.drop_duplicates(subset=key_columns)

    combined_df.to_csv(file_name, index=False)

# Persist the collected data: merge it into API1.csv (or create the file),
# dropping rows that repeat an existing track_name/artist_name pair.
save_to_csv_no_duplicates(artist_data_df, "API1.csv")


In [None]:
# Preview the first rows of the collected DataFrame.
print(artist_data_df.head())

In [None]:

# Number of rows (one per collected track) in the DataFrame.
print(len(artist_data_df))

# Fixing some issues

In [None]:
# The API's timeout behaviour was annoying to deal with, and the data has already been collected, so I'll apply this fix afterwards (on the collected data).
