	# Author: Alexander Staub
	## Last changed: 2025.02.18
	## Purpose: Using the chartmetric IDs to get song level metadata


In [36]:
#installing packages
import json
import logging
import time
from datetime import datetime
from pprint import pprint

import pandas as pd
import requests

In [5]:
# Set up logging of errors and progress messages.
# Records at INFO level and above go to 'chartmetric_api_metadata.log',
# each line prefixed with a timestamp and the level name.
logging.basicConfig(
    filename='chartmetric_api_metadata.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s'
)

In [6]:
# Define API host and your refresh token.
# The refresh token is read from a local text file rather than hard-coded;
# surrounding whitespace (e.g. a trailing newline) is stripped.
HOST = 'https://api.chartmetric.com'
with open("chartmetric_refresh_token.txt", "r") as f:
    REFRESH_TOKEN = f.read().strip()

In [7]:
# Retrieve an access token using the refresh token.
# A timeout is set so that an unresponsive server raises an error instead of
# blocking the notebook indefinitely.
token_response = requests.post(
    f'{HOST}/api/token',
    json={'refreshtoken': REFRESH_TOKEN},
    timeout=30,
)

# Check if the token was retrieved successfully
if token_response.status_code != 200:

    # Log the error and raise an exception
    logging.error(f"Token retrieval error: {token_response.status_code}")
    raise Exception(f"Error: received {token_response.status_code} from /api/token")

# Extract the access token from the response
access_token = token_response.json()['token']

# Define the headers for the API requests.
# This dict is shared by every call; get_request overwrites its
# 'Authorization' entry in place when the token has to be refreshed.
headers = {'Authorization': f'Bearer {access_token}'}

# Defining the get_request

Robust request logic that:
- backs off for a max of 26 hours in retries
- logs all errors it encounters


In [8]:


# --- Robust get_request Function ---
def get_request(endpoint, params=None, max_retries=5, timeout=30):
    """Send a GET request to the API and retry on transient failures.

    Retried conditions are network errors, 401 (after refreshing the access
    token), 429 (rate limit) and 5xx responses. Any other non-200 status
    raises immediately.

    Args:
        endpoint: Path appended to ``HOST``, e.g. ``"/api/track/123"``.
        params: Optional query parameters passed on to ``requests.get``.
        max_retries: Maximum number of attempts before giving up.
        timeout: Seconds to wait for the server on each HTTP call; a timeout
            is handled like any other network error.

    Returns:
        The decoded JSON body of the first response with status 200.

    Raises:
        Exception: If refreshing the token fails, a non-retryable status code
            is received, or all attempts are used up.
    """
    # Budget for 429 responses that carry no reset header: the waits of all
    # attempts together add up to at most 26 hours.
    total_wait_limit = 26 * 3600
    # Exponential weights 1, 2, 4, ... over ALL attempts. Dividing by the full
    # sum (not only the remaining weights) is what keeps the total at 26 hours.
    total_weight = sum(2 ** i for i in range(max_retries))

    backoff = 1  # initial backoff in seconds (used if header data is missing)
    for attempt in range(max_retries):
        try:
            response = requests.get(
                f"{HOST}{endpoint}",
                headers=headers,
                params=params,
                timeout=timeout,
            )
        except requests.RequestException as ex:
            logging.error(f"Network error on attempt {attempt+1} for {endpoint}: {ex}")
            time.sleep(backoff)
            backoff *= 2
            continue

        # Log the response status code and rate limit headers
        logging.info(f"Request to {endpoint} returned {response.status_code}. RateLimit headers: {response.headers}")

        if response.status_code == 200:
            return response.json()

        # 401: Token may have expired; refresh it and retry.
        elif response.status_code == 401:
            logging.warning(f"401 error for {endpoint}. Refreshing token.")
            token_response = requests.post(
                f'{HOST}/api/token',
                json={'refreshtoken': REFRESH_TOKEN},
                timeout=timeout,
            )
            if token_response.status_code != 200:
                logging.error(f"Token refresh failed: {token_response.status_code}")
                raise Exception(f"Token refresh failed with status {token_response.status_code}")
            new_token = token_response.json()['token']
            # Mutate the shared dict so later calls use the new token as well.
            headers['Authorization'] = f'Bearer {new_token}'
            time.sleep(backoff)
            backoff *= 2

        # 429: Rate limit exceeded; wait and retry.
        elif response.status_code == 429:
            reset_timestamp = response.headers.get("X-RateLimit-Reset")
            sleep_time = None
            if reset_timestamp:
                # NOTE(review): the header is treated as a Unix epoch in
                # seconds -- confirm against the API documentation.
                try:
                    sleep_time = int(float(reset_timestamp)) - int(time.time())
                except (ValueError, OverflowError):
                    # Unparseable header: fall back to the budgeted wait.
                    sleep_time = None
                else:
                    if sleep_time < 0:
                        # Reset time already passed; use the short backoff.
                        sleep_time = backoff
            if sleep_time is None:
                # No usable wait time from the API: take this attempt's share
                # of the 26-hour budget.
                sleep_time = total_wait_limit * (2 ** attempt / total_weight)
            logging.warning(f"429 error for {endpoint}. Sleeping for {sleep_time} seconds (attempt {attempt+1}/{max_retries}).")
            time.sleep(sleep_time)
            backoff *= 2

        # 5xx: Server error; wait and retry.
        elif response.status_code >= 500:
            logging.warning(f"Server error {response.status_code} for {endpoint}. Retrying after {backoff} seconds.")
            time.sleep(backoff)
            backoff *= 2

        # Anything else (e.g. 400, 403, 404) will not succeed on retry.
        else:
            logging.error(f"Error {response.status_code} for {endpoint}: {response.text}")
            raise Exception(f"Error: received {response.status_code} from {endpoint}")

    # If the loop completes without returning, raise an exception
    raise Exception(f"Max retries exceeded for endpoint {endpoint}")

