The URL the user will go to give us info is below. This is just for beta and closed development.When the user gives us their info they will be met with localhost can't be connected to or something along the lines. For now the link is code= whatever in the url, you can copy and paste it. In the future when we get our own url, we will be able to have a function that just grabs the code without us interfering (callback function). The access token lasts for an hour and the auth code is one use. So if you used the auth code snippet once, please don't refresh it, it will give an error.

https://accounts.spotify.com/en/authorize?client_id=6b7e2e58b62d4010a958a1f14c3a2319&response_type=code&redirect_uri=http://localhost:8880/callback&scope=user-top-read%20user-read-private%20playlist-read-private

In [35]:
from flask import Flask, render_template, redirect, session, make_response, request, url_for
from flask_sqlalchemy import SQLAlchemy
from urllib.parse import urlencode
import os
import requests
import logging
import numpy as np
import pandas as pd
import db
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import pairwise_distances
from pathlib import Path



# Flask setup
app = Flask(__name__)
app.secret_key = os.urandom(24)
app.config.from_pyfile('config.py')

BASE_DIR = Path(os.getcwd())
DB_PATH = BASE_DIR / 'db' / 'users.db'
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_PATH}'

app.config['SQLALCHEMY_BINDS'] = {'two': f'sqlite:///playlist.db', 'three': f'sqlite:///top_tracks.db'}



db = SQLAlchemy(app)
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

class Playlist(db.Model):
    __bind_key__ = 'two'
    id = db.Column(db.String(255), primary_key = True, nullable=False)
    name = db.Column(db.String(255))

class Top_Tracks(db.Model):
    __bind_key__ = 'three'
    uid = db.Column(db.String(255), primary_key=True)
    id = db.Column(db.String(255), primary_key=True, nullable=False)
    name = db.Column(db.String(255))
    artists = db.Column(db.String(255))

with app.app_context():
    db.create_all()
# Define constants
US_TOP_HITS_PLAYLIST_ID = "37i9dQZF1DXcBWIGoYBM5M?si=beaf719fd1f64d09"

def getToken(code):
    token_url = 'https://accounts.spotify.com/api/token'
    authorization = app.config['AUTHORIZATION']
    redirect_uri = app.config['REDIRECT_URI']
    headers = {'Authorization': f"Basic {authorization}",  # Updated the Authorization header
               'Accept': 'application/json',
               'Content-Type': 'application/x-www-form-urlencoded'}
    body = {'code': code, 'redirect_uri': redirect_uri,
            'grant_type': 'authorization_code'}
    post_response = requests.post(token_url, headers=headers, data=body)
    if post_response.status_code == 200:
        pr = post_response.json()
        return pr['access_token'], pr['refresh_token'], pr['expires_in']
    else:
        logging.error('getToken:' + str(post_response.status_code))
        return None

def get_top_tracks(access_token, limit=50):
    headers = {'Authorization': f"Bearer {access_token}"}
    response = requests.get(f"https://api.spotify.com/v1/me/top/tracks?limit={limit}", headers=headers)
    if response.status_code == 200:
        return response.json()['items']
    else:
        logging.error(f"get_top_tracks: {response.status_code}")
        return []

def get_top_artists(access_token, limit=50):
    headers = {'Authorization': f"Bearer {access_token}"}
    response = requests.get(f"https://api.spotify.com/v1/me/top/artists?limit={limit}", headers=headers)
    if response.status_code == 200:
        return response.json()['items']
    else:
        logging.error(f"get_top_tracks: {response.status_code}")
        return []

def fetch_us_top_hits_features(headers):
    response = requests.get(f"https://api.spotify.com/v1/playlists/{US_TOP_HITS_PLAYLIST_ID}/tracks", headers=headers)
    us_top_hits_tracks = response.json()
    us_top_hits_track_ids = [track['track']['id'] for track in us_top_hits_tracks['tracks']['items']]
    features_response = requests.get(f"https://api.spotify.com/v1/audio-features?ids={','.join(us_top_hits_track_ids)}", headers=headers)
    return features_response.json(), us_top_hits_tracks

