In [106]:
import requests
from dotenv import load_dotenv
import os
import json
import base64   
import pandas as pd
import librosa
import numpy as np
import lyricsgenius
import langdetect
import re
import string
import tempfile
from tqdm import tqdm  # Import tqdm for progress bar
from datetime import datetime, timedelta

from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Embedding, Dropout
from tensorflow.keras.optimizers import Adam

In [107]:
# For any api we can try using the "+" email trick to get more API keys

genius_client_id = "wZZ2RWc5mqp-5Pbz2W1rQJWE8LQ3pFBrb1Hw5_AOqgybq28mt7kjdjcG4zktCNbO"
genius_client_secret = "PefqBJHor_muDgTutGlaXXaxmzsI7TQCps9FQ3FwkUTT0WJIT3s0A5YA9mnFbfp_-CBhQF7b0omgE8kaM3dJ3w"
genius_access_token = "NUHHVpwnmbDYUYw8Padu0gQeHvYN4OsKYtE2MKNUpBUI6yR-xZXKY6S5NvCnFbiP"

lastfm_api_key = "97d5a64d5ba4a8bc580b752ceff3b87f"
lastfm_secret = "35175090bd61f6f16ac607bd26e5b1de"

In [3]:
base_url = 'http://ws.audioscrobbler.com/2.0/'

def lastfm_get(payload):
    headers = {'user-agent': 'DataCollectorBot'}
    payload['api_key'] = lastfm_api_key
    payload['format'] = 'json'
    response = requests.get(base_url, headers=headers, params=payload)
    return response.json()


def get_recent_tracks(user):
    payload = {'method': 'user.getrecenttracks', 'user': user}
    return lastfm_get(payload)

def get_weekly_artist_chart(user):
    payload = {'method': 'user.getweeklyartistchart', 'user': user}
    return lastfm_get(payload)

def get_weekly_track_chart(user):
    payload = {'method': 'user.getweeklytrackchart', 'user': user}
    return lastfm_get(payload)

In [82]:
def get_one_month_ago_timestamp():
    one_month_ago = datetime.now() - timedelta(days=30)
    return int(one_month_ago.timestamp())

def recent_tracks_for_user_to_df(user, min_tracks=50, max_tracks=100):
    from_timestamp = get_one_month_ago_timestamp()
    
    payload = {
        'method': 'user.getrecenttracks',
        'user': user,
        'from': from_timestamp,
        'limit': max_tracks 
    }
    
    recent_tracks = lastfm_get(payload)
    tracks_list = []
    
    if 'track' in recent_tracks.get('recenttracks', {}):
        for track in recent_tracks['recenttracks']['track']:
            if 'date' in track: 
                track_info = {
                    'User': user, 
                    'Artist': track['artist']['#text'],
                    'Track Name': track['name'],
                    'Timestamp': track['date']['uts']
                }
                tracks_list.append(track_info)

    df = pd.DataFrame(tracks_list)
    return df

def recent_tracks_all_users_to_df(users):
    all_tracks_dfs = [] 
    total_users = len(users)
    
    with tqdm(total=total_users, desc="Processing Users", unit="user") as pbar:
        for user in users:
            df = recent_tracks_for_user_to_df(user)
            all_tracks_dfs.append(df)
            pbar.update(1) 
    
    combined_df = pd.concat(all_tracks_dfs, ignore_index=True)
    
    return combined_df


df = pd.read_csv('../../Downloads/BT4222ProjectExcel/user_songs_filtered.csv')
users = df["Username"].unique()
combined_tracks_df = recent_tracks_all_users_to_df(users)

Processing Users: 100%|██████████| 5647/5647 [1:05:25<00:00,  1.44user/s]


In [84]:
combined_tracks_df.to_excel("../../Downloads/Users_Songs_Timestamps.xlsx")

In [26]:
def list_to_df(data_list, columns):
    if data_list:
        df = pd.DataFrame(data_list, columns=columns)
        return df
    else:
        return pd.DataFrame(columns=columns)

def get_weekly_artist_chart_df(user):
    result = get_weekly_artist_chart(user)
    artists = []
    if 'weeklyartistchart' in result and 'artist' in result['weeklyartistchart']:
        for item in result['weeklyartistchart']['artist']:
            artists.append({
                'Artist': item['name'],
                'Play Count': item['playcount']
            })
    return list_to_df(artists, ['Artist', 'Play Count'])

def get_weekly_track_chart_df(user):
    result = get_weekly_track_chart(user)
    tracks = []
    if 'weeklytrackchart' in result and 'track' in result['weeklytrackchart']:
        for item in result['weeklytrackchart']['track']:
            tracks.append({
                'Track Name': item['name'],
                'Artist': item['artist']['#text'],
                'Play Count': item['playcount']
            })
    return list_to_df(tracks, ['Track Name', 'Artist', 'Play Count'])

In [135]:
df = combined_tracks_df[:10000]

import numpy as np
from sklearn.model_selection import train_test_split

df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
df['Time_of_Day'] = df['Timestamp'].dt.hour
df['Artist_Track'] = df['Artist'] + ' - ' + df['Track Name']
time_of_day_encoded = pd.get_dummies(df['Time_of_Day'], prefix='hour')

label_encoder = LabelEncoder()
df['Artist_Track_Encoded'] = label_encoder.fit_transform(df['Artist_Track'])
df = pd.concat([df, time_of_day_encoded], axis=1)

sequence_length = 3
vocab_size = len(label_encoder.classes_)


X_seq_list, y_seq_list = [], []