# Use chartmetric ID to access song characteristics:
- use the chartmetric ID file to get the songs for which we have chartmetric ID
- loop over the chartmetric IDs to access the track metadata endpoint and retrieve the relevant song-level metadata


In [47]:
import pandas as pd

# Load the three sample files of Chartmetric IDs; each one stays available
# under its own name for inspection.
chartmetric_ids_spotify_1, chartmetric_ids_spotify_2, chartmetric_ids_spotify_3 = (
    pd.read_csv(csv_path)
    for csv_path in (
        "Z:/Data_alexander/data/incidental/chartmetric/chartmetric_ids_spotify_sample_1.csv",
        "Z:/Data_alexander/data/incidental/chartmetric/chartmetric_ids_spotify_sample_2.csv",
        "Z:/Data_alexander/data/incidental/chartmetric/chartmetric_ids_spotify_sample_3.csv",
    )
)

# Stack the three samples into a single dataframe (row labels are not reset here).
chartmetric_ids_spotify = pd.concat(
    [
        chartmetric_ids_spotify_1,
        chartmetric_ids_spotify_2,
        chartmetric_ids_spotify_3,
    ]
)

In [48]:
# Keep one row per Chartmetric ID, then renumber the rows from 0 so that the
# index labels are unique after the concatenation.
chartmetric_ids_spotify = (
    chartmetric_ids_spotify
    .drop_duplicates(subset="chartmetric_ids")
    .reset_index(drop=True)
)

#retain only 10 rows of the dataframe
#chartmetric_ids_spotify = chartmetric_ids_spotify.head(10)

# Code to retrieve the song metadata from Chartmetric

- create the get request
- run the loop over each chartmetric id
- save the response for later parsing

In [42]:
# --- Function to Retrieve song characteristics from Chartmetric ID ---
def get_songchars_ids(chartmetric_id):
    """Fetch the track metadata for one Chartmetric ID.

    Args:
        chartmetric_id: Track ID. Whole-number floats (e.g. ``15812884.0``,
            as produced when the ID column is read as float) are converted
            to int so the URL path does not end in ``.0``.

    Returns:
        The API response dictionary as returned by ``get_request``, or
        ``None`` if the request failed (the failure is logged).
    """
    track_id = chartmetric_id
    if isinstance(chartmetric_id, float) and chartmetric_id.is_integer():
        track_id = int(chartmetric_id)

    endpoint = f"/api/track/{track_id}"
    try:
        response = get_request(endpoint)
    except Exception as e:
        # Best effort: a failing ID must not abort the whole collection run.
        logging.error(f"Failed to get song chars for Chartmetric ID {chartmetric_id}: {e}")
        return None

    logging.info(f"Successfully retrieved song chars for Chartmetric ID {chartmetric_id}")

    # The API response (a dictionary) is returned as is
    return response

In [None]:
# --- Step 1: Loop over the DataFrame to retrieve API responses ---
# Iterates over chartmetric_ids_spotify. One entry is appended to
# song_chars_responses per non-null Chartmetric ID (None when the request
# failed); rows without an ID are skipped and add nothing to the list.
song_chars_responses = []  # List to store API responses for each Chartmetric ID.
checkpoint_interval = 100  # Save a checkpoint every 100 collected responses.
checkpoint_file = "Z:/Data_alexander/data/incidental/chartmetric/song_chars_checkpoint.json"  # Checkpoint file for responses.

