In [4]:
# Dependencies (uncomment to install into the kernel's environment):
#%pip install requests
#%pip install print-dict
#%pip install python-dotenv



In [8]:
import base64
import datetime
import os
import webbrowser
from urllib.parse import parse_qs, urlencode, urlsplit

import requests as rq
from dotenv import load_dotenv
from print_dict import pd

In [2]:
# Client info: credentials are loaded from ./config.env (or the process
# environment) so they are never written into the notebook itself.
load_dotenv("./config.env")
client_secret = os.getenv("CLIENT_SECRET")
client_id = os.getenv("CLIENT_ID")

# Fail fast with a clear message instead of letting a None credential surface
# later as an opaque login failure.
if not client_id or not client_secret:
    raise RuntimeError(
        "CLIENT_ID and CLIENT_SECRET must be set (expected in ./config.env or the environment)."
    )

In [3]:
class SpotifyAPI(object):
    """Small Spotify Web API client built on the Authorization Code flow.

    Constructing an instance immediately opens the Spotify consent page in a
    browser (see ``perform_login``). After approving access, pass the URL the
    browser was redirected to into ``parse_url_for_code``; that exchanges the
    authorization code for an access token and a refresh token. Later calls
    refresh the access token on demand through ``get_access_token``.

    The request helpers return the decoded JSON body on a 2xx response and an
    empty dict on any other status.
    """

    # --- token state -------------------------------------------------------
    access_token = None
    access_token_expires = datetime.datetime.now()
    access_token_did_expire = True
    auth_code = None
    refresh_token = None

    # --- client configuration ----------------------------------------------
    client_id = None
    client_secret = None
    token_url = "https://accounts.spotify.com/api/token"
    auth_url = "https://accounts.spotify.com/authorize"
    baseURL = 'https://api.spotify.com/v1'
    # Sent both when requesting the code and when exchanging it.
    redirect_uri = 'http://localhost:3000/'
    scope = 'playlist-read-private playlist-read-collaborative playlist-modify-public playlist-modify-private'
    # Seconds to wait on any single HTTP call so a stalled connection cannot hang the notebook.
    request_timeout = 10
    # Largest batch sent per "add items to playlist" request.
    max_uris_per_request = 100

    # --- accumulated results -----------------------------------------------
    # Class-level defaults only; instances always rebind these names (never
    # mutate the lists in place) so results are not shared between instances.
    user_playlists = []
    playlist_items = []

    def __init__(self, client_id, client_secret, *args, **kwargs):
        """Store the app credentials and start the browser login.

        Args:
            client_id: Spotify application client id.
            client_secret: Spotify application client secret.
        """
        super().__init__(*args, **kwargs)
        self.client_id = client_id
        self.client_secret = client_secret
        self.user_playlists = []
        self.playlist_items = []
        self.perform_login()

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_success(status_code):
        """Return True for any 2xx HTTP status code."""
        return 200 <= status_code < 300

    def _get_json(self, url, verbose=False):
        """GET ``url`` with the bearer header; return its JSON or ``{}`` on failure.

        When ``verbose`` is true the failing status code is printed.
        """
        headers = self.get_resource_header()
        r = rq.get(url, headers=headers, timeout=self.request_timeout)
        if not self._is_success(r.status_code):
            if verbose:
                print(r.status_code)
            return {}
        return r.json()

    def _post_json(self, url, payload):
        """POST ``payload`` as JSON with the bearer header; return JSON or ``{}`` on failure."""
        headers = self.get_resource_header()
        r = rq.post(url, headers=headers, json=payload, timeout=self.request_timeout)
        if not self._is_success(r.status_code):
            return {}
        return r.json()

    def _store_token(self, payload):
        """Record the access token and expiry found in a token-endpoint response."""
        now = datetime.datetime.now()
        expires = now + datetime.timedelta(seconds=payload['expires_in'])
        self.access_token = payload['access_token']
        # A refresh response may omit the refresh token; keep the current one then.
        if payload.get('refresh_token'):
            self.refresh_token = payload['refresh_token']
        self.access_token_expires = expires
        self.access_token_did_expire = expires < now

    # ------------------------------------------------------------------
    # authentication
    # ------------------------------------------------------------------
    def get_client_credentials(self):
        """
        Returns a base64 string of "client_id:client_secret".

        Raises:
            Exception: if either credential is unset.
        """
        if self.client_secret is None or self.client_id is None:
            raise Exception("You must set client_id and client_secret.")
        client_creds = f"{self.client_id}:{self.client_secret}"
        return base64.b64encode(client_creds.encode()).decode()

    def get_token_headers(self):
        """Return an HTTP Basic ``Authorization`` header built from the client credentials."""
        client_creds_b64 = self.get_client_credentials()
        return {
            "Authorization": f"Basic {client_creds_b64}"
        }

    def get_auth_data(self):
        """Return the query parameters sent to the authorize endpoint."""
        return {
            'client_id': self.client_id,
            'response_type': 'code',
            'redirect_uri': self.redirect_uri,
            'scope': self.scope,
        }

    def perform_login(self):
        """Request the authorize URL and open the resulting page in a browser.

        Raises:
            Exception: if the authorize endpoint does not answer with a 2xx status.
        """
        response = rq.get(self.auth_url, params=self.get_auth_data(),
                          timeout=self.request_timeout)
        if not self._is_success(response.status_code):
            raise Exception("Could not login!")
        webbrowser.open(response.url)

    def parse_url_for_code(self, redirect_url=None):
        """Extract the ``code`` query parameter and exchange it for tokens.

        The query string is parsed properly, so extra parameters (for example
        ``state``) and percent-encoded characters do not corrupt the code.

        Args:
            redirect_url: full URL the browser was redirected to after consent.

        Raises:
            Exception: if the URL is missing, reports an authorization error,
                or carries no ``code`` parameter.
        """
        if redirect_url is None:
            raise Exception("Invalid URL")
        query = parse_qs(urlsplit(redirect_url).query)
        if 'error' in query:
            raise Exception(f"Authorization failed: {query['error'][0]}")
        codes = query.get('code')
        if not codes:
            raise Exception("Invalid URL")
        self.auth_code = codes[0]
        self.get_initial_access_token()

    def get_initial_token_data(self):
        """Return the form body that exchanges the authorization code for tokens."""
        return {
            'grant_type': "authorization_code",
            'code': self.auth_code,
            'redirect_uri': self.redirect_uri,
            # Use this instance's credentials, not whatever globals happen to exist.
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }

    def get_initial_access_token(self):
        """Exchange ``self.auth_code`` for an access token and a refresh token.

        Returns:
            True on success.

        Raises:
            Exception: if the token endpoint does not answer with a 2xx status.
        """
        auth_response = rq.post(url=self.token_url, data=self.get_initial_token_data(),
                                timeout=self.request_timeout)
        if not self._is_success(auth_response.status_code):
            raise Exception("Could not get initial access token")
        self._store_token(auth_response.json())
        return True

    def get_refresh_token_data(self):
        """Return the form body that trades the refresh token for a new access token."""
        return {
            'grant_type': 'refresh_token',
            'refresh_token': self.refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }

    def perform_auth(self):
        """Refresh the access token using the stored refresh token.

        Returns:
            True on success.

        Raises:
            Exception: if no refresh token is stored or the token endpoint
                does not answer with a 2xx status.
        """
        if self.refresh_token is None:
            raise Exception(
                "Could not authenticate client: no refresh token; call parse_url_for_code first."
            )
        r = rq.post(url=self.token_url, data=self.get_refresh_token_data(),
                    timeout=self.request_timeout)
        if not self._is_success(r.status_code):
            raise Exception("Could not authenticate client.")
        self._store_token(r.json())
        return True

    def get_access_token(self):
        """Return a usable access token, refreshing it first if missing or expired."""
        now = datetime.datetime.now()
        if self.access_token is None or self.access_token_expires < now:
            # One refresh attempt only; perform_auth raises if it cannot succeed.
            self.perform_auth()
        return self.access_token

    def get_resource_header(self):
        """Return the bearer ``Authorization`` header for Web API calls."""
        access_token = self.get_access_token()
        return {
            "Authorization": f"Bearer {access_token}"
        }

    # ------------------------------------------------------------------
    # catalogue lookups and search
    # ------------------------------------------------------------------
    def get_resource(self, lookup_id, resource_type='albums', version='v1'):
        """Fetch ``/{version}/{resource_type}/{lookup_id}``; return its JSON or ``{}``."""
        endpoint = f"https://api.spotify.com/{version}/{resource_type}/{lookup_id}"
        return self._get_json(endpoint)

    def get_album(self, _id):
        """Fetch a single album by id."""
        return self.get_resource(_id, resource_type='albums')

    def get_artist(self, _id):
        """Fetch a single artist by id."""
        return self.get_resource(_id, resource_type='artists')

    def base_search(self, query_params):
        """Run a search with an already URL-encoded query string; return JSON or ``{}``."""
        endpoint = "https://api.spotify.com/v1/search"
        return self._get_json(f"{endpoint}?{query_params}")

    def search(self, query=None, operator=None, operator_query=None, search_type='artist'):
        """Build a search query and run it.

        Args:
            query: search text, or a dict rendered as space-separated ``key:value`` filters.
            operator: optional ``"or"`` / ``"not"`` (case-insensitive) joining
                ``operator_query`` onto the query; other values are ignored.
            operator_query: text following the operator; used only when it is a string.
            search_type: item type to search for; lower-cased before sending.

        Raises:
            Exception: if ``query`` is None.
        """
        if query is None:
            raise Exception("A query is required")
        if isinstance(query, dict):
            query = " ".join(f"{k}:{v}" for k, v in query.items())
        if operator is not None and operator_query is not None:
            if operator.lower() in ("or", "not") and isinstance(operator_query, str):
                query = f"{query} {operator.upper()} {operator_query}"
        query_params = urlencode({"q": query, "type": search_type.lower()})
        print(query_params)
        # When using an operator, keep the operator query the same kind of
        # thing (artist, album, ...) as the main query.
        return self.base_search(query_params)

    # ------------------------------------------------------------------
    # playlist features
    # ------------------------------------------------------------------
    def get_user_playlists(self, resource_type='users', version='v1', user_id='null', limit=50):
        """Fetch the first page of a user's playlists, honouring ``limit``."""
        endpoint = f"https://api.spotify.com/{version}/{resource_type}/{user_id}/playlists"
        query_params = urlencode({'limit': limit})
        return self._get_json(f"{endpoint}?{query_params}")

    def get_next_page(self, url=None):
        """Fetch a paging ``next`` URL; prints the status code and returns ``{}`` on failure.

        Raises:
            Exception: if ``url`` is None.
        """
        if url is None:
            raise Exception("Invalid argument")
        return self._get_json(url, verbose=True)

    def list_user_playlists(self, response=None):
        """Flatten a playlists response, following ``next`` links, into name/id/uri dicts.

        Results are appended to whatever ``self.user_playlists`` already holds;
        call ``clear_user_playlists`` first for a fresh list. Paging stops
        when a page has no ``next`` link or a page fetch fails.

        Raises:
            Exception: if ``response`` is None.
        """
        if response is None:
            raise Exception("Invalid argument")
        collected = list(self.user_playlists)
        page = response
        while page:
            for item in page.get('items', []):
                collected.append({'name': item['name'],
                                  'id': item['id'],
                                  'uri': item['uri']})
            next_url = page.get('next')
            page = self.get_next_page(next_url) if next_url else None
        # Rebind rather than mutate so the class-level default list stays empty.
        self.user_playlists = collected
        return self.user_playlists

    def clear_user_playlists(self):
        """Forget the playlists accumulated by ``list_user_playlists``."""
        self.user_playlists = []

    def get_playlist_items(self, resource_type='playlists', version='v1', playlist_id='null', limit=100):
        """Fetch the first page of a playlist's tracks, restricted to the fields this notebook uses."""
        endpoint = f"https://api.spotify.com/{version}/{resource_type}/{playlist_id}/tracks"
        query_params = urlencode({'limit' : limit, 'fields': 'items(added_by.id,uri,track(name,artists(href,id,uri,name,spotify),id,uri,duration,explicit,href,album(name,id,uri))),href,limit,next,offset,previous,total'})
        return self._get_json(f"{endpoint}?{query_params}")

    def parse_playlist_items(self, response=None):
        """Flatten a playlist-tracks response, following ``next`` links, into song dicts.

        Each song dict has ``song_name``, ``id``, ``uri`` and ``artists`` (a
        list of name/id/uri dicts). Entries whose ``track`` is missing or None
        are skipped. Results are appended to whatever ``self.playlist_items``
        already holds; call ``clear_playlist_items`` first for a fresh list.

        Raises:
            Exception: if ``response`` is None.
        """
        if response is None:
            raise Exception("Invalid argument")
        collected = list(self.playlist_items)
        page = response
        while page:
            for item in page.get('items', []):
                track = item.get('track')
                if track is None:
                    continue
                artists = [{'name': artist['name'],
                            'id': artist['id'],
                            'uri': artist['uri']}
                           for artist in track['artists']]
                collected.append({'song_name': track['name'],
                                  'id': track['id'],
                                  'uri': track['uri'],
                                  'artists': artists})
            next_url = page.get('next')
            page = self.get_next_page(next_url) if next_url else None
        # Rebind rather than mutate so the class-level default list stays empty.
        self.playlist_items = collected
        return self.playlist_items

    def clear_playlist_items(self):
        """Forget the songs accumulated by ``parse_playlist_items``."""
        self.playlist_items = []

    def create_playlist(self, resource_type='users', version='v1', user_id='null',
                        name=None, description='', public=False, artist=None, playlist_name=None):
        """Create a playlist for ``user_id``; return its JSON or ``{}`` on failure.

        When ``name`` is None the playlist is named ``"{artist} : {playlist_name}"``.
        """
        endpoint = f"https://api.spotify.com/{version}/{resource_type}/{user_id}/playlists"
        if name is None:
            name = f"{artist} : {playlist_name}"
        post_data = {'name': name,
                     'description': description,
                     'public': public}
        return self._post_json(endpoint, post_data)

    def get_spotify_uri(self, id=None, resource_type='artist'):
        """Build a ``spotify:{resource_type}:{id}`` URI (resource types such as artist, album, track).

        Raises:
            Exception: if ``id`` is None.
        """
        if id is None:
            raise Exception("Invalid argument")
        return f"spotify:{resource_type}:{id}"

    def add_song_to_playlist(self, resource_type='playlists', version='v1', uri_list=None, playlist_id=None):
        """Add track URIs to a playlist, batching so long lists are not rejected.

        URIs are sent in order, ``max_uris_per_request`` at a time.

        Args:
            uri_list: iterable of Spotify URIs (a single string is treated as one URI).
            playlist_id: id of the playlist to add to.

        Returns:
            JSON of the last successful batch, or ``{}`` if any batch fails or
            there is nothing to add.

        Raises:
            Exception: if ``uri_list`` or ``playlist_id`` is None.
        """
        if uri_list is None or playlist_id is None:
            raise Exception("Invalid argument")
        endpoint = f"https://api.spotify.com/{version}/{resource_type}/{playlist_id}/tracks"
        uris = [uri_list] if isinstance(uri_list, str) else list(uri_list)
        batch_size = self.max_uris_per_request
        result = {}
        for start in range(0, len(uris), batch_size):
            result = self._post_json(endpoint, {'uris': uris[start:start + batch_size]})
            if not result:
                # Stop at the first failed batch rather than adding a partial tail.
                return {}
        return result
    

