In [None]:
# Importing all classes and functions from the module
from Spotify_Recommender_Main import *

In [None]:
# Dataset preparation — source: Kaggle
# Details: 960K Spotify Songs With Lyrics data
# Link: https://www.kaggle.com/datasets/bwandowando/spotify-songs-with-attributes-and-lyrics?select=songs_with_lyrics_and_timestamps.csv

# Audio attributes that make up each song's feature vector.
feature_columns = [
    'danceability', 'energy', 'loudness', 'speechiness', 'acousticness',
    'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms',
]

# Load the CSV using its first column as the index, then discard the columns
# that are not used by the recommender.
init_df = pd.read_csv("songs_with_attributes_and_lyrics.csv", index_col=0)
init_df = init_df.drop(columns=['key', 'mode', 'lyrics'])

# Index labels in row order; row i of the feature matrix belongs to song_ids[i].
song_ids = init_df.index.to_list()

# Rescale every feature column with a min-max scaler, writing the scaled
# values back into the frame.
scaler = MinMaxScaler()
init_df[feature_columns] = scaler.fit_transform(init_df[feature_columns])

# Contiguous float32 copy of the scaled features, independent of init_df.
feature_matrix = np.ascontiguousarray(
    init_df[feature_columns].to_numpy(dtype='float32', copy=True)
)

In [None]:
# Fast similarity search with FAISS (Facebook AI Similarity Search).
# The rows of feature_matrix are L2-normalised in place first, so the inner
# product computed by IndexFlatIP equals the cosine similarity.
faiss.normalize_L2(feature_matrix)

# Index dimensionality = number of feature columns (the matrix width).
index = faiss.IndexFlatIP(feature_matrix.shape[1])
index.add(feature_matrix)

In [None]:
# 1/4
# This code block is for User Data -

# Create the local authorization server from the Spotify app config file.
server = SpotifyAuthServer(config_path='spotify_config.json')

# Run the server on a background thread so the notebook stays usable.
# daemon=True lets the thread exit when the main program exits; it is passed
# to the constructor because Thread.setDaemon() is deprecated since Python 3.10.
server_thread = threading.Thread(
    target=server.run,
    kwargs={'host': '127.0.0.1', 'port': 8888},
    daemon=True,
)
server_thread.start()

print("Server is running. Visit http://127.0.0.1:8888/ in your browser to log in with Spotify.")

In [None]:
# 2/4

# Poll the auth server until the browser login delivers an authorization code,
# giving up after `timeout` seconds.
timeout = 30  # seconds
start_time = time.time()
while server.auth_code is None and (time.time() - start_time) < timeout:
    print("Waiting for authorization code...")
    time.sleep(2)

# Fail fast on timeout: continuing would either raise a NameError on
# `token_data` (fresh kernel) or silently reuse tokens from an earlier run.
if not server.auth_code:
    raise TimeoutError("Timed out waiting for authorization code.")

# The authorization code and the token payload are credentials, so they are
# deliberately not echoed into the notebook output.
print("Authorization code received.")
token_data = server.exchange_token()

# NOTE(review): assumes exchange_token() returns a dict-like token payload;
# an unsuccessful exchange is presumed to come back without 'access_token'.
if not token_data or 'access_token' not in token_data:
    raise RuntimeError("Token exchange failed: no access token returned.")
print("Token exchange succeeded.")

access_token = token_data['access_token']
refresh_token_ = token_data['refresh_token']
headers = {
    'Authorization': f"Bearer {access_token}"
}

In [None]:
# 3/4

# Sanity check that the API accepts the token: should print recently played songs.
url = 'https://api.spotify.com/v1/me/player/recently-played?limit=20'
# requests has no default timeout; without one a stalled connection would
# block this cell indefinitely.
response = requests.get(url, headers=headers, timeout=10)

if response.status_code == 200:
    recent_tracks = response.json()
    for item in recent_tracks['items']:
        track = item['track']
        artist_names = ', '.join(artist['name'] for artist in track['artists'])
        print(track['name'], 'by', artist_names)
else:
    # Show the status and body so authorization or scope problems are visible.
    print(response.status_code, response.text)

In [None]:
# 4/4

# Data pipeline for Spotify user data.
# Read the app credentials (client id / secret) from the config file.
with open('spotify_config.json', 'r') as config_file:
    credentials = json.load(config_file)

# Wrap the tokens and app credentials, then build the fetcher on top of them.
user = SpotifyUserData(
    access_token,
    refresh_token_,
    credentials['client_id'],
    credentials['client_secret'],
)
fetcher = SpotifyUserDataFetcher(user)

# Pull each data source. Result shapes as noted by the original author:
#   top tracks    → List[str]             (track_id)
#   recent tracks → List[Tuple[str, str]] (track_id, played_at)
#   saved tracks  → List[Tuple[str, str]] (track_id, added_at)
#   top artists   → List[Tuple[str, int]] (artist_id, popularity)
top_tracks_json = fetcher.fetch_top_tracks(time_range='medium_term', limit=20)
recent_tracks_json = fetcher.fetch_recent_tracks(limit=20)
saved_tracks_json = fetcher.fetch_saved_tracks(limit=20, offset=0)
top_artists_json = fetcher.fetch_top_artists(time_range='medium_term', limit=20)