for idx, row in chartmetric_ids_spotify.iterrows():
    chartmetric_id = row.get("chartmetric_ids")
    print(f"Processing row {idx}: Chartmetric ID = {chartmetric_id}")
    logging.info(f"Processing row {idx}: Chartmetric ID = {chartmetric_id}")

    if pd.isnull(chartmetric_id):
        print(f"Row {idx} has no Chartmetric ID. Skipping.")
        logging.info(f"Row {idx} has no Chartmetric ID. Skipping.")
        continue

    try:
        song_chars = get_songchars_ids(chartmetric_id)
    except Exception as e:
        print(f"Error processing Chartmetric ID {chartmetric_id} at row {idx}: {e}")
        logging.error(f"Error processing Chartmetric ID {chartmetric_id} at row {idx}: {e}")
        song_chars = None

    # Append the response (or None) to our list.
    song_chars_responses.append(song_chars)

    print(f"Row {idx} processed: Chartmetric ID = {chartmetric_id}")
    logging.info(f"Processed row {idx}: Chartmetric ID = {chartmetric_id}")

    # Sleep briefly to help with rate limiting.
    time.sleep(0.1)

    # Save a checkpoint periodically. Counting collected responses (instead of
    # testing the row index) means a skipped row cannot swallow a checkpoint.
    if len(song_chars_responses) % checkpoint_interval == 0:
        with open(checkpoint_file, "w") as f:
            json.dump(song_chars_responses, f, indent=2)
        print(f"Checkpoint saved at row {idx}")
        logging.info(f"Checkpoint saved at row {idx}")

# Final checkpoint so the responses gathered after the last periodic save are
# written to disk as well.
with open(checkpoint_file, "w") as f:
    json.dump(song_chars_responses, f, indent=2)
print(f"Final checkpoint saved with {len(song_chars_responses)} responses")
logging.info(f"Final checkpoint saved with {len(song_chars_responses)} responses")

Processing row 0: Chartmetric ID = 15812884.0
Row 0 processed: Chartmetric ID = 15812884.0
Processing row 1: Chartmetric ID = 15440434.0
Row 1 processed: Chartmetric ID = 15440434.0
Processing row 2: Chartmetric ID = 20706768.0
Row 2 processed: Chartmetric ID = 20706768.0
Processing row 3: Chartmetric ID = 12820755.0
Row 3 processed: Chartmetric ID = 12820755.0
Processing row 4: Chartmetric ID = 15447513.0
Row 4 processed: Chartmetric ID = 15447513.0
Processing row 5: Chartmetric ID = 15440175.0
Row 5 processed: Chartmetric ID = 15440175.0
Processing row 6: Chartmetric ID = 12486440.0
Row 6 processed: Chartmetric ID = 12486440.0
Processing row 7: Chartmetric ID = 44970155.0


In [44]:
# define a function to extract necessary information from the search output
def extract_song_info(search_output):
    """Flatten one track metadata response into a one-row DataFrame.

    The track data is read from the ``'obj'`` entry of the response. Missing
    or null fields yield ``None`` in the corresponding column rather than an
    error.

    Args:
        search_output: Response dictionary for a single track. ``None`` or a
            response whose ``'obj'`` is missing/null gives a row of ``None``s.

    Returns:
        A ``pandas.DataFrame`` with exactly one row and these columns:
        track id/name/composer, details of the first listed artist, details
        of the album with the earliest release date, tags, pipe-delimited
        moods / activities / songwriters, tempo and duration.
    """
    # Use a pipe '|' as delimiter for multiple values
    delimiter = '|'

    def parse_date(album):
        """Return the album's release date; datetime.max if unknown so it sorts last."""
        raw = album.get('release_date')
        if not isinstance(raw, str):
            return datetime.max
        # Try the plain date first, then only the date part of a longer
        # timestamp such as '2019-03-29T00:00:00.000Z'.
        for candidate in (raw, raw[:10]):
            try:
                return datetime.strptime(candidate, '%Y-%m-%d')
            except ValueError:
                continue
        return datetime.max

    def join_values(values):
        """Join values with the delimiter; a null value becomes an empty string."""
        return delimiter.join('' if v is None else str(v) for v in values)

    # 'obj' may be absent or explicitly null.
    obj = (search_output or {}).get('obj') or {}

    # Artist: take first artist if available
    artists = obj.get('artists') or []
    artist = (artists[0] if artists else None) or {}

    # Albums: select the album with the earliest release date. min() returns
    # the first of several equally early albums, like a stable sort would.
    albums = [a for a in (obj.get('albums') or []) if isinstance(a, dict)]
    earliest_album = min(albums, key=parse_date) if albums else {}

    moods = obj.get('moods') or []
    activities = obj.get('activities') or []
    songwriters = obj.get('songwriters') or []

    # Create a one-row DataFrame with the desired columns
    data = {
        'chartmetric_ids': obj.get('id', None),
        'Name': obj.get('name', None),
        'Composer_name': obj.get('composer_name', None),
        'Artist_id': artist.get('id', None),
        'Artist_name': artist.get('name', None),
        'Artist_label': artist.get('label', None),
        'Artist_booking_agent': artist.get('booking_agent', None),
        'Artist_general_manager': artist.get('general_manager', None),
        'Albums_id': earliest_album.get('id', None),
        'Albums_name': earliest_album.get('name', None),
        'Albums_release_date': earliest_album.get('release_date', None),
        'Albums_label': earliest_album.get('label', None),
        'Tags': obj.get('tags', None),
        'Moods': join_values(m.get('name', '') for m in moods) if moods else None,
        'Activities': join_values(a.get('name', '') for a in activities) if activities else None,
        'Songwriters': join_values(songwriters) if songwriters else None,
        # songwriterIds is not present in the example so we assign None
        'songwriterIds': None,
        'Tempo': obj.get('tempo', None),
        'Duration_ms': obj.get('duration_ms', None)
    }

    return pd.DataFrame([data])