for _, group in df.groupby('User'):
    group = group.sort_values('Timestamp')
    
    for i in range(len(group) - sequence_length + 1):
        artist_track_sequence = group['Artist_Track_Encoded'].iloc[i:i + sequence_length - 1].values
        
        time_features_sequence = group[time_of_day_encoded.columns].iloc[i:i + sequence_length - 1].values.reshape((sequence_length - 1) * len(time_of_day_encoded.columns))
        
        sequence = np.hstack([artist_track_sequence, time_features_sequence])
        
        label = group['Artist_Track_Encoded'].iloc[i + sequence_length - 1]
        
        X_seq_list.append(sequence)
        y_seq_list.append(label)

X_seq = np.array(X_seq_list)
# y_seq = to_categorical(y_seq_list, num_classes=vocab_size)
y_seq = np.array(y_seq_list) # Integer instead of one hot encoding


X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.2, shuffle=False)

num_artist_track_features = sequence_length - 1  
num_time_features = 24 * (sequence_length - 1)  

X_train_artist_track = X_train[:, :num_artist_track_features]  
X_train_time_features = X_train[:, num_artist_track_features:] 

X_test_artist_track = X_test[:, :num_artist_track_features] 
X_test_time_features = X_test[:, num_artist_track_features:]  


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Timestamp'] = pd.to_datetime(df['Timestamp'], unit='s')
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Time_of_Day'] = df['Timestamp'].dt.hour
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['Artist_Track'] = df['Artist'] + ' - ' + df['Track Name']
A value is trying to be set on a copy of

In [136]:
num_time_features_actual = X_train_time_features.shape[1]

num_time_features = num_time_features_actual
print("Updated number of time features:", num_time_features)


Updated number of time features: 48


# LSTM


In [137]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, LSTM, Dropout, Dense, Bidirectional, BatchNormalization, concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.mixed_precision import set_global_policy
from tensorflow.keras.models import load_model

set_global_policy('mixed_float16')

early_stopping = EarlyStopping(
    monitor='val_loss',  
    patience=10,  
    restore_best_weights=True 
)

artist_track_input = Input(shape=(sequence_length-1,), dtype='int32', name='artist_track_input')
time_features_input = Input(shape=(num_time_features,), name='time_features_input')  

embedding_layer = Embedding(input_dim=vocab_size + 1, output_dim=50, input_length=sequence_length-1)(artist_track_input)
lstm_layer = LSTM(40, dropout=0.2, recurrent_dropout=0.2)(embedding_layer)

time_dense_layer = Dense(40, activation='relu')(time_features_input)

combined = concatenate([lstm_layer, time_dense_layer])

x = Dropout(0.5)(combined)
x = BatchNormalization()(x)
x = Dense(100, activation='relu', kernel_regularizer=l1_l2(l1=0.01, l2=0.01))(x)
x = Dropout(0.5)(x)
output = Dense(vocab_size, activation='softmax')(x)

model = Model(inputs=[artist_track_input, time_features_input], outputs=output)

model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
model.summary()




Model: "model_28"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 artist_track_input (InputLayer  [(None, 2)]         0           []                               
 )                                                                                                
                                                                                                  
 embedding_30 (Embedding)       (None, 2, 50)        330400      ['artist_track_input[0][0]']     
                                                                                                  
 time_features_input (InputLaye  [(None, 48)]        0           []                               
 r)                                                                                               
                                                                                           

In [126]:
def data_generator(X_artist_track, X_time_features, y, batch_size):

    num_samples = X_artist_track.shape[0]
    while True: 
        for offset in range(0, num_samples, batch_size):
            batch_X_artist_track = X_artist_track[offset:offset+batch_size]
            batch_X_time_features = X_time_features[offset:offset+batch_size]
            batch_y = y[offset:offset+batch_size]
            
            
            yield [batch_X_artist_track, batch_X_time_features], batch_y


In [138]:
batch_size = 128
steps_per_epoch = np.ceil(X_train_artist_track.shape[0] / batch_size)

train_generator = data_generator(X_train_artist_track, X_train_time_features, y_train, batch_size)
validation_generator = data_generator(X_test_artist_track, X_test_time_features, y_test, batch_size)

history = model.fit(
    train_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=10,
    validation_data=validation_generator,
    validation_steps=np.ceil(X_test_artist_track.shape[0] / batch_size), 
    callbacks=[early_stopping]
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
model.save('lstm_model')

model = load_model('lstm_model')

In [140]:
def calculate_top_30_accuracy_and_print_recommendations(y_pred_prob, y_true, label_encoder):
    """
    Calculate the top-30 accuracy and print the recommendations and true values for a sample.

    :param y_pred_prob: Model predictions (probabilities).
    :param y_true: True labels (integer encoded).
    :param label_encoder: Fitted LabelEncoder used for encoding labels.
    :param sample_size: Number of samples to print recommendations for.
    """
    k = 30
    top_k_indices = np.argsort(y_pred_prob, axis=1)[:, -k:][::-1] 
    top_k_accuracy_list = []

    for i, (top_k, true) in enumerate(zip(top_k_indices, y_true)):
        hit = true in top_k
        top_k_accuracy_list.append(int(hit))

        recommended_names = label_encoder.inverse_transform(top_k)
        true_name = label_encoder.inverse_transform([true])[0]
        
        # print(f"Top-30 Recommendations: {recommended_names}")
        # print(f"True Value: {true_name}")
        print(hit)

    top_30_accuracy = np.mean(top_k_accuracy_list)
    print(f"Top-30 Accuracy: {top_30_accuracy*100:.2f}%")
    
    return top_30_accuracy

y_pred_prob = model.predict([X_test_artist_track, X_test_time_features])
y_true = y_test


calculate_top_30_accuracy_and_print_recommendations(y_pred_prob, y_true, label_encoder)


False
False
False
False
True
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
True
True
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
F

0.0163265306122449