<a href="https://colab.research.google.com/github/WelkyToubab/WelkyToubab-s-DIM-Toolshed/blob/main/SpotifyUserPlaylistGrab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import json
import os
import time

import requests

# === USER INPUT ===
# Credentials are looked up in the environment first so they do not have to be
# edited into (and committed with) the source. The literals are fallbacks only.
# NOTE(review): the fallback client ID/secret are committed in plain text --
# rotate them in the Spotify developer dashboard and remove the fallbacks once
# SPOTIFY_CLIENT_ID / SPOTIFY_CLIENT_SECRET are set.
CLIENT_ID = os.environ.get("SPOTIFY_CLIENT_ID") or "b810b15507704ed6a333f2ec4b262872"
CLIENT_SECRET = os.environ.get("SPOTIFY_CLIENT_SECRET") or "ed7d3d42648e444c9a610bcd02daa28b"
# NOTE(review): REDIRECT_URI is not referenced by any code visible in this
# file; it would only be needed by a user-authorization (redirect-based) flow.
REDIRECT_URI = "http://127.0.0.1:8888/callback/"
USER_ID = "122419702"  # Your Spotify account ID
TARGET_USERS = ["numerogroup", "caspersmits"]  # The two target users' Spotify IDs
NEW_PLAYLIST_NAME = "uno"  # Name for the new playlist

# === SPOTIFY AUTHORIZATION ===
AUTH_URL = "https://accounts.spotify.com/api/token"
MAX_RETRIES = 5  # Max retries for handling rate limits and timeouts

def get_access_token(*, timeout=30):
    """Request an access token using the client-credentials grant.

    NOTE(review): a client-credentials token is app-only and carries no user
    context. Endpoints that act on behalf of a user (such as creating a
    playlist) reject it, which matches the 401 "Valid user authentication
    required" seen when this script runs. Creating playlists needs a user
    token obtained through a user-authorization flow.

    Args:
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The access token string, or None if the request failed or the
        endpoint answered with a non-200 status.
    """
    payload = {
        'grant_type': 'client_credentials',
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
    }

    try:
        # Without a timeout a stalled connection would block the script forever.
        response = requests.post(AUTH_URL, data=payload, timeout=timeout)
    except requests.RequestException as exc:
        print("Failed to get access token:", exc)
        return None

    if response.status_code != 200:
        print("Failed to get access token:", response.text)
        return None

    return response.json()["access_token"]

# The token is fetched once, when the module is executed; HEADERS is the
# Authorization header the request helpers below pass to every API call.
ACCESS_TOKEN = get_access_token()
HEADERS = {"Authorization": "Bearer {}".format(ACCESS_TOKEN)}

# === RATE LIMIT & TIMEOUT HANDLING ===
def handle_rate_limit(response, retries):
    """Decide whether a response should be retried, sleeping first if so.

    Retryable responses are 429 (rate limited) and 500/502/503/504 (server
    errors). Both kinds share the MAX_RETRIES budget, so a caller that keeps
    incrementing ``retries`` is guaranteed to stop eventually.

    Args:
        response: Object exposing ``status_code`` and ``headers``.
        retries: Number of retries the caller has already performed.

    Returns:
        True if the function slept and the caller should retry the request,
        False if the response is not retryable or the budget is exhausted.
    """
    status = response.status_code
    rate_limited = status == 429
    server_error = status in (500, 502, 503, 504)

    if not (rate_limited or server_error):
        return False

    # Applies to 429 as well; otherwise a persistent rate limit retries forever.
    if retries >= MAX_RETRIES:
        return False

    if rate_limited:
        raw_delay = response.headers.get("Retry-After", 1)
        try:
            # Clamp at zero: time.sleep() rejects negative values.
            retry_after = max(0, int(raw_delay))
        except (TypeError, ValueError):
            # Header present but not a plain number of seconds.
            retry_after = 1
        print(f"Rate limit exceeded. Retrying after {retry_after} seconds...")
        time.sleep(retry_after)
        return True

    print(f"Server error {status}. Retrying...")
    time.sleep(2 ** retries)  # Exponential backoff
    return True