In [None]:
#base64.b64decode(client_creds_b64) #decoding
#endpoint = "https://api.spotify.com/v1/search"

In [4]:
# Build the client; its constructor immediately starts the browser-based login.
sc = SpotifyAPI(client_id=client_id, client_secret=client_secret)

In [5]:
# Paste the URL the browser was redirected to after approving access. The
# authorization code inside it is a credential, so it is read at run time
# instead of being committed to the notebook.
redirect_url = input("Paste the full redirect URL: ").strip()
sc.parse_url_for_code(redirect_url)

In [6]:
# Fetch the first page of this user's playlists and pretty-print the raw payload.
playlist_owner_id = 'y4yjmhqzanyc6musw2wjfi1hv'
response = sc.get_user_playlists(user_id=playlist_owner_id)
pd(response)

{
    'href': 'https://api.spotify.com/v1/users/y4yjmhqzanyc6musw2wjfi1hv/playlists?offset=0&limit=50',
    'items': [{
        'collaborative': False,
        'description': 'A playlist of Drake songs in IKE playlist',
        'external_urls': {
            'spotify': 'https://open.spotify.com/playlist/5bafUXvO08JfyfLbkrY8dk'
        },
        'href': 'https://api.spotify.com/v1/playlists/5bafUXvO08JfyfLbkrY8dk',
        'id': '5bafUXvO08JfyfLbkrY8dk',
        'images': [{
            'height': 640,
            'url': 'https://mosaic.scdn.co/640/ab67616d0000b27359ba1bf8385cb016e62a26c0ab67616d0000b2736cab41f8c84d6164976400d4ab67616d0000b2739c188c494d8bfaf895411890ab67616d0000b273f907de96b9a4fbc04accc0d5',
            'width': 640
        }, {
            'height': 300,
            'url': 'https://mosaic.scdn.co/300/ab67616d0000b27359ba1bf8385cb016e62a26c0ab67616d0000b2736cab41f8c84d6164976400d4ab67616d0000b2739c188c494d8bfaf895411890ab67616d0000b273f907de96b9a4fbc04accc0d5',
        

In [23]:
# Start from an empty list, then flatten every page of the playlist listing
# into name/id/uri records.
sc.clear_user_playlists()
q = sc.list_user_playlists(response=response)
pd(q)

[{
    'name': 'Joy Crookes - The Complete Collection',
    'id': '1PT1LcejkMGoGsmKzkBKax',
    'uri': 'spotify:playlist:1PT1LcejkMGoGsmKzkBKax'
}, {
    'name': 'Japanese City Pop',
    'id': '7H0gvTCkCh9vA8gw1mcNVd',
    'uri': 'spotify:playlist:7H0gvTCkCh9vA8gw1mcNVd'
}, {
    'name': 'Your Top Songs 2022',
    'id': '37i9dQZF1F0sijgNaJdgit',
    'uri': 'spotify:playlist:37i9dQZF1F0sijgNaJdgit'
}, {
    'name': 'Black Panther: Wakanda Forever Official Playlist',
    'id': '37i9dQZF1DX6tq7CPNyNEM',
    'uri': 'spotify:playlist:37i9dQZF1DX6tq7CPNyNEM'
}, {
    'name': 'Calisthenics x Gymtok',
    'id': '06wrErgQDANLAeVl2NR6NY',
    'uri': 'spotify:playlist:06wrErgQDANLAeVl2NR6NY'
}, {
    'name': 'Synthwave type',
    'id': '4myVDdbRXNQ8Hu8pQheSMN',
    'uri': 'spotify:playlist:4myVDdbRXNQ8Hu8pQheSMN'
}, {
    'name': 'Gym Phonk 🔱',
    'id': '1sgNK6s1Gk4kZEOTKPd72X',
    'uri': 'spotify:playlist:1sgNK6s1Gk4kZEOTKPd72X'
}, {
    'name': '',
    'id': '5fkFpkHk4MaRyNeeXpYd30',
    'uri

In [None]:
# Numbered listing of the playlists collected on the client.
for x, i in enumerate(sc.user_playlists, start=1):
    print(str(x) + '.', i)

In [24]:
# Search-for-artist functionality and parsing.
j = sc.search('Drake', search_type='artist')

# A failed search returns {}, so read the hits defensively and stop with a
# clear message rather than a KeyError/IndexError further down.
artist_hits = j.get('artists', {}).get('items', [])
if not artist_hits:
    raise RuntimeError("Artist search returned no results.")

# Name and id of the best match.
print(artist_hits[0]['name'] + ':', artist_hits[0]['id'])

# Keep at most the first six hits; slicing tolerates shorter result lists.
search_list = [{'name': hit['name'], 'id': hit['id']} for hit in artist_hits[:6]]
pd(search_list)

# The top hit is the target artist for the rest of the notebook.
target_artist_id = search_list[0]['id']
target_artist_id

q=Drake&type=artist
Drake: 3TVXtAsR1Inumwj472S9r4
[{
    'name': 'Drake',
    'id': '3TVXtAsR1Inumwj472S9r4'
}, {
    'name': '21 Savage',
    'id': '1URnnhqYAYcrqrcwql10ft'
}, {
    'name': 'Dave',
    'id': '6Ip8FS7vWT1uKkJSweANQK'
}, {
    'name': 'Popcaan',
    'id': '62DmErcU7dqZbJaDqwsqzR'
}, {
    'name': 'Smiley',
    'id': '6jeg7JBX9J9097esK752iR'
}, {
    'name': 'BlocBoy JB',
    'id': '4TEJudQY2pXxVHPE3gD2EU'
}]


'3TVXtAsR1Inumwj472S9r4'

In [25]:
# Get-playlist functionality: fetch the first page of tracks of the source playlist.
source_playlist_id = '6AAW33WKzlJaN0JMqjC3xC'
playlist_items_response = sc.get_playlist_items(playlist_id=source_playlist_id)
pd(playlist_items_response)

{
    'href': 'https://api.spotify.com/v1/playlists/6AAW33WKzlJaN0JMqjC3xC/tracks?offset=0&limit=100&fields=items(added_by.id,uri,track(name,artists(href,id,uri,name,spotify),id,uri,duration,explicit,href,album(name,id,uri))),href,limit,next,offset,previous,total',
    'items': [{
        'added_by': {
            'id': 'y4yjmhqzanyc6musw2wjfi1hv'
        },
        'track': {
            'album': {
                'id': '6mT3tct9qJ7RRfvN5lCRDL',
                'name': 'The Upper Hand',
                'uri': 'spotify:album:6mT3tct9qJ7RRfvN5lCRDL'
            },
            'artists': [{
                'href': 'https://api.spotify.com/v1/artists/1si6mnxJ6IpTOTW13ECa0o',
                'id': '1si6mnxJ6IpTOTW13ECa0o',
                'name': 'AllttA',
                'uri': 'spotify:artist:1si6mnxJ6IpTOTW13ECa0o'
            }, {
                'href': 'https://api.spotify.com/v1/artists/6mA4csYsYvf4Mq02PleZEV',
                'id': '6mA4csYsYvf4Mq02PleZEV',
                'name': 

In [26]:
# Parsing-playlist functionality: reset the accumulated songs, then flatten
# the playlist response into song records (song_name, id, uri, artists).
sc.clear_playlist_items()
rep = sc.parse_playlist_items(response=playlist_items_response)

In [27]:
# Songs on which the target artist appears among the credited artists,
# kept in playlist order.
target_songs = [
    song for song in rep
    if any(artist['id'] == target_artist_id for artist in song['artists'])
]

# URIs of those songs.
target_songs_uri = [song['uri'] for song in target_songs]
print(target_songs_uri)

['spotify:track:6vN77lE9LK6HP2DewaN6HZ', 'spotify:track:6n4U3TlzUGhdSFbUUhTvLP', 'spotify:track:4qKcDkK6siZ7Jp1Jb4m0aL', 'spotify:track:0TlLq3lA83rQOYtrqBqSct', 'spotify:track:2IRZnDFmlqMuOrYOLnZZyc', 'spotify:track:4n4BflhWjCHIxrI4v7Xt9s', 'spotify:track:2sqsNXfN0HtgDEgaHXiUTa', 'spotify:track:5r7WvQtyPfy1xch5zMgGRp', 'spotify:track:5yY9lUy8nbvjM1Uyo1Uqoc', 'spotify:track:6Kj17Afjo1OKJYpf5VzCeo', 'spotify:track:6EDO9iiTtwNv6waLwa1UUq', 'spotify:track:5InOp6q2vvx0fShv3bzFLZ', 'spotify:track:1bzM1cd6oqFozdr4wK6HdR', 'spotify:track:2SAqBLGA283SUiwJ3xOUVI', 'spotify:track:0LEFmgIj8ABU7CelzGh5Bs', 'spotify:track:6Kj17Afjo1OKJYpf5VzCeo', 'spotify:track:2of5xn0GU0TdFneR1saRLH', 'spotify:track:4qKcDkK6siZ7Jp1Jb4m0aL', 'spotify:track:3aQem4jVGdhtg116TmJnHz', 'spotify:track:5mCPDVBb16L4XQwDdbRUpz', 'spotify:track:35RJhm1pEovTBwnNR0zWad', 'spotify:track:0TlLq3lA83rQOYtrqBqSct', 'spotify:track:5r7WvQtyPfy1xch5zMgGRp', 'spotify:track:65OVbaJR5O1RmwOQx0875b', 'spotify:track:3yaYgjEFkRw3PVjW9mV1TO',

In [32]:
# Create the destination playlist (non-public by default). With no explicit
# name it is called "<artist> : <playlist_name>".
cplay = sc.create_playlist(
    user_id='y4yjmhqzanyc6musw2wjfi1hv',
    description='A playlist of Drake songs in IKE playlist',
    artist='Drake',
    playlist_name='Ike',
)
cplay

{'collaborative': False,
 'description': 'A playlist of Drake songs in IKE playlist',
 'external_urls': {'spotify': 'https://open.spotify.com/playlist/5bafUXvO08JfyfLbkrY8dk'},
 'followers': {'href': None, 'total': 0},
 'href': 'https://api.spotify.com/v1/playlists/5bafUXvO08JfyfLbkrY8dk',
 'id': '5bafUXvO08JfyfLbkrY8dk',
 'images': [],
 'name': 'Drake : Ike',
 'owner': {'display_name': 'Ike',
  'external_urls': {'spotify': 'https://open.spotify.com/user/y4yjmhqzanyc6musw2wjfi1hv'},
  'href': 'https://api.spotify.com/v1/users/y4yjmhqzanyc6musw2wjfi1hv',
  'id': 'y4yjmhqzanyc6musw2wjfi1hv',
  'type': 'user',
  'uri': 'spotify:user:y4yjmhqzanyc6musw2wjfi1hv'},
 'primary_color': None,
 'public': False,
 'snapshot_id': 'MSw3N2M1Mzc3MTZmYjMwYmUxM2Q5YmMwNTU1YWNkODdiMzMwMzJkMWM4',
 'tracks': {'href': 'https://api.spotify.com/v1/playlists/5bafUXvO08JfyfLbkrY8dk/tracks',
  'items': [],
  'limit': 100,
  'next': None,
  'offset': 0,
  'previous': None,
  'total': 0},
 'type': 'playlist',
 'uri':

In [33]:
# Add the collected track URIs to the playlist created in the previous cell.
done = sc.add_song_to_playlist(playlist_id=cplay['id'], uri_list=target_songs_uri)
print(done)

{'snapshot_id': 'MyxkZGFkY2EzY2RlOTg1YjZlMDlmZTI1NmNlMzY5MjE2Y2E4NjVjYTQ0'}
