<a href="https://colab.research.google.com/github/aatishgupta25/playlist_migrator/blob/main/Playlist_Migrator_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# All Imports :

import pprint
import time
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError


# Getting the tracks from Spotify


Spotify API setup

In [None]:

# Placeholders: replace with the credentials and redirect URI of your own
# Spotify application before running this cell.
SPOTIPY_CLIENT_ID = 'YOUR_SPOTIFY_CLIENT_ID'
SPOTIPY_CLIENT_SECRET = 'YOUR_SPOTIFY_CLIENT_SECRET'
SPOTIPY_REDIRECT_URI = 'http://localhost:8888/callback'

# OAuth manager asking for read access to the user's private playlists.
# open_browser=False is passed so spotipy does not try to launch a browser.
spotify_auth = SpotifyOAuth(
    scope='playlist-read-private',
    redirect_uri=SPOTIPY_REDIRECT_URI,
    client_secret=SPOTIPY_CLIENT_SECRET,
    client_id=SPOTIPY_CLIENT_ID,
    open_browser=False,
)

# Module-level Spotify client used by the cells below.
sp = spotipy.Spotify(auth_manager=spotify_auth)


Fetch every track of a playlist into a list, following pagination and handling rate limits


In [None]:
def fetch_spotify_tracks(playlist_id, max_retries=5):
    """Return every item of a Spotify playlist, following pagination.

    Args:
        playlist_id: ID of the playlist to read.
        max_retries: Maximum number of rate-limit waits allowed for a single
            request before the error is re-raised.

    Returns:
        A list holding the ``items`` entries of every page of the playlist.

    Raises:
        spotipy.exceptions.SpotifyException: On any non-429 error, or when a
            request is still rate-limited after ``max_retries`` waits.
    """

    def _request(call, *args):
        # Run one API call, sleeping and retrying while Spotify answers 429.
        attempt = 0
        while True:
            try:
                return call(*args)
            except spotipy.exceptions.SpotifyException as e:
                # Bounded so a persistent 429 cannot loop forever.
                if e.http_status != 429 or attempt >= max_retries:
                    raise
                attempt += 1
                retry_after = int(e.headers.get('Retry-After', 5))
                print(f"Rate Limit hit, sleeping for {retry_after} seconds")
                time.sleep(retry_after)

    # The first page goes through the same retry path as the following ones.
    results = _request(sp.playlist_tracks, playlist_id)
    # Copy so the accumulated list is independent of the first page's payload.
    tracks = list(results['items'])
    while results['next']:
        results = _request(sp.next, results)
        tracks.extend(results['items'])
    return tracks



Get Spotify playlist ID from user


In [None]:
# Surrounding whitespace from a copy/paste is not part of the ID.
playlist_id = input("Enter Spotify Playlist ID: ").strip()
tracks = fetch_spotify_tracks(playlist_id)

# Build (song name, first artist name) pairs used later as YouTube queries.
songs = []
for item in tracks:
    track = item.get('track')
    # A playlist item may carry no track object (e.g. an entry that is no
    # longer available); there is nothing to migrate for it.
    if not track or not track.get('name'):
        continue
    song_name = track['name']
    # Some entries have no artist list; fall back to an empty artist name
    # instead of failing on artists[0].
    artists = track.get('artists') or []
    artist_name = artists[0]['name'] if artists else ''
    songs.append((song_name, artist_name))

# To display all the songs from the given playlist ID
pprint.pp(songs)



Disconnect Spotify client

In [None]:

# Forget the cached Spotify token so this session can no longer use it.
# The token is deliberately NOT refreshed first: a refresh only mints a new
# access token that would be discarded on the next line, and a failing
# refresh would prevent the cache from being cleared at all.
token_info = None
if sp is not None:  # sp is already None when this cell is run a second time
    token_info = sp.auth_manager.cache_handler.get_cached_token()

if token_info:
    sp.auth_manager.cache_handler.save_token_to_cache(None)
    print("Spotify client disconnected and token cleared.")
else:
    print("No active Spotify session to disconnect.")

# Drop the client so later cells cannot use it by accident.
sp = None


# Adding tracks to the new YouTube playlist


YouTube API setup

In [None]:

# OAuth scope requested for the YouTube Data API client.
scopes = ["https://www.googleapis.com/auth/youtube.force-ssl"]

def get_authenticated_service():
    """Run the manual OAuth flow and return a YouTube Data API v3 client.

    Prints the authorization URL, reads the authorization code typed by the
    user, exchanges it for credentials and builds the client with them.
    """
    oauth_flow = Flow.from_client_secrets_file(
        'YOUR_YOUTUBE_CLIENT_SECRETS_FILE.json',
        scopes=scopes,
    )
    oauth_flow.redirect_uri = 'https://localhost'

    # authorization_url returns a pair; only the URL (first element) is used.
    consent_url = oauth_flow.authorization_url(prompt='consent')[0]
    print(f"Please go to this URL: {consent_url}")

    auth_code = input("Enter the authorization code: ")
    oauth_flow.fetch_token(code=auth_code)

    youtube_client = build('youtube', 'v3', credentials=oauth_flow.credentials)
    return youtube_client