# === GET PLAYLISTS FROM USERS ===
def get_user_playlists(user_id, retries=0, *, timeout=30):
    """Fetch every playlist returned for a user, following pagination.

    A retryable response (as judged by handle_rate_limit) re-requests the
    same page, so pages already collected are kept instead of restarting
    from the first page. The number of consecutive retries for one page is
    capped at MAX_RETRIES here, so the loop terminates whatever
    handle_rate_limit decides.

    Args:
        user_id: Spotify user ID whose playlists are listed.
        retries: Retry count to start from for each page.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        A list of playlist objects as returned by the API, or an empty list
        if any page could not be fetched.
    """
    playlists = []
    url = f"https://api.spotify.com/v1/users/{user_id}/playlists"
    attempt = retries

    while url:
        try:
            response = requests.get(url, headers=HEADERS, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Error fetching playlists: {exc}")
            return []

        if attempt < MAX_RETRIES and handle_rate_limit(response, attempt):
            attempt += 1
            continue  # Same URL again; nothing collected so far is lost.

        if response.status_code != 200:
            print(f"Error fetching playlists: {response.status_code} - {response.text}")
            return []

        data = response.json()
        playlists.extend(data["items"])
        url = data.get("next")  # Pagination handling
        attempt = retries  # A successful page resets the retry budget.

    return playlists

# === GET TRACKS FROM A PLAYLIST ===
def get_playlist_tracks(playlist_id, retries=0, *, timeout=30):
    """Fetch the tracks of a playlist, following pagination.

    A retryable response (as judged by handle_rate_limit) re-requests the
    same page rather than restarting from the first one. Consecutive retries
    for one page are capped at MAX_RETRIES here, so the loop always
    terminates.

    Entries without a track, or whose track has no ID, are skipped: an ID is
    required later to build a ``spotify:track:<id>`` URI.
    NOTE(review): Spotify is understood to report local files with a null
    ID -- confirm against the API reference.

    Args:
        playlist_id: Spotify playlist ID.
        retries: Retry count to start from for each page.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        A list of dicts with keys "id", "name" and "artist" (the first
        listed artist's name, or None when no artist is listed), or an empty
        list if any page could not be fetched.
    """
    tracks = []
    url = f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks"
    attempt = retries

    while url:
        try:
            response = requests.get(url, headers=HEADERS, timeout=timeout)
        except requests.RequestException as exc:
            print(f"Error fetching playlist tracks: {exc}")
            return []

        if attempt < MAX_RETRIES and handle_rate_limit(response, attempt):
            attempt += 1
            continue  # Same URL again; tracks collected so far are kept.

        if response.status_code != 200:
            print(f"Error fetching playlist tracks: {response.status_code} - {response.text}")
            return []

        data = response.json()
        for item in data["items"]:
            track = item.get("track") if item else None
            if not track or not track.get("id"):
                continue
            artists = track.get("artists") or []
            tracks.append({
                "id": track["id"],
                "name": track.get("name"),
                # An empty artist list would otherwise raise IndexError.
                "artist": artists[0]["name"] if artists else None,
            })
        url = data.get("next")  # Pagination handling
        attempt = retries  # A successful page resets the retry budget.

    return tracks

# === CREATE NEW PLAYLIST ===
def create_playlist(retries=0, *, timeout=30):
    """Create the private target playlist for USER_ID.

    Retryable responses (as judged by handle_rate_limit) are retried in a
    loop, capped at MAX_RETRIES attempts so persistent rate limiting cannot
    retry forever.

    Args:
        retries: Retry count to start from.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        The new playlist's ID, or None if it could not be created.
    """
    url = f"https://api.spotify.com/v1/users/{USER_ID}/playlists"
    payload = {
        "name": NEW_PLAYLIST_NAME,
        "public": False,
        "description": "Merged playlist from multiple users.",
    }
    attempt = retries

    while True:
        try:
            response = requests.post(url, headers=HEADERS, json=payload, timeout=timeout)
        except requests.RequestException as exc:
            # Not retried: the request may have reached the server, and a
            # second POST could create a duplicate playlist.
            print(f"Error creating playlist: {exc}")
            return None

        if attempt < MAX_RETRIES and handle_rate_limit(response, attempt):
            attempt += 1
            continue
        break

    if response.status_code == 201:
        return response.json()["id"]

    print(f"Error creating playlist: {response.status_code} - {response.text}")
    if response.status_code in (401, 403):
        # This is the failure seen with an app-only (client-credentials) token.
        print(
            "Hint: creating a playlist requires a user access token with a "
            "playlist-modify scope; a client-credentials token is not accepted."
        )
    return None

# === ADD TRACKS TO PLAYLIST ===
def add_tracks_to_playlist(playlist_id, track_ids, retries=0, *, timeout=30):
    """Add tracks to a playlist in batches of at most 100.

    When a batch hits a retryable response (as judged by handle_rate_limit),
    only that batch is re-sent. Restarting from the first batch would add
    the already-accepted tracks a second time. Retries per batch are capped
    at MAX_RETRIES.

    Args:
        playlist_id: ID of the playlist to add to.
        track_ids: Sequence of Spotify track IDs.
        retries: Retry count to start from for each batch.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        True if every batch was accepted (also for an empty ``track_ids``),
        False if at least one batch failed.
    """
    url = f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks"
    all_added = True

    for start in range(0, len(track_ids), 100):  # Spotify allows max 100 tracks per request
        uris = [f"spotify:track:{tid}" for tid in track_ids[start:start + 100]]
        attempt = retries

        while True:
            try:
                response = requests.post(url, headers=HEADERS, json={"uris": uris}, timeout=timeout)
            except requests.RequestException as exc:
                # Not retried: the batch may already have been applied.
                print(f"Error adding tracks: {exc}")
                all_added = False
                break

            if attempt < MAX_RETRIES and handle_rate_limit(response, attempt):
                attempt += 1
                continue  # Re-send this batch only.

            if response.status_code != 201:
                print(f"Error adding tracks: {response.status_code} - {response.text}")
                all_added = False
            break

        time.sleep(1)  # Avoid hitting API limits

    return all_added

# === MAIN FUNCTION ===
def main():
    """Merge the tracks of all TARGET_USERS' playlists into one new playlist.

    Steps: collect every playlist of each target user, gather the tracks of
    each playlist, de-duplicate by track ID (first occurrence wins the
    position), create the new playlist and add the unique tracks to it.
    """
    all_playlists = []

    # Fetch all playlists from the target users, dropping empty entries so
    # the ID lookups below cannot fail on them.
    for user in TARGET_USERS:
        all_playlists.extend(p for p in get_user_playlists(user) if p)

    # Sorting by ID gives a deterministic processing order.
    # NOTE(review): playlist IDs are not known to encode creation time, so
    # this is not a chronological sort -- confirm whether order matters.
    all_playlists.sort(key=lambda playlist: playlist["id"])

    # A dict keyed by track ID removes duplicates while keeping first-seen order.
    seen_ids = {}
    for playlist in all_playlists:
        for track in get_playlist_tracks(playlist["id"]):
            track_id = track["id"]
            if track_id:  # An ID-less track cannot be turned into a track URI.
                seen_ids[track_id] = None

    unique_tracks = list(seen_ids)

    # Create new playlist
    new_playlist_id = create_playlist()
    if not new_playlist_id:
        print("Failed to create playlist.")
        return

    added = add_tracks_to_playlist(new_playlist_id, unique_tracks)
    if added is False:
        # Only an explicit False counts as failure; a None return is treated
        # as success.
        print(f"Playlist '{NEW_PLAYLIST_NAME}' was created, but some tracks could not be added.")
    else:
        print(f"Playlist '{NEW_PLAYLIST_NAME}' created successfully with {len(unique_tracks)} unique tracks.")

# Run script
# Entry point: the merge only runs when a token was obtained.
if not ACCESS_TOKEN:
    print("Script terminated due to missing access token.")
else:
    main()

Error creating playlist: 401 - {"error": {"status": 401, "message": "Valid user authentication required" } }
Failed to create playlist.
