In [1]:
from dotenv import load_dotenv
import os
import base64
from requests import post, get
import json
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.auth.transport.requests import Request
import pickle
import time
import logging
from googleapiclient.errors import HttpError
import random



ModuleNotFoundError: No module named 'dotenv'

In [None]:
# Load environment variables from .env file
load_dotenv()

# Spotify credentials
client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")

# YouTube API credentials
youtube_client_secrets = 'client_secrets.json'
youtube_scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]

def get_spotify_token():
    auth_string = client_id + ":" + client_secret
    auth_bytes = auth_string.encode("utf-8")
    auth_base64 = str(base64.b64encode(auth_bytes), "utf-8")

    url = "https://accounts.spotify.com/api/token"
    headers = {
        "Authorization": "Basic " + auth_base64,
        "Content-Type": "application/x-www-form-urlencoded"
    }
    data = {"grant_type": "client_credentials"}
    result = post(url, headers=headers, data=data)
    json_result = result.json()
    token = json_result["access_token"]
    return token

def get_spotify_auth_header(token):
    return {"Authorization": "Bearer " + token}

def get_spotify_playlist_items(playlist_id, token):
    url = f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks"
    headers = get_spotify_auth_header(token)
    result = get(url, headers=headers)
    return result.json()

def extract_spotify_playlist_data(playlist_data):
    tracks_info = {}
    for item in playlist_data['items']:
        track = item['track']
        track_name = track['name']
        artist_names = [artist['name'] for artist in track['artists']]
        tracks_info[track_name] = artist_names
    return tracks_info

def extract_playlist_id(playlist_url_or_id):
    if 'spotify.com' in playlist_url_or_id:
        return playlist_url_or_id.split('/')[-1].split('?')[0]
    return playlist_url_or_id



In [None]:
def get_authenticated_youtube_service():
    creds = None
    if os.path.exists('token.pickle'):
        with open('token.pickle', 'rb') as token:
            creds = pickle.load(token)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(youtube_client_secrets, youtube_scopes)
            creds = flow.run_local_server(port=0)
        with open('token.pickle', 'wb') as token:
            pickle.dump(creds, token)
    return build('youtube', 'v3', credentials=creds)

def search_youtube(youtube, query):
    request = youtube.search().list(
        part="snippet",
        q=query,
        maxResults=1
    )
    response = request.execute()
    return response

def create_youtube_playlist(youtube, title, description):
    request = youtube.playlists().insert(
        part="snippet,status",
        body={
            "snippet": {
                "title": title,
                "description": description
            },
            "status": {
                "privacyStatus": "private"
            }
        }
    )
    response = request.execute()
    return response["id"]



In [None]:
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def add_video_to_youtube_playlist(youtube, playlist_id, video_id, retries=5):
    for attempt in range(retries):
        try:
            request = youtube.playlistItems().insert(
                part="snippet",
                body={
                    "snippet": {
                        "playlistId": playlist_id,
                        "resourceId": {
                            "kind": "youtube#video",
                            "videoId": video_id
                        }
                    }
                }
            )
            response = request.execute()
            return response
        except HttpError as e:
            if e.resp.status in [500, 502, 503, 504, 409]:
                wait_time = (2 ** attempt) + (random.randint(0, 1000) / 1000)
                logger.warning(f"Temporary error {e.resp.status}, retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                logger.error(f"Failed with error {e.resp.status}: {e}")
                raise
    raise Exception("Exceeded maximum retries")

In [None]:
def main():
    # Get Spotify token and fetch playlist items
    spotify_token = get_spotify_token()
    playlist_url_or_id = '6X3pGyGPgeLJ0Chre2bvjt'  # Replace with your Spotify playlist URL or ID
    spotify_playlist_id = extract_playlist_id(playlist_url_or_id)
    spotify_playlist_items = get_spotify_playlist_items(spotify_playlist_id, spotify_token)
    spotify_tracks = extract_spotify_playlist_data(spotify_playlist_items)

    # Get authenticated YouTube service
    youtube = get_authenticated_youtube_service()

    # Create a new YouTube playlist
    youtube_playlist_id = create_youtube_playlist(youtube, "My Spotify Playlist", "Playlist created from Spotify tracks")

    # Search YouTube for each track and add it to the playlist
    for track, artists in spotify_tracks.items():
        search_query = f"{track} {' '.join(artists)}"
        try:
            youtube_result = search_youtube(youtube, search_query)
            if youtube_result['items']:
                video_id = youtube_result['items'][0]['id']['videoId']
                add_video_to_youtube_playlist(youtube, youtube_playlist_id, video_id)
                print(f"Added {track} by {', '.join(artists)} to YouTube playlist.")
            else:
                print(f"No YouTube result found for {track} by {', '.join(artists)}.")
        except Exception as e:
            logger.error(f"Error processing {track} by {', '.join(artists)}: {e}")

if __name__ == "__main__":
    main()