In [45]:
# --- Step 2: Parse the Collected API Responses ---
# Failed requests were stored as None and are skipped; every remaining
# response becomes a one-row DataFrame via extract_song_info.
extracted_rows = [
    extract_song_info(resp)
    for resp in song_chars_responses
    if resp is not None
]

if extracted_rows:
    song_chars_extracted = pd.concat(extracted_rows, ignore_index=True)
else:
    # Nothing could be parsed: keep the join key column so that a later merge
    # on "chartmetric_ids" still works instead of failing on a missing column.
    song_chars_extracted = pd.DataFrame({"chartmetric_ids": pd.Series(dtype="float64")})

  song_chars_extracted = pd.concat(extracted_rows, ignore_index=True)


In [None]:
# --- Step 3: Join the Extracted Data to the Original DataFrame ---
# The join key is the 'chartmetric_ids' column of both dataframes. A left join
# keeps every row of chartmetric_ids_spotify, including IDs for which no
# metadata was extracted.
merged_song_chars = chartmetric_ids_spotify.merge(song_chars_extracted, on="chartmetric_ids", how="left")

In [None]:
# Save the final dataframe
# Save as JSON (records-oriented with one JSON object per line)
merged_song_chars.to_json("Z:/Data_alexander/data/incidental/chartmetric/final_joined_data.json", orient="records", lines=True)

# The code used to derive an example and extract information

In [26]:
# --- Function to Retrieve song characteristics from Chartmetric ID ---
def get_songchars_ids(chartmetric_id):
    """Fetch the track metadata for one Chartmetric ID.

    Args:
        chartmetric_id: Track ID. Whole-number floats (e.g. ``15447513.0``,
            as produced when the ID column is read as float) are converted
            to int so the URL path does not end in ``.0``.

    Returns:
        The API response dictionary as returned by ``get_request``, or
        ``None`` if the request failed (the failure is logged).
    """
    track_id = chartmetric_id
    if isinstance(chartmetric_id, float) and chartmetric_id.is_integer():
        track_id = int(chartmetric_id)

    endpoint = f"/api/track/{track_id}"
    try:
        response = get_request(endpoint)
    except Exception as e:
        # Best effort: report the failure to the caller as None.
        logging.error(f"Failed to get song chars for chartmetric id {chartmetric_id}: {e}")
        return None

    logging.info(f"Successfully retrieved song chars for chartmetric id {chartmetric_id}")

    # The response dictionary is handed back unchanged.
    return response

In [34]:
# Inspect the Chartmetric ID stored under index label 4.
pprint(chartmetric_ids_spotify["chartmetric_ids"][4])

15447513.0


In [None]:
# Trial run with a single ID: fetch the metadata for the ID under index
# label 0 (None is returned if the request fails).

search_output = get_songchars_ids(chartmetric_ids_spotify["chartmetric_ids"][0])


# Pretty-print the raw response to inspect its structure.
pprint(search_output)

In [40]:
# Try the extraction function on the trial response; the result is a
# one-row DataFrame.

test_df = extract_song_info(search_output)