In [None]:
!pip install python-Levenshtein

In [None]:
# NOTE:  !!! THIS SCRIPT DOESN'T ALIGN WITH SPOTIFY USAGE TERMS !!! #

import requests
import json
from bs4 import BeautifulSoup
from Levenshtein import distance
from concurrent.futures import ThreadPoolExecutor, as_completed

# Constants
THRESHOLD = 50
PLAY_ENDPOINT = "play"
PAUSE_ENDPOINT = "pause"
RESUME_ENDPOINT = "resume"
SEARCH_ENDPOINT = "search"
BASE_URL = "https://api.spotify.com/v1"
COMMAND_URL_TEMPLATE = "https://gae2-spclient.spotify.com/connect-state/v1/player/command/from/{}/to/{}"
tokens = []  # List to store your sp_dc tokens from cookies for controlling your Spotify devices

# How to Retrieve Your `sp_dc` Token:

# 1. Open your web browser and go to https://open.spotify.com.
# 2. Log in to your Spotify account if you haven't already.
# 3. Open Developer Tools in your browser:
#    - For Google Chrome / Mozilla Firefox: press Ctrl + Shift + I (Windows/Linux) or Cmd + Option + I (Mac).
#    - For Microsoft Edge: press F12.
# 4. Navigate to the "Application" tab (in Chrome/Edge) or "Storage" tab (in Firefox).
#    - In the left sidebar, look for "Cookies" under the "Storage" section.
#    - Click on `https://open.spotify.com` to view the list of cookies.
# 5. Locate the cookie named `sp_dc`.
#    - The value of the `sp_dc` cookie is your token.
# 6. Copy the value of the `sp_dc` token and paste it into the `tokens` list in the code above.


# Function to calculate Levenshtein distance
def levenshtein_distance(s1, s2):
    return distance(s1, s2)

# Function for fuzzy matching
def fuzzy_match(input_str, command_list, threshold=THRESHOLD):
    return min(command_list, key=lambda command: levenshtein_distance(input_str, command.lower())) \
        if any(levenshtein_distance(input_str, command.lower()) <= threshold for command in command_list) else None

# Function to retrieve Spotify access token
def get_access_token(token):
    url = "https://open.spotify.com/"
    headers = {'cookie': f'sp_dc={token};'}
    response = requests.get(url, headers=headers)
    soup = BeautifulSoup(response.content, 'html.parser')
    script_tag = soup.find('script', {'id': 'session'})
    json_data = script_tag.string.strip()
    return json.loads(json_data)['accessToken']

# Function to get user information
def get_user_info(access_token):
    url = f"{BASE_URL}/me"
    headers = {'Authorization': f'Bearer {access_token}'}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()["email"]

# Function to get devices
def get_devices(access_token):
    url = f"{BASE_URL}/me/player/devices"
    headers = {'Authorization': f'Bearer {access_token}'}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json().get("devices", [])

# Function to send command to device
def send_command_to_device(user, endpoint, context_url=None):
    url = COMMAND_URL_TEMPLATE.format(user['device_id'], user['device_id'])
    payload = {"command": {"endpoint": endpoint}}
    if context_url:
        payload["command"]["context"] = {"uri": context_url, "url": f"context://{context_url}", "metadata": {}}
    headers = {'Authorization': f'Bearer {user["access_token"]}'}
    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()
    return response.json()

# Function to search tracks for a single user
def search_tracks_for_user(user, query):
    url = f"{BASE_URL}/search?q={query}&type=track"
    headers = {'Authorization': f'Bearer {user["access_token"]}'}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json().get("tracks", {}).get("items", [])

# Function to search tracks across multiple users
def search_tracks(users, query):
    with ThreadPoolExecutor(max_workers=len(users)) as executor:
        futures = [executor.submit(search_tracks_for_user, user, query) for user in users]
        results = []
        for future in as_completed(futures):
            results.extend(future.result())
    return results

# Function to display devices
def display_devices(devices, user):
    print(f"Found {len(devices)} device(s) for {user}:")
    for i, device in enumerate(devices):
        print(f"{i + 1}: {device['name']}")

# Function to get user devices
def get_user_devices():
    users = []
    with ThreadPoolExecutor(max_workers=len(tokens)) as executor:
        futures = {executor.submit(get_access_token, token): token for token in tokens}
        for future in as_completed(futures):
            try:
                access_token = future.result()
                user_email = get_user_info(access_token)
                devices = get_devices(access_token)
                if devices:
                    if len(devices) == 1:
                        device_id = devices[0]["id"]
                        device_name = devices[0]["name"]
                        print(f"Auto Selected {device_name} for user {user_email}")
                    else:
                        display_devices(devices, user_email)
                        device_id_input = int(input("\nEnter the ID number of the device to control: "))
                        device_id = devices[device_id_input - 1]["id"]
                    users.append({"access_token": access_token, "device_id": device_id})
                else:
                    print(f"No devices found for user {user_email}")
            except Exception as e:
                print(f"Error retrieving devices for token {futures[future]}: {e}")
    return users

# Function to execute command across multiple users
def execute_command(users, command, context_url=None):
    with ThreadPoolExecutor(max_workers=len(users)) as executor:
        futures = [executor.submit(send_command_to_device, user, command, context_url) for user in users]
        for future in as_completed(futures):
            try:
                future.result()
                print("Command executed successfully for one user!")
            except Exception as e:
                print(f"Error executing command for a user: {e}")

# Main function
def main():
    print("\nWelcome to Spotify Group Control Center!")
    print("You can control playback on all your Spotify devices at once.")
    print("Enter 'play', 'pause', 'resume', 'search', or 'exit' to quit.")

    users = get_user_devices()

    while True:
        action_input = input("\nEnter your command: ").lower()
        matched_command = fuzzy_match(action_input, [SEARCH_ENDPOINT, PLAY_ENDPOINT, PAUSE_ENDPOINT, RESUME_ENDPOINT, "exit"])

        if matched_command in {PAUSE_ENDPOINT, RESUME_ENDPOINT}:
            execute_command(users, matched_command)
        elif matched_command == PLAY_ENDPOINT:
            song_uri = input("Enter Song URI: ")
            execute_command(users, PLAY_ENDPOINT, context_url="spotify" + song_uri.replace("https://open.spotify.com", "").replace("/", ":").split("?")[0])
        elif matched_command == SEARCH_ENDPOINT:
            query = input("Enter Search query: ")
            tracks = search_tracks(users, query)
            if not tracks:
                print("No search results found.")
            else:
                print(f"{len(tracks)} result(s) found:")
                for j, track in enumerate(tracks):
                    print(f"{j + 1}: {track['name']}")
                song_id_input = int(input("Enter the number of the song to play, or 0 to exit search: ")) - 1
                if 0 <= song_id_input < len(tracks):
                    song_uri = tracks[song_id_input]["uri"]
                    execute_command(users, PLAY_ENDPOINT, context_url=song_uri)
                elif song_id_input == -1:
                    print("Search closed.")
                else:
                    print("Invalid Song number.")
        elif matched_command == "exit":
            print("Exiting Spotify Group Control Center.")
            break
        else:
            print("Invalid command. Please enter 'play', 'pause', 'resume', 'search', or 'exit'.")

if __name__ == "__main__":
    main()
