In [None]:
import logging
import os
import random
import re
import shlex
from datetime import datetime
from pathlib import Path

# LangChain
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

# Spotipy
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOAuth

# Etc
import pandas as pd
import requests
from tqdm import tqdm

### Load API KEYS from `.env` file
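Create a `.env` file in the same directory as this notebook with the following format. The values below are placeholders: replace them with your own credentials, and do not commit the file to version control.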
```
OPENAI_API_KEY=tx-GMdXEVIK0GmCudMGHpEqS3DqduEKhMzcEXwpuxt8JyEkRUZ0P
SPOTIPY_CLIENT_ID=gt824gbg80dg5d88b84ed36f09ef8ff6
SPOTIPY_CLIENT_SECRET=e8e8742257e830318e443zc23z320320
SPOTIPY_REDIRECT_URI=http://localhost:8080
```

In [None]:
# Load the `dotenv` IPython extension, then run its `%dotenv` magic so the keys
# from the `.env` file described above are available through `os.environ`
# in the cells that follow.
%load_ext dotenv
%dotenv

##### Verbose request logging, optional (for troubleshooting API errors)

In [None]:
# Configure the root logger at DEBUG level so the cURL commands emitted by
# `LoggingSession` (defined below, via `logging.debug`) are displayed.
logging.basicConfig(level=logging.DEBUG)

# The `requests` library logs using the `urllib3` logger.
# Uncomment the two lines below to set that logger to DEBUG explicitly.
# logger = logging.getLogger('urllib3')
# logger.setLevel(logging.DEBUG)

class LoggingSession(requests.Session):
    """A `requests.Session` that logs each outgoing request as a cURL command.

    Every prepared request is rendered by `request_to_curl` and written through
    the root `logging` module at DEBUG level, then sent unchanged.

    NOTE: headers and body are logged verbatim, so the output may contain
    credentials (for example an authorization header). Treat the logs as secret.
    """

    def send(self, request, **kwargs):
        """Log `request` as a cURL command and delegate to `requests.Session.send`.

        Args:
            request: The prepared request about to be sent.
            **kwargs: Passed through unchanged to the parent `send`.

        Returns:
            Whatever the parent `send` returns.
        """
        logging.debug("cURL command: \n%s\n", self.request_to_curl(request))
        return super().send(request, **kwargs)

    def request_to_curl(self, prepared_request):
        """Render `prepared_request` as a shell-safe cURL command line.

        Every argument is escaped with `shlex.quote`, so URLs, header values and
        bodies containing quotes or other shell metacharacters still produce a
        command that can be pasted into a POSIX shell.

        Args:
            prepared_request: Object exposing `method`, `url`, `headers`
                (a mapping) and `body` attributes.

        Returns:
            The cURL command as a single string.
        """
        parts = [
            "curl",
            "-X", shlex.quote(str(prepared_request.method)),
            shlex.quote(str(prepared_request.url)),
        ]

        for key, value in prepared_request.headers.items():
            parts.extend(["-H", shlex.quote(f"{key}: {value}")])

        # GET requests normally have no body; POST/PUT bodies are included.
        body = prepared_request.body
        if body:
            if isinstance(body, bytes):
                # errors='replace': a binary payload must never make the
                # logging step raise and abort the real request.
                body = body.decode('utf-8', errors='replace')
            parts.extend(["--data-raw", shlex.quote(str(body))])

        return " ".join(parts)


# Named logger for this notebook.
# NOTE(review): not referenced by the visible cells; `LoggingSession` logs
# through the root `logging` module instead.
logger = logging.getLogger(__name__)

# Initialize the custom logging session; the authentication cell hands it to
# the Spotify client when this (optional) cell has been run.
logging_session = LoggingSession()

### Create and authenticate the Spotify API client

In [None]:
# Client-credentials auth manager built from the `.env` values.
# NOTE(review): `auth` is not passed to the client created below, which uses
# the `SpotifyOAuth` flow instead; it is kept for any cell that may rely on it.
auth = SpotifyClientCredentials(
    client_id=os.environ['SPOTIPY_CLIENT_ID'],
    client_secret=os.environ['SPOTIPY_CLIENT_SECRET']
)

# Permissions requested from the user, joined into one comma-separated string.
scope = ','.join([
    "user-library-read",
    "playlist-read-private",
    "playlist-modify-private"
])

# The verbose-logging cell is optional. If it was skipped, `logging_session`
# does not exist and looking it up raises NameError; only that case should
# fall back to `True`. Any other exception now propagates instead of being hidden.
try:
    requests_session = logging_session
except NameError:
    requests_session = True

sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope), requests_session=requests_session)

### Spotify info

In [None]:
# Your Spotify user ID; the playlist-creation cells pass it as `user=` to
# `sp.user_playlist_create`, so the new playlists are created for this account.
USER_ID = '1251572084'

# The playlist you are scraping. The ID is also embedded in the names of the
# cache/export files (`playlist-<ID>.pkl`, `categories-<ID>.csv`, ...).
PLAYLIST_ID = '34rMzK9lmMfpgM2Gzx0T1d'
# Name of the source playlist, fetched from the API (requires the `sp` client
# from the previous cell); used as the prefix of the generated playlist names.
PLAYLIST_PREFIX = sp.playlist(PLAYLIST_ID)['name']

# Number of categories to create; substituted into the LLM prompt as
# `num_categories`.
NUM_CATEGORIES = 5

### Get the tracks from a playlist, create a DataFrame, and save it to a file

In [None]:
# Reload the tracks cached by a previous run (if any) so only new tracks are
# appended.
playlist_pickle = Path(f'playlist-{PLAYLIST_ID}.pkl')
if playlist_pickle.exists():
    df = pd.read_pickle(playlist_pickle)
else:
    df = pd.DataFrame()

# IDs already stored plus IDs collected during this run. Tracking both keeps
# the index unique even when the playlist lists the same track twice.
known_ids = set(df.index)
items = []

results = sp.playlist_tracks(playlist_id=PLAYLIST_ID)
total = results['total']

# Follow the API's own `next` links instead of computing offsets, so short
# pages or a playlist that changes mid-run cannot cause an IndexError.
with tqdm(total=total) as progress:
    while results:
        for item in results['items']:
            progress.update(1)
            track = item.get('track')
            # Entries without a track object or without an ID cannot be
            # indexed or looked up later, so they are skipped.
            if not track or not track.get('id') or track['id'] in known_ids:
                continue
            known_ids.add(track['id'])
            items.append({
                'id': track['id'],
                'uri': track['uri'],
                'popularity': track['popularity'],
                'album': track['album']['name'],
                'artists': [artist['name'] for artist in track['artists']],
                'artists_id': [artist['id'] for artist in track['artists']],
                'name': track['name'],
                'release_date': track['album']['release_date'],
            })
        results = sp.next(results) if results.get('next') else None

if len(items) > 0:
    print(f'Adding {len(items)} new tracks')
    df_new_tracks = pd.DataFrame(items).set_index('id')
    df = pd.concat([df, df_new_tracks])

    # The pickle is what later cells read; the other formats are exports.
    df.to_csv(f'playlist-{PLAYLIST_ID}.csv')
    df.to_excel(f'playlist-{PLAYLIST_ID}.xlsx', engine='openpyxl')
    df.to_json(f'playlist-{PLAYLIST_ID}.json', orient='index')
    df.to_pickle(f'playlist-{PLAYLIST_ID}.pkl')
else:
    print('No new tracks')

### Get genre information for each artist

In [None]:
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')
# One entry per distinct artist across all tracks. `dropna()` removes missing
# IDs (for example from an empty artist list), which cannot be looked up.
all_artists = df['artists_id'].explode().dropna().unique().tolist()

# Skip artists that were already fetched and cached by a previous run.
if Path('artists.pkl').exists():
    df_artists = pd.read_pickle('artists.pkl')
    existing_artists = set(df_artists.index)
    all_artists = [artist for artist in all_artists if artist not in existing_artists]
else:
    df_artists = pd.DataFrame()

total = len(all_artists)

items = []
page = 50  # number of artist IDs sent per `sp.artists` request

with tqdm(total=total) as progress:
    for offset in range(0, total, page):
        batch = all_artists[offset:offset + page]
        try:
            results = sp.artists(batch)
        except Exception as e:
            # Stop fetching but keep (and save) what was retrieved so far;
            # report the error instead of hiding it.
            print(f'Error fetching artists {offset}-{offset + len(batch) - 1}: {e}')
            break

        for artist in results['artists']:
            # Skip empty entries rather than failing on `None['id']`.
            if artist is None:
                continue
            items.append({'id': artist['id'],
                          'genres': artist['genres'],
                          'popularity': artist['popularity']})
        progress.update(len(batch))

if len(items) > 0:
    print(f'Adding {len(items)} new artists')
    df_new_artists = pd.DataFrame(items).set_index('id')
    df_artists = pd.concat([df_artists, df_new_artists])

    df_artists.to_csv('artists.csv')
    df_artists.to_excel('artists.xlsx', engine='openpyxl')
    df_artists.to_json('artists.json', orient='index')
    df_artists.to_pickle('artists.pkl')
else:
    print('No new artists')

### Add genre information to tracks

In [None]:
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')
df_artists = pd.read_pickle('artists.pkl')

has_genres_column = 'genres' in df.columns
known_artists = set(df_artists.index)

# The full `genres` column is rebuilt in row order: existing values are kept
# and missing ones are computed. Assigning one column (instead of
# concatenating a new frame along axis=1) prevents a duplicate `genres`
# column when this cell runs again after new tracks were added.
genres_column = []
num_added = 0

for track_id, row in tqdm(df.iterrows(), total=len(df)):
    existing = row['genres'] if has_genres_column else None
    if isinstance(existing, (list, tuple)):
        genres_column.append(tuple(existing))
        continue

    # Union of the genres of every artist credited on the track. Artists
    # missing from `artists.pkl` (e.g. a failed fetch) are skipped rather than
    # raising KeyError.
    genres = set()
    for artist_id in row['artists_id']:
        if artist_id in known_artists:
            genres.update(df_artists.loc[artist_id, 'genres'])

    # Sorted so the stored tuple is the same on every run.
    genres_column.append(tuple(sorted(genres)))
    num_added += 1

if num_added > 0:
    df['genres'] = pd.Series(genres_column, index=df.index, dtype=object)

    df.to_csv(f'playlist-{PLAYLIST_ID}.csv')
    df.to_excel(f'playlist-{PLAYLIST_ID}.xlsx', engine='openpyxl')
    df.to_json(f'playlist-{PLAYLIST_ID}.json', orient='index')
    df.to_pickle(f'playlist-{PLAYLIST_ID}.pkl')
    print(f'Added genre information to {num_added} tracks')
else:
    print('No genre information added.')


### Get audio features for tracks

In [None]:
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')

# Only request features for tracks that do not have them yet; `acousticness`
# is used as the marker column.
track_ids = df.index.tolist()
if 'acousticness' in df.columns:
    track_ids_with_audio_features = set(df[df['acousticness'].notna()].index)
    track_ids = [track_id for track_id in track_ids if track_id not in track_ids_with_audio_features]

# Fields copied from each audio-features result into `df`.
feature_names = ['acousticness',
                 'danceability',
                 'duration_ms',
                 'energy',
                 'instrumentalness',
                 'key',
                 'liveness',
                 'loudness',
                 'mode',
                 'speechiness',
                 'tempo',
                 'time_signature',
                 'valence']

total = len(track_ids)
page = 100  # number of track IDs sent per `sp.audio_features` request
num_added = 0  # tracks actually updated (count, not last loop index)

# NOTE(review): Spotify has announced restrictions on the audio-features
# endpoint for newer apps; confirm it is available for your credentials.
with tqdm(total=total) as progress:
    for offset in range(0, total, page):
        batch = track_ids[offset:offset + page]
        try:
            results = sp.audio_features(batch)
        except Exception as e:
            # Stop fetching but keep what was retrieved; report the error
            # instead of hiding it.
            print(f'Error fetching audio features for tracks {offset}-{offset + len(batch) - 1}: {e}')
            break

        for audio_features in results or []:
            # Skip empty entries rather than failing on `None['id']`.
            if audio_features is None:
                continue
            for feature in feature_names:
                df.loc[audio_features['id'], feature] = audio_features[feature]
            num_added += 1
        progress.update(len(batch))

# Save whenever at least one track was updated (previously a run that
# processed exactly one track was never saved).
if num_added > 0:
    print(f'Adding {num_added} new audio features')
    df.to_csv(f'playlist-{PLAYLIST_ID}.csv')
    df.to_excel(f'playlist-{PLAYLIST_ID}.xlsx', engine='openpyxl')
    df.to_json(f'playlist-{PLAYLIST_ID}.json', orient='index')
    df.to_pickle(f'playlist-{PLAYLIST_ID}.pkl')
else:
    print('No new audio features')

### Create shuffled list of genres

In [None]:
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')

# One comma-separated line per track that has at least one genre.
has_any_genre = df['genres'].apply(lambda track_genres: len(track_genres) > 0)
genres_list = [", ".join(track_genres) for track_genres in df.loc[has_any_genre, 'genres']]

# Fixed seed: the shuffled order (and therefore the prompt text) is
# reproducible between runs.
random.seed(0)
random.shuffle(genres_list)

genres_text = "\n".join(genres_list)
with open(f'genres_text-{PLAYLIST_ID}.txt', 'w') as fp:
    fp.write(genres_text)
print(genres_text)

### Create categories prompt for LLM

In [None]:
# Chat model shared by every LLM call in this notebook; temperature=0 is
# passed to the model for all requests.
llm = ChatOpenAI(model_name='gpt-4-turbo-preview', temperature=0)
# Prompt asking the model to derive `{num_categories}` categories from the
# genre lines in `{genres_text}`. The requested output format
# (`NUMBER. **TITLE**: DESCRIPTION`) is what the parsing cell's regex expects.
# NOTE: `prompt` is rebound to the per-track categorization prompt further
# down, so this cell must be re-run before re-running the category generation.
prompt = PromptTemplate.from_template(
    "I have a list of songs, each with one or more genres associated with it. Based on these genres, I would like you to analyze the list and create {num_categories} distinct categories that these songs could be grouped into. Each category should represent a unique theme or commonality found within the genres. Please provide a brief description for each category to explain the common theme or elements that define it.\nPlease output each category using the following format:\n[[NUMBER]]. **[[TITLE]]**: [[DESCRIPTION]]\n\nHere is the list of songs and their associated genres:\n\n{genres_text}",
)

##### Test categories prompt

In [None]:
# Preview the rendered prompt using only the first 500 characters of the genre
# list. `Path.read_text` closes the file, unlike a bare `open(...).read()`.
genres_text = Path(f'genres_text-{PLAYLIST_ID}.txt').read_text()
print(prompt.format(num_categories=NUM_CATEGORIES, genres_text=genres_text[:500]))

### Prompt LLM for list of categories

In [None]:
# `Path.read_text` / `write_text` close the files they open, unlike the bare
# `open(...).read()` calls they replace.
genres_text = Path(f'genres_text-{PLAYLIST_ID}.txt').read_text()

# The LLM answer is cached on disk: delete `category_output-<ID>.txt` to force
# a new request. When the cache is missing, `prompt` must still be the
# categories prompt (it is rebound later in the notebook).
category_output_path = Path(f'category_output-{PLAYLIST_ID}.txt')
if category_output_path.exists():
    categories_output = category_output_path.read_text()
else:
    chain = LLMChain(llm=llm, prompt=prompt, verbose=True)
    categories_output = chain.run(num_categories=NUM_CATEGORIES, genres_text=genres_text)
    category_output_path.write_text(categories_output)
print(categories_output)

### Parse category output from LLM

In [None]:
# `Path.read_text` closes the file, unlike a bare `open(...).read()`.
categories_output = Path(f'category_output-{PLAYLIST_ID}.txt').read_text()

# Matches entries of the form `N. <name>: <description>`. The description runs
# (across newlines, hence re.DOTALL) until just before the next line that
# starts with `<digits>.`.
pattern = r"(\d+)\.\s*([^:]+):\s*((?:.(?!\n\s*\d+\.))+.)"

matches = re.findall(pattern, categories_output, re.DOTALL)

# Fail loudly instead of overwriting the category files with empty data that
# would only break the playlist-creation cells later.
if not matches:
    raise ValueError(
        f'No categories could be parsed from category_output-{PLAYLIST_ID}.txt; '
        'check that the LLM output follows the "N. **TITLE**: DESCRIPTION" format.'
    )

# category number -> (name, description); names keep any markdown `*` markers,
# which are removed when the playlist names are built.
categories = {int(num): (name.strip(), desc.strip()) for num, name, desc in matches}

data = [{'category_number': category_number,
         'category_name': category_name,
         'description': description
         } for category_number, (category_name, description) in categories.items()]

df_categories = pd.DataFrame(data)
df_categories.to_csv(f'categories-{PLAYLIST_ID}.csv')
df_categories.to_excel(f'categories-{PLAYLIST_ID}.xlsx', engine='openpyxl')
df_categories.to_json(f'categories-{PLAYLIST_ID}.json', orient='index')
df_categories.to_pickle(f'categories-{PLAYLIST_ID}.pkl')

for category_number, (category_name, description) in categories.items():
    print(f"{category_number}. {category_name}: {description}\n")

### Create categorization prompt for LLM

In [None]:
# Per-track categorization prompt. It receives the raw category list
# (`categories_output`) plus one track's metadata and audio features, and asks
# for an answer in three labelled parts ("Category number:", "Category name:",
# "Reasoning:") that the parsing regex further down extracts.
# NOTE: this rebinds `prompt`, replacing the category-generation prompt defined
# earlier in the notebook.
prompt = PromptTemplate.from_template(
    """Given a list of categories and their descriptions, please determine which category the following song fits best into. Use the song's genres, along with any other relevant information provided, to make your decision. After making your decision, structure your response as follows: Start with "Category number:" followed by the number of the category. Then, on a new line, write "Category name:" followed by the name of the category. Then, on a new line, write "Reasoning:" followed by a brief explanation of why the song fits best in the chosen category. Here are the categories:

{categories_output}

Song Information:

    Name: {name}
    Artists: {artists}
    Album: {album}
    Release Date: {release_date}
    Genres: {genres}
    Popularity: {popularity}
    Danceability: {danceability}
    Energy: {energy}
    Key: {key}
    Loudness: {loudness}
    Mode: {mode}
    Speechiness: {speechiness}
    Acousticness: {acousticness}
    Instrumentalness: {instrumentalness}
    Liveness: {liveness}
    Valence: {valence}
    Tempo: {tempo}
    Duration MS: {duration_ms}
    Time Signature: {time_signature}

Based on the genres listed and any other information you deem relevant from the song information provided, which of the categories does "{name}" by {artists} fit best into? Please explain your reasoning.""",
)

##### Test categorization prompt

In [None]:
# `Path.read_text` closes the file, unlike a bare `open(...).read()`.
categories_output = Path(f'category_output-{PLAYLIST_ID}.txt').read_text()
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')
track = df.iloc[0]

# Track columns that are passed to the prompt unchanged; `artists` is a list
# and is joined into a single string separately.
prompt_fields = ('name', 'album', 'release_date', 'genres', 'popularity',
                 'danceability', 'energy', 'key', 'loudness', 'mode',
                 'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                 'valence', 'tempo', 'duration_ms', 'time_signature')
prompt_inputs = {field: track[field] for field in prompt_fields}
prompt_inputs['artists'] = ", ".join(track['artists'])

# Render the prompt for the first track without calling the LLM.
print(prompt.format(categories_output=categories_output, **prompt_inputs))

##### Test LLM categorization on single track

In [None]:
# `Path.read_text` closes the file, unlike a bare `open(...).read()`.
categories_output = Path(f'category_output-{PLAYLIST_ID}.txt').read_text()
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')
track = df.iloc[0]

# Track columns that are passed to the prompt unchanged; `artists` is a list
# and is joined into a single string separately.
prompt_fields = ('name', 'album', 'release_date', 'genres', 'popularity',
                 'danceability', 'energy', 'key', 'loudness', 'mode',
                 'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                 'valence', 'tempo', 'duration_ms', 'time_signature')
prompt_inputs = {field: track[field] for field in prompt_fields}
prompt_inputs['artists'] = ", ".join(track['artists'])

# Categorize the first track only; `output` is parsed by the next cell.
chain = LLMChain(llm=llm, prompt=prompt, verbose=True)
output = chain.run(categories_output=categories_output, **prompt_inputs)
print(output)

##### Test track categorization output parsing

In [None]:
# Regular expression that extracts (category number, category name, reasoning)
# from the LLM answer; callers search with re.DOTALL so the reasoning may span
# several lines.
# The name is matched lazily up to the "Reasoning:" label instead of with
# `[\w\s]+`, so names containing characters such as `&`, `-`, `/` or `*` are
# accepted. A name that never matches would make the retry loops in the
# categorization cells call the LLM again and again.
pattern = r"Category number:\s*(\d+)\s+Category name:\s*(.+?)\s+Reasoning:\s*(.+)"

# Apply the extraction regex to the single-track answer produced by the
# previous cell and show the three parsed fields.
match = re.search(pattern, output, re.DOTALL)

if match is None:
    print("Required fields not found.")
else:
    # Groups are, in order: category number, category name, reasoning.
    category_number = int(match.group(1))
    category_name, reasoning = (match.group(index).strip() for index in (2, 3))

    print(f"Category Number: {category_number}")
    print(f"Category Name: {category_name}")
    print(f"Reasoning: {reasoning}")

### For each track, prompt LLM to categorize it

In [None]:
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')
# `Path.read_text` closes the file, unlike a bare `open(...).read()`.
categories_output = Path(f'category_output-{PLAYLIST_ID}.txt').read_text()

# Upper bound on LLM calls per track. The previous `while not match` loop had
# no limit, so one answer format the regex never matches meant calling the API
# forever.
max_attempts = 3

# Track columns that are passed to the prompt unchanged; `artists` is a list
# and is joined into a single string separately.
prompt_fields = ('name', 'album', 'release_date', 'genres', 'popularity',
                 'danceability', 'energy', 'key', 'loudness', 'mode',
                 'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                 'valence', 'tempo', 'duration_ms', 'time_signature')

# Relies on `pattern` (the categorization regex) and `prompt` (the
# categorization prompt) from the cells above.
chain = LLMChain(llm=llm, prompt=prompt, verbose=False)
for i, track in tqdm(df.iterrows(), total=len(df)):
    # Resume support: tracks categorized by an earlier run are skipped.
    if 'category_number' in df.columns and pd.notna(df.loc[i, 'category_number']):
        continue

    prompt_inputs = {field: track[field] for field in prompt_fields}
    prompt_inputs['artists'] = ", ".join(track['artists'])

    match = None
    for _ in range(max_attempts):
        output = chain.run(categories_output=categories_output, **prompt_inputs)
        match = re.search(pattern, output, re.DOTALL)
        if match:
            break

    if not match:
        # Stop the run; progress so far is already saved and a re-run resumes
        # from this track.
        print("Required fields not found.")
        break

    category_number = int(match.group(1))
    category_name = match.group(2).strip()
    reasoning = match.group(3).strip()
    df.at[i, 'category_number'] = category_number
    df.at[i, 'category_name'] = category_name
    df.at[i, 'reasoning'] = reasoning

    print(f"Category: {category_number} ({category_name}), Name: {track['name']}, Artists: {', '.join(track['artists'])}\nReasoning: {reasoning}")

    # Checkpoint after every track so an interrupted run loses nothing.
    df.to_csv(f'playlist-{PLAYLIST_ID}.csv')
    df.to_excel(f'playlist-{PLAYLIST_ID}.xlsx', engine='openpyxl')
    df.to_json(f'playlist-{PLAYLIST_ID}.json', orient='index')
    df.to_pickle(f'playlist-{PLAYLIST_ID}.pkl')

### Create playlists

In [None]:
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')
df_categories = pd.read_pickle(f'categories-{PLAYLIST_ID}.pkl')

# One new private playlist per category, filled with the tracks assigned to it.
for category_row in df_categories.itertuples(index=False):
    category_number, category_name, description = category_row
    print(f"Category: {category_number} ({category_name})\nDescription: {description}\n")

    tracks_in_category = df[df['category_number'] == category_number]
    tracks_to_add = []
    for _, row in tracks_in_category.iterrows():
        print(f'{row["name"]} by {", ".join(row["artists"])}')
        tracks_to_add.append(row['uri'])
    print('\n')

    # Minute-resolution timestamp (colons removed) appended to the name; the
    # markdown `*` markers from the LLM output are removed from the title.
    timestamp = datetime.now().isoformat()[:16].replace(':', '')
    clean_name = category_name.replace('*', '')
    playlist_name = f"{PLAYLIST_PREFIX} - {clean_name}_{timestamp}"
    # Newlines collapsed to spaces, then cut to 512 characters.
    playlist_desc = re.sub(r'\n+', ' ', description)[:512]

    result = sp.user_playlist_create(user=USER_ID,
                                     name=playlist_name,
                                     public=False,
                                     description=playlist_desc)
    created_playlist_id = result['id']

    # Tracks are added in slices of 100 per request.
    for start in range(0, len(tracks_to_add), 100):
        sp.playlist_add_items(playlist_id=created_playlist_id,
                              items=tracks_to_add[start:start + 100])

##### Alternative code to update playlists during categorization

In [None]:
df_categories = pd.read_pickle(f'categories-{PLAYLIST_ID}.pkl')

# category number -> ID of the (initially empty) playlist created for it; the
# following cells read this mapping.
category_playlist_ids = {}
print('Creating playlists')
for category_row in df_categories.itertuples(index=False):
    category_number, category_name, description = category_row

    # Minute-resolution timestamp (colons removed) appended to the name; the
    # markdown `*` markers from the LLM output are removed from the title.
    timestamp = datetime.now().isoformat()[:16].replace(':', '')
    clean_name = category_name.replace('*', '')
    playlist_name = f"{PLAYLIST_PREFIX} - {clean_name}_{timestamp}"
    # Newlines collapsed to spaces, then cut to 512 characters.
    playlist_desc = re.sub(r'\n+', ' ', description)[:512]

    result = sp.user_playlist_create(user=USER_ID,
                                     name=playlist_name,
                                     public=False,
                                     description=playlist_desc)
    created_playlist_id = result['id']
    category_playlist_ids[category_number] = created_playlist_id

    print(f'{created_playlist_id}: "{playlist_name}"')

In [None]:
# IDs of every track already present in any of the category playlists, so the
# next cell does not add the same track twice.
category_playlist_track_ids = set()
for playlist_id in category_playlist_ids.values():
    results = sp.playlist_tracks(playlist_id)
    # A single loop handles the first page and all following pages.
    while results:
        category_playlist_track_ids.update(
            item['track']['id']
            for item in results['items']
            # Entries without a track object are skipped rather than failing
            # on `None['id']`.
            if item.get('track')
        )
        results = sp.next(results) if results['next'] else None
print(f'Found {len(category_playlist_track_ids)} tracks in category playlists')

In [None]:
df = pd.read_pickle(f'playlist-{PLAYLIST_ID}.pkl')
# `Path.read_text` closes the file, unlike a bare `open(...).read()`.
categories_output = Path(f'category_output-{PLAYLIST_ID}.txt').read_text()

# Upper bound on LLM calls per track. The previous `while not match` loop had
# no limit, so one answer format the regex never matches meant calling the API
# forever.
max_attempts = 3

# Track columns that are passed to the prompt unchanged; `artists` is a list
# and is joined into a single string separately.
prompt_fields = ('name', 'album', 'release_date', 'genres', 'popularity',
                 'danceability', 'energy', 'key', 'loudness', 'mode',
                 'speechiness', 'acousticness', 'instrumentalness', 'liveness',
                 'valence', 'tempo', 'duration_ms', 'time_signature')

# Relies on `pattern`, `prompt`, `category_playlist_ids` and
# `category_playlist_track_ids` from the cells above.
chain = LLMChain(llm=llm, prompt=prompt, verbose=False)
for i, track in tqdm(df.iterrows(), total=len(df)):
    # Already in one of the category playlists: nothing to do.
    if i in category_playlist_track_ids:
        continue

    # Categorize only tracks that have no stored category yet.
    if 'category_number' not in df.columns or pd.isna(df.loc[i, 'category_number']):
        prompt_inputs = {field: track[field] for field in prompt_fields}
        prompt_inputs['artists'] = ", ".join(track['artists'])

        match = None
        for _ in range(max_attempts):
            output = chain.run(categories_output=categories_output, **prompt_inputs)
            match = re.search(pattern, output, re.DOTALL)
            if match:
                break

        if not match:
            # Stop the run; progress so far is already saved.
            print("Required fields not found.")
            break

        category_number = int(match.group(1))
        category_name = match.group(2).strip()
        reasoning = match.group(3).strip()
        df.at[i, 'category_number'] = category_number
        df.at[i, 'category_name'] = category_name
        df.at[i, 'reasoning'] = reasoning

        print(f"Category: {category_number} ({category_name}), Name: {track['name']}, Artists: {', '.join(track['artists'])}\nReasoning: {reasoning}")

    # The stored column may be float (it can hold NaN), so normalise to int
    # before using it as a dictionary key.
    category_number = int(df.at[i, 'category_number'])
    target_playlist_id = category_playlist_ids.get(category_number)
    if target_playlist_id is None:
        # The LLM named a category that has no playlist; keep the stored
        # result but do not abort the whole run with a KeyError.
        print(f"No playlist for category {category_number}; skipping {track['name']}")
    else:
        sp.playlist_add_items(target_playlist_id, [track['uri']])

    # Checkpoint after every track so an interrupted run loses nothing.
    df.to_csv(f'playlist-{PLAYLIST_ID}.csv')
    df.to_excel(f'playlist-{PLAYLIST_ID}.xlsx', engine='openpyxl')
    df.to_json(f'playlist-{PLAYLIST_ID}.json', orient='index')
    df.to_pickle(f'playlist-{PLAYLIST_ID}.pkl')