def get_user_playlists(access_token, limit=50):
    headers = {'Authorization': f"Bearer {access_token}"}
    response = requests.get(f"https://api.spotify.com/v1/me/playlists?limit={limit}", headers=headers)
    if response.status_code == 200:
        return response.json()['items']
    else:
        logging.error(f"get_user_playlists: {response.status_code}")
        return []

def get_playlist_tracks(access_token, playlist_id):
    headers = {'Authorization': f"Bearer {access_token}"}
    response = requests.get(f"https://api.spotify.com/v1/playlists/{playlist_id}/tracks", headers=headers)
    if response.status_code == 200:
        return response.json()['items']
    else:
        logging.error(f"get_playlist_tracks: {response.status_code}")
        return []


@app.route('/')
def index():
    authorized = 'access_token' in session
    username = None
    if 'user_id' in session:
        user = User.query.get(session['user_id'])
        if user:
            username = user.username
    return render_template('index.html', authorized=authorized, username=username)



@app.route('/authorize')
def authorize():
    client_id = app.config['CLIENT_ID']
    redirect_uri = app.config['REDIRECT_URI']
    scope = app.config['SCOPE']
    state_key = os.urandom(15).hex()  # Creating a random state key
    session['state_key'] = state_key

    authorize_url = 'https://accounts.spotify.com/en/authorize?'
    params = {'response_type': 'code', 'client_id': client_id,
              'redirect_uri': redirect_uri, 'scope': scope,
              'state': state_key}
    query_params = urlencode(params)
    response = make_response(redirect(authorize_url + query_params))
    return response

@app.route('/callback/')
def callback():
    # Check for error
    error = request.args.get('error')
    if error:
        # Handle the error based on your requirements
        logging.error(f"Error during authorization: {error}")
        return render_template('error.html', error=error)

    # Check state to prevent CSRF
    state = request.args.get('state')
    if not state or state != session.get('state_key'):
        logging.error("State mismatch error")
        return render_template('error.html', error="State mismatch error")

    # Get the code from the callback URL
    code = request.args.get('code')

    # Use the code to get the access token
    access_token, refresh_token, expires_in = getToken(code)

    # Store the access token, refresh token, and expiration in the session or database
    session['access_token'] = access_token
    session['refresh_token'] = refresh_token
    session['expires_in'] = expires_in

    # Once access token is obtained, get user's Spotify display name
    headers = {'Authorization': f"Bearer {session['access_token']}"}
    response = requests.get("https://api.spotify.com/v1/me", headers=headers)
    if response.status_code == 200:
        spotify_username = response.json()['display_name']
        user = User.query.filter_by(username=spotify_username).first()
        if not user:
            user = User(username=spotify_username)
            db.session.add(user)
            db.session.commit()
        session['user_id'] = user.id

    return redirect(url_for('index'))

@app.route('/recommendations/')
def get_recommendations():
    access_token = session.get('access_token')
    if not access_token:
        return redirect(url_for('authorize'))
    
    top_tracks = get_top_tracks(access_token, limit=50)
    top_track_ids = [track['id'] for track in top_tracks]

    headers = {'Authorization': f"Bearer {access_token}"}
    features_response = requests.get(f"https://api.spotify.com/v1/audio-features?ids={','.join(top_track_ids)}", headers=headers)
    my_tracks_features = features_response.json()

    us_top_hits_features, us_top_hits_tracks = fetch_us_top_hits_features(headers)

    my_tracks_df = pd.DataFrame(my_tracks_features['audio_features'])
    us_top_hits_df = pd.DataFrame(us_top_hits_features['audio_features'])

    selected_features = ['danceability', 'energy', 'loudness', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo']
    X_my_tracks = my_tracks_df[selected_features]
    X_us_top_hits = us_top_hits_df[selected_features]

    scaler = StandardScaler().fit(X_my_tracks)
    X_my_tracks = scaler.transform(X_my_tracks)
    X_us_top_hits = scaler.transform(X_us_top_hits)

    knn = NearestNeighbors(n_neighbors=5, metric='cosine')
    knn.fit(X_us_top_hits)
    distances, indices = knn.kneighbors(X_my_tracks)

    pairwise_dists = pairwise_distances(X_us_top_hits, metric='cosine')
    threshold = np.percentile(pairwise_dists[np.triu_indices(pairwise_dists.shape[0], k=1)], 25)
    sorted_recommendations = sorted(zip(distances.flatten(), indices.flatten()), key=lambda x: x[0])
    filtered_recommendations = [(dist, idx) for dist, idx in sorted_recommendations if dist <= threshold]
    top_10_indices = [idx for _, idx in filtered_recommendations[:10]]
    top_10_songs = [f"{us_top_hits_tracks['tracks']['items'][i]['track']['name']} by {us_top_hits_tracks['tracks']['items'][i]['track']['artists'][0]['name']}" for i in top_10_indices]

    return render_template('recommended_tracks.html', tracks=top_10_songs)

@app.route('/playlists/')
def display_playlists():
    access_token = session.get('access_token')
    if not access_token:
        return redirect(url_for('authorize'))
    playlists = get_user_playlists(access_token)
    
    for playlist in playlists:
        playlist_id = playlist['id']
        playlist_name = playlist['name']
        playlist_track_arr = get_playlist_tracks(access_token, playlist_id)
        pl = Playlist(id=playlist_id, name=playlist_name)
        db.session.add(pl)
        db.session.commit()
          
    
    return render_template('display_playlists.html', playlists=playlists)

@app.route('/playlists/<playlist_id>/tracks/')
def display_tracks(playlist_id):
    access_token = session.get('access_token')
    if not access_token:
        return redirect(url_for('authorize'))
    
    tracks = get_playlist_tracks(access_token, playlist_id)
    # You can also fetch the playlist name if you want to display it
    playlist_name = "Chosen Playlist"  # Placeholder, replace with actual playlist name if you fetch it

    return render_template('display_tracks.html', tracks=tracks, playlist_name=playlist_name)

@app.route('/top_artists/')
def display_artists():
    access_token = session.get('access_token')
    if not access_token:
        return redirect(url_for('authorize'))
    artists = get_top_artists(access_token)

    return render_template('top_artists.html', artists=artists)

@app.route('/top_tracks/')
def display_top_tracks():
    access_token = session.get('access_token')
    if not access_token:
        return redirect(url_for('authorize'))
    top_tracks = get_top_tracks(access_token)

    #put top tracks into database

    for track in top_tracks:
        track_id = track['id']
        track_name = track['name']
        user = session['user_id']
        track_artist = track['artists'][0]['name']
        

        
        tr = Top_Tracks(uid = user, id=track_id, name=track_name, artists=track_artist)
        db.session.add(tr)
        db.session.commit()


    user_info = session.get('user_id')
    top_tracks = Top_Tracks.query.filter_by(uid=user).all()
    

    return render_template('top_tracks.html', top_tracks=top_tracks)

@app.route('/blended/')
def display_blended():
    access_token = session.get('access_token')
    if not access_token:
        return redirect(url_for('authorize'))
    with app.app_context():
        tracks = Top_Tracks.query.all()
        return render_template('blended.html', tracks=tracks)

if __name__ == "__main__":
    app.run(debug=False)


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [07/Nov/2023 23:32:53] "[32mGET /top_tracks/ HTTP/1.1[0m" 302 -
127.0.0.1 - - [07/Nov/2023 23:32:53] "[32mGET /authorize HTTP/1.1[0m" 302 -
127.0.0.1 - - [07/Nov/2023 23:32:54] "[32mGET /callback/?code=AQBi_X_D-OqEL_slyx1YR0F233r5axtM4AA9f8yaY6xlg7_PTntnywWg-692X4DlAPVw0iHOS-iN096n8ZvOe0YeCWOGUWZ-CA4f4rPcGUZ8vUe53OPtF6QBiYZIMcS7v6Iv9TMmbDjcEtbvE6EhFKtjjqno4_rg9Oql5iCmoDCGGDdG_nrC5rLZEtjV-dzNk8QoG8fzxX7FvMhRIikrcUzNMnBj-lcJwnVgX6Hm8Pz4FXLny3QrJCVp9mU&state=6619197a2f582cc26b8e24d8eb3180 HTTP/1.1[0m" 302 -
  user = User.query.get(session['user_id'])
127.0.0.1 - - [07/Nov/2023 23:32:54] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [07/Nov/2023 23:32:56] "GET /top_tracks/ HTTP/1.1" 200 -