# Debugging aid: pretty-print one of the results
# print(json.dumps(saved_tracks_json, indent=2))


In [None]:
# From this cell onwards the notebook processes the user data and builds
# source-based user profiles.
# 1/2

# Load the per-source weights from disk.
with open('source_weights.json', 'r') as weights_file:
    source_weights = json.load(weights_file)

# print(source_weights)

# Settings for the ranking call, gathered in one place for readability.
ranking_options = {
    'top_tracks_time_range': 'medium_term',
    'top_tracks_limit': 20,
    'recent_tracks_limit': 20,
    'saved_tracks_limit': 20,
    'saved_tracks_offset': 0,
    'top_artists_time_range': 'medium_term',
    'top_artists_limit': 10,
    'source_weights': source_weights,
}

# Rank the user's tracks according to the source weights.
ranked_track_ids, track_contributions = fetcher.rank_user_tracks(**ranking_options)

# For inspection only:
# print(ranked_track_ids)
# pretty_contributions = HelperFunctions.nested_defaultdict_to_dict(track_contributions)
# print(json.dumps(pretty_contributions, indent=2))


In [None]:
# 2/2

# Build the user profile from the per-track contributions, the song frame
# and the list of feature columns.
user_source_profile = HelperFunctions.build_user_source_profiles(
    track_contributions,
    init_df,
    feature_columns,
)
# For inspection/debugging only:
# print("User Source Profiles:", user_source_profile)

In [None]:
# Creating Recommendations:

# For each source profile (top songs, saved songs, recent songs, top artists)
# look up the 10 most similar songs in the FAISS index.
source_recommendations = {}

for source, vector in user_source_profile.items():
    # FAISS only accepts float32 queries. np.array(...) always copies, so the
    # in-place normalisation below never modifies the stored profile vector.
    query = np.array(vector, dtype='float32').reshape(1, -1)
    faiss.normalize_L2(query)
    scores, neighbours = index.search(query, k=10)

    # FAISS pads missing results with label -1; skip those so they are not
    # silently mapped to song_ids[-1] (the last song in the dataset).
    hits = [
        (song_ids[row], float(score))
        for row, score in zip(neighbours[0], scores[0])
        if row >= 0
    ]

    source_recommendations[source] = {
        'song_ids': [song_id for song_id, _ in hits],
        'scores': [score for _, score in hits],
    }

# Report the recommendations for every source.
for key, recommendation in source_recommendations.items():
    print(f"Source: {key}")
    print("Recommended song IDs:", recommendation['song_ids'])
    print("Scores:", recommendation['scores'])
    HelperFunctions.print_song_names_and_artists(recommendation['song_ids'], init_df)
    print("-" * 40)

In [None]:
# Instantiate the bandit RL model, configured with the four recommendation
# sources and epsilon=0.1.
bandit = EpsilonGreedyBandit(
    sources=['top_tracks', 'recent_tracks', 'saved_tracks', 'top_artist_tracks'],
    epsilon=0.1,
)

# Instantiate the feedback collector with the profiles, the FAISS index, the
# song lookup data and the helper used to print song names and artists.
collector = RecommendationFeedbackCollector(
    user_source_profile=user_source_profile,
    index=index,
    song_ids=song_ids,
    init_df=init_df,
    print_song_names_and_artists=HelperFunctions.print_song_names_and_artists,
)

In [None]:
# Ask the collector for user feedback. Note that this rebinds
# source_recommendations to the value returned by the collector.
(source_recommendations, user_feedback) = collector.collect_feedback()

In [None]:
# Feed the collected ratings into the bandit, one source at a time.
for source, ratings in user_feedback.items():
    # Remember the weight as it was before this source's feedback is applied
    initial_weight = bandit.source_weights[source]

    # Apply every rating that was actually given; None marks a skipped song
    for rating in ratings:
        if rating is None:
            continue
        bandit.update_source_weights(source, rating)

    # Report the weight before and after the updates
    print(f"Source: {source}")
    print(f"Initial weight: {initial_weight}")
    print(f"Final updated weight: {bandit.source_weights[source]}")
    print("-" * 40)

In [None]:
# Saving the updated source weights to a JSON file.
# The weights changed by the feedback live in bandit.source_weights; the
# `source_weights` dict loaded from the file earlier is never modified, so
# dumping it unchanged would discard all feedback. The bandit's values are
# overlaid on the loaded dict so that any keys the bandit does not manage are
# preserved.
# NOTE(review): assumes bandit.source_weights is a plain mapping of
# JSON-serialisable values keyed like source_weights.json — confirm.
updated_source_weights = {**source_weights, **bandit.source_weights}

with open('source_weights.json', 'w') as f:
    json.dump(updated_source_weights, f, indent=4)