In [9]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error

# Spotify API credentials
scope = "user-top-read user-library-read"  # Include both user-top-read and user-library-read scopes
client_id = "716c1e25d0b94ad59424c2fe6e5268ec"
client_secret = "1f967480693941c69c6265ca6d920b4f"
redirect_uri = "http://localhost:8000/callback"

# Initialize Spotify client
scope = "user-top-read"
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, scope=scope))

# Function to fetch training data from Spotify API
def fetch_training_data(limit=50):  # Adjusted limit to maximum allowed value of 50
    try:
        # Fetch user's top tracks
        top_tracks = sp.current_user_top_tracks(limit=limit)
    except spotipy.exceptions.SpotifyException as e:
        print(f"Spotify API error: {e}")
        return None  # Return None if there's an API error

    track_ids = [track['id'] for track in top_tracks['items']]

    # Fetch audio features and popularity for each track
    data = []
    for track_id in track_ids:
        features = sp.audio_features([track_id])[0]
        track_info = sp.track(track_id)
        popularity = track_info['popularity']
        
        if features:
            # Combine audio features and popularity into a single dictionary
            features['popularity'] = popularity
            data.append(features)
    
    # Convert data to a DataFrame
    df = pd.DataFrame(data)
    return df

# Function to preprocess data
def preprocess_data(df):
    # Drop unnecessary columns and handle missing values
    df = df.drop(columns=['type', 'id', 'uri', 'track_href', 'analysis_url', 'time_signature'])
    df = df.dropna()
    
    # Standardize features
    scaler = StandardScaler()
    df[df.columns] = scaler.fit_transform(df[df.columns])
    
    return df, scaler

# Function to train and save the model
def train_and_save_model():
    # Fetch training data
    df = fetch_training_data(limit=50)  # Use the adjusted limit of 50
    
    # Check if the data is retrieved successfully
    if df is None:
        print("Failed to fetch training data. Check the Spotify API error above.")
        return
    
    # Preprocess data
    df, scaler = preprocess_data(df)
    
    # Separate features and target (popularity)
    X = df.drop(columns=['popularity'])
    y = df['popularity']
    
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Train a RandomForestRegressor
    model = RandomForestRegressor(n_estimators=50)  # Use fewer estimators for faster training
    model.fit(X_train, y_train)
    
    # Evaluate the model
    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    print(f"Mean Absolute Error: {mae:.2f}")
    
    # Save the trained model and scaler
    joblib.dump(model, 'spotify_model.joblib')
    joblib.dump(scaler, 'spotify_scaler.joblib')
    print("Model and scaler saved to disk.")
    
# Run the function to train and save the model
train_and_save_model()


Mean Absolute Error: 0.82
Model and scaler saved to disk.


In [10]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_absolute_error
import joblib  # To save the trained model


In [12]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import pandas as pd
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error

# Spotify API credentials
scope = "user-top-read user-library-read"  # Include both user-top-read and user-library-read scopes
client_id = "716c1e25d0b94ad59424c2fe6e5268ec"
client_secret = "1f967480693941c69c6265ca6d920b4f"
redirect_uri = "http://localhost:8000/callback"

# Initialize Spotify client
scope = "user-top-read"
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, scope=scope))

# Function to fetch training data from Spotify API
def fetch_training_data(limit=50):  # Adjusted limit to maximum allowed value of 50
    try:
        # Fetch user's top tracks
        top_tracks = sp.current_user_top_tracks(limit=limit)
    except spotipy.exceptions.SpotifyException as e:
        print(f"Spotify API error: {e}")
        return None  # Return None if there's an API error

    track_ids = [track['id'] for track in top_tracks['items']]

    # Fetch audio features and popularity for each track
    data = []
    for track_id in track_ids:
        features = sp.audio_features([track_id])[0]
        track_info = sp.track(track_id)
        popularity = track_info['popularity']
        
        if features:
            # Combine audio features and popularity into a single dictionary
            features['popularity'] = popularity
            data.append(features)
    
    # Convert data to a DataFrame
    df = pd.DataFrame(data)
    return df

# Function to preprocess data
def preprocess_data(df):
    # Drop unnecessary columns and handle missing values
    df = df.drop(columns=['type', 'id', 'uri', 'track_href', 'analysis_url', 'time_signature'])
    df = df.dropna()
    
    # Standardize features
    scaler = StandardScaler()
    df[df.columns] = scaler.fit_transform(df[df.columns])
    
    return df, scaler