# Authenticate once; the resulting client is kept in the module-level name
# `youtube`, which the search and playlist functions below read.
youtube = get_authenticated_service()

print("Authentication successful!")

# Test YouTube connection
# Lists the authenticated user's own channel (mine=True). The response is not
# inspected; only whether the call succeeds matters here.
try:
    request = youtube.channels().list(
        part="snippet,contentDetails,statistics",
        mine=True
    )
    response = request.execute()
    print("Connection to YouTube API successful!")
except HttpError as e:
    # API-level failure: show the HTTP status and the raw response body.
    print(f"An HTTP error occurred: {e.resp.status} {e.content}")
except Exception as e:
    # Any other failure is reported but not re-raised, so execution continues.
    print(f"An error occurred: {e}")



Look up the matching YouTube video for each entry in the songs list


In [None]:
def search_youtube(song_name, artist_name, max_retries=5, retry_delay=10):
    """Return the video ID of the top YouTube search result for a song.

    Args:
        song_name: Title of the song.
        artist_name: Name of the artist, appended to the search query.
        max_retries: Maximum number of waits on HTTP 403 before the error is
            re-raised.
        retry_delay: Seconds to sleep between retries.

    Returns:
        The ``videoId`` of the first search result.

    Raises:
        LookupError: If the search returns no video for the query.
        HttpError: On any non-403 error, or when the request still fails
            with 403 after ``max_retries`` waits.
    """
    query = f"{song_name} {artist_name}"
    attempt = 0
    while True:
        try:
            request = youtube.search().list(
                part="snippet",
                maxResults=1,
                q=query,
                # Restrict to videos: channel and playlist results carry no
                # 'videoId' in their 'id' object.
                type="video",
            )
            response = request.execute()
        except HttpError as e:
            # Bounded: a 403 that waiting cannot fix (for example an
            # exhausted quota) would otherwise loop forever.
            if e.resp.status != 403 or attempt >= max_retries:
                raise
            attempt += 1
            print(f"YouTube API rate limit exceeded, retrying after {retry_delay} seconds.")
            time.sleep(retry_delay)
            continue

        items = response.get('items') or []
        if not items:
            raise LookupError(f"No YouTube video found for query: {query!r}")
        return items[0]['id']['videoId']



Get new YouTube playlist details from user


In [None]:
playlist_title = input("Enter your new YouTube Playlist name: ")
playlist_description = input("Enter playlist description: ")

# Create the destination playlist. The privacy setting lives under the
# camelCase key `privacyStatus`; the previous `privacy_status` key is not a
# field of the playlist resource, so the request never asked for a private
# playlist.
request = youtube.playlists().insert(
    part="snippet,status",
    body={
        "snippet": {
            "title": playlist_title,
            "description": playlist_description,
        },
        "status": {"privacyStatus": "private"},
    },
)

response = request.execute()
# ID of the new playlist, used when inserting the videos.
youtube_playlist_id = response['id']



Add the YouTube video found for a song to the new playlist, handling rate limits

In [None]:
def add_to_youtube_playlist(playlist_id, video_id, max_retries=5, retry_delay=10):
    """Insert a video into a YouTube playlist, retrying on HTTP 403.

    Args:
        playlist_id: ID of the playlist receiving the video.
        video_id: ID of the video to add.
        max_retries: Maximum number of waits on HTTP 403 before the error is
            re-raised.
        retry_delay: Seconds to sleep between retries.

    Returns:
        The API response describing the created playlist item.

    Raises:
        HttpError: On any non-403 error, or when the request still fails
            with 403 after ``max_retries`` waits.
    """
    body = {
        "snippet": {
            "playlistId": playlist_id,
            "resourceId": {
                "kind": "youtube#video",
                "videoId": video_id,
            },
        },
    }
    attempt = 0
    while True:
        try:
            request = youtube.playlistItems().insert(part="snippet", body=body)
            return request.execute()
        except HttpError as e:
            # Bounded: a 403 that waiting cannot fix (for example an
            # exhausted quota) would otherwise loop forever.
            if e.resp.status != 403 or attempt >= max_retries:
                raise
            attempt += 1
            print(f"YouTube Rate Limit exceeded. Retrying after {retry_delay} seconds")
            time.sleep(retry_delay)


In [None]:
# Songs for which no video could be found; reported once the loop is done.
unmatched = []
for song_name, artist_name in songs:
    try:
        video_id = search_youtube(song_name, artist_name)
    except LookupError:
        # Covers IndexError/KeyError (both LookupError subclasses) raised when
        # the search yields no usable video. One missing song must not abort
        # the migration of the remaining ones.
        print(f"No YouTube video found for '{song_name}' by '{artist_name}', skipping.")
        unmatched.append((song_name, artist_name))
        continue
    add_to_youtube_playlist(youtube_playlist_id, video_id)

if unmatched:
    print(f"{len(unmatched)} song(s) could not be matched on YouTube:")
    pprint.pp(unmatched)

print(f"Playlist migration complete. New YouTube playlist ID: {youtube_playlist_id}")