# Function to train and optimize the model using GridSearchCV
def train_and_optimize_model():
    # Fetch training data
    df = fetch_training_data(limit=50)  # Use the adjusted limit of 50
    
    # Check if the data is retrieved successfully
    if df is None:
        print("Failed to fetch training data. Check the Spotify API error above.")
        return
    
    # Preprocess data
    df, scaler = preprocess_data(df)
    
    # Separate features and target (popularity)
    X = df.drop(columns=['popularity'])
    y = df['popularity']
    
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Define parameter grid for hyperparameter tuning
    param_grid = {
        'n_estimators': [50, 100, 150],
        'max_depth': [10, 15, 20],
        'min_samples_split': [2, 4, 6],
        'min_samples_leaf': [1, 2, 4],
    }
    
    # Initialize RandomForestRegressor and GridSearchCV
    model = RandomForestRegressor(random_state=42)
    grid_search = GridSearchCV(model, param_grid, cv=5, scoring='neg_mean_absolute_error')
    
    # Perform hyperparameter tuning
    grid_search.fit(X_train, y_train)
    
    # Get the best model
    best_model = grid_search.best_estimator_
    
    # Evaluate the best model on the testing set
    y_pred = best_model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    print(f"Mean Absolute Error with optimized RandomForest: {mae:.2f}")
    
    # Save the best model and scaler
    joblib.dump(best_model, 'spotify_optimized_model.joblib')
    joblib.dump(scaler, 'spotify_scaler.joblib')
    print("Optimized model and scaler saved to disk.")
    
# Run the function to train and optimize the model
train_and_optimize_model()


Mean Absolute Error with optimized RandomForest: 0.88
Optimized model and scaler saved to disk.


In [24]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth
import joblib
from collections import Counter
import pandas as pd

# Spotify API credentials
scope = "playlist-modify-private playlist-modify-public user-top-read"
client_id = "your_client_id"
client_secret = "your_client_secret"
redirect_uri = "your_redirect_uri"

# Initialize Spotify client
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, scope=scope))

# Load the saved model and scaler
model = joblib.load('spotify_model.joblib')
scaler = joblib.load('spotify_scaler.joblib')

# Define the expected feature columns used during training
expected_features = ['danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo']

def create_personalized_playlist():
    # Fetch user's top tracks
    top_tracks = sp.current_user_top_tracks(limit=20)
    track_ids = [track['id'] for track in top_tracks['items']]
    track_names = [track['name'] for track in top_tracks['items']]

    # Get genres from top tracks
    artist_ids = [track['artists'][0]['id'] for track in top_tracks['items']]
    genres = []
    for artist_id in artist_ids:
        artist_info = sp.artist(artist_id)
        genres.extend(artist_info['genres'])
    
    genre_count = Counter(genres)
    most_common_genre = genre_count.most_common(1)[0][0]

    # Search for tracks in the most common genre
    results = sp.search(q=f'genre:{most_common_genre}', type='track', limit=20)
    recommended_tracks = results['tracks']['items']

    # Create a new playlist for the user
    user_id = sp.current_user()['id']
    playlist = sp.user_playlist_create(user_id, 'Personalized Playlist', public=False)
    playlist_id = playlist['id']

    # Prepare the data for prediction
    track_data = []
    for track in recommended_tracks:
        track_id = track['id']
        track_name = track['name']
        
        # Fetch audio features for the track
        features = sp.audio_features([track_id])[0]
        
        # Ensure features match the expected features
        if features:
            # Filter the features to include only the expected features
            filtered_features = [features[key] for key in expected_features]
            
            # Standardize the filtered features using the loaded scaler
            standardized_features = scaler.transform([filtered_features])
            
            # Predict popularity using the trained model
            predicted_popularity = model.predict(standardized_features)[0]
            
            # Append track data with predicted popularity
            track_data.append({'id': track_id, 'name': track_name, 'predicted_popularity': predicted_popularity})

    # Sort the tracks based on predicted popularity
    track_data.sort(key=lambda x: x['predicted_popularity'], reverse=True)

    # Add sorted recommended tracks to the new playlist based on popularity
    sorted_track_ids = [track['id'] for track in track_data]
    sp.playlist_add_items(playlist_id, sorted_track_ids)

    # Return the playlist ID and track names
    return playlist_id, [track['name'] for track in track_data]

# Example usage
if __name__ == "__main__":
    playlist_id, track_names = create_personalized_playlist()
    print(f"Personalized playlist created with ID: {playlist_id}")
    print("Track names added to the personalized playlist:")
    for name in track_names:
        print(name)




ValueError: X has 11 features, but StandardScaler is expecting 13 features as input.

In [28]:
# Required packages:
# - spotipy for interacting with Spotify's API
# - surprise for collaborative filtering and recommendation systems
# - pandas for data manipulation and analysis
# - joblib for saving and loading models
import spotipy
from spotipy.oauth2 import SpotifyOAuth
from surprise import SVD, Dataset, Reader, accuracy
from surprise.model_selection import train_test_split
import pandas as pd
import joblib

# Define the scope of access
scope = "user-library-read playlist-modify-public user-top-read"

# Authenticate with Spotify
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope=scope))

# Function to get user's saved tracks
def get_user_tracks(sp, limit=50):
    tracks = []
    results = sp.current_user_saved_tracks(limit=limit)
    while results:
        for item in results['items']:
            track = item['track']
            tracks.append(track)
        if results['next']:
            results = sp.next(results)
        else:
            break
    return tracks

# Function to get user's top tracks
def get_user_top_tracks(sp, time_range='medium_term', limit=50):
    top_tracks = sp.current_user_top_tracks(time_range=time_range, limit=limit)
    return [track['id'] for track in top_tracks['items']]

# Retrieve user tracks and top tracks
user_tracks = get_user_tracks(sp, limit=100)
top_tracks = get_user_top_tracks(sp, time_range='medium_term', limit=50)

# Function to create an interaction matrix
def create_interaction_matrix(user_tracks, top_tracks):
    data = {
        'user_id': [],
        'track_id': [],
        'rating': []
    }
    user_id = sp.current_user()['id']

    # Add top tracks as high ratings
    for track_id in top_tracks:
        data['user_id'].append(user_id)
        data['track_id'].append(track_id)
        data['rating'].append(5)  # High rating for top tracks

    # Add user tracks as neutral ratings
    for track in user_tracks:
        track_id = track['id']
        if track_id not in top_tracks:
            data['user_id'].append(user_id)
            data['track_id'].append(track_id)
            data['rating'].append(3)  # Neutral rating for user tracks

    return pd.DataFrame(data)

# Create an interaction matrix
interaction_matrix = create_interaction_matrix(user_tracks, top_tracks)

# Create a Reader and load the data into a Dataset
reader = Reader(rating_scale=(1, 5))
data = Dataset.load_from_df(interaction_matrix[['user_id', 'track_id', 'rating']], reader)

# Split the data into train and test sets
trainset, testset = train_test_split(data, test_size=0.2)

# Train a Singular Value Decomposition (SVD) model
model = SVD()
model.fit(trainset)

# Test the model
predictions = model.test(testset)
print(f"RMSE: {accuracy.rmse(predictions)}")

# Save the model to a file
joblib.dump(model, 'playlist_recommender_model.joblib')

# Function to recommend tracks
def recommend_tracks(model, sp, num_recommendations=20):
    user_id = sp.current_user()['id']
    track_pool = interaction_matrix['track_id'].unique()
    
    # Recommend tracks based on the user's profile
    recommendations = []
    for track_id in track_pool:
        prediction = model.predict(user_id, track_id)
        recommendations.append((track_id, prediction.est))
    
    # Sort recommendations by predicted rating
    recommendations.sort(key=lambda x: x[1], reverse=True)
    recommended_track_ids = [rec[0] for rec in recommendations[:num_recommendations]]
    
    return recommended_track_ids

# Function to create a playlist
def create_playlist(sp, name, track_ids):
    # Create a new playlist
    user_id = sp.current_user()['id']
    playlist = sp.user_playlist_create(user_id, name, public=True)
    
    # Add tracks to the playlist
    sp.playlist_add_items(playlist['id'], track_ids)
    return playlist

# Recommend tracks and create a playlist
recommended_track_ids = recommend_tracks(model, sp)
playlist_name = "Recommended Playlist"
playlist = create_playlist(sp, playlist_name, recommended_track_ids)
print(f"Playlist created: {playlist['name']} (ID: {playlist['id']})")


SpotifyException: http status: 400, code:-1 - Unsupported URL / URI., reason: None

In [None]:
# Spotify API credentials
scope = "user-top-read user-library-read"  # Include both user-top-read and user-library-read scopes
client_id = "716c1e25d0b94ad59424c2fe6e5268ec"
client_secret = "1f967480693941c69c6265ca6d920b4f"
redirect_uri = "http://localhost:8000/callback"