In [1]:
from pyexpat import features

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
import matplotlib.pyplot as plt

In [4]:
# Input CSV locations, given as relative paths (resolved against the
# notebook's working directory when the files are read).
track_path = 'tracks.csv'
playlist_path = 'playlists.csv'

## Data Pre-Processing

* Read the `tracks.csv` file
* drop unused columns
* drop duplicate tracks based on `Track_ID` column

In [5]:
# Load the track table, discard the 'Unnamed: 0' column (presumably an index
# saved with the CSV) and keep one row per Track_ID.
tracks = (
    pd.read_csv(track_path)
    .drop(columns=['Unnamed: 0'])
    .drop_duplicates(subset=['Track_ID'])
)

* Read `playlists.csv` file
* drop unused columns
* Drop duplicate tracks within each playlist, based on the `pid` and `track_id` columns

In [6]:
# Load playlist membership rows, discard the columns that are not used later
# and keep each (pid, track_id) pair only once.
unused_playlist_columns = ['Unnamed: 0', 'track_uri', 'album_uri', 'artist_uri']
playlists = (
    pd.read_csv(playlist_path)
    .drop(columns=unused_playlist_columns)
    .drop_duplicates(subset=['pid', 'track_id'])
)

* Convert `Release Date` from string to datetime data type
* Convert `Explicit` from boolean to integer data type

In [7]:
# Parse release dates and turn the Explicit flag into 0/1.
# errors='coerce' means any value that does not match %Y-%m-%d becomes NaT
# instead of raising.
tracks = tracks.assign(**{
    'Release Date': pd.to_datetime(tracks['Release Date'], format='%Y-%m-%d', errors='coerce'),
    'Explicit': tracks['Explicit'].astype(int),
})

* Splits each playlist into train and test tracks by 80% and 20%

In [8]:
def train_test_split_playlists(playlists, test_ratio=0.2, random_state=42):
    """
    Split every playlist's tracks into a train part and a held-out test part.

    This simulates a scenario where a subset of a playlist's tracks is known
    (train) and the remaining ones have to be predicted (test).

    Parameters
    ----------
    playlists : pandas.DataFrame
        Must contain the columns 'pid' and 'track_id'.
    test_ratio : float, default 0.2
        Fraction of each playlist placed in the test part. The train part
        receives int(n * (1 - test_ratio)) tracks, so very short playlists
        can end up with no train tracks at all.
    random_state : int, default 42
        Seed for the per-playlist shuffles.

    Returns
    -------
    (train_df, test_df) : tuple of pandas.DataFrame
        Both frames always have the columns ['pid', 'track_id'], even when
        they contain no rows.
    """
    # A private generator produces the same shuffle sequence as seeding the
    # global NumPy RNG with `random_state`, without disturbing global state.
    rng = np.random.RandomState(random_state)
    train_data = []
    test_data = []
    for pid, group in playlists.groupby('pid'):
        track_list = group['track_id'].tolist()
        rng.shuffle(track_list)
        split_index = int(len(track_list) * (1 - test_ratio))
        train_data.extend({'pid': pid, 'track_id': t} for t in track_list[:split_index])
        test_data.extend({'pid': pid, 'track_id': t} for t in track_list[split_index:])

    # Explicit columns keep the schema stable when a side has no rows.
    columns = ['pid', 'track_id']
    train_df = pd.DataFrame(train_data, columns=columns)
    test_df = pd.DataFrame(test_data, columns=columns)
    return train_df, test_df

# Materialise the per-playlist split once; later cells build on train_df and test_df.
train_df, test_df = train_test_split_playlists(playlists, test_ratio=0.2)


* Normalize the numeric features by MinMaxScaler

In [9]:
numeric_features = ['Popularity', 'Danceability', 'Energy', 'Loudness', 'Speechiness',
                    'Acousticness', 'Instrumentalness', 'Liveness', 'Valence', 'Tempo', 'Explicit']
train_track_ids = train_df['track_id'].unique()

# Take an explicit copy: the boolean filter returns a slice of `tracks`, and
# writing scaled values into that slice is ambiguous (pandas warns about it).
train_tracks_df = tracks[tracks['Track_ID'].isin(train_track_ids)].copy()

# Fit the scaler on tracks that occur in the training split only.
scaler = MinMaxScaler()
# Whole-column assignment replaces the columns, so integer columns such as
# 'Explicit' become float without an incompatible-dtype write.
train_tracks_df[numeric_features] = scaler.fit_transform(train_tracks_df[numeric_features].astype(float))



  train_tracks_df.loc[:, numeric_features] = scaler.fit_transform(train_tracks_df[numeric_features].astype(float))


In [None]:
# NOTE(review): left disabled. If enabled, these two lines would apply the
# scaler fitted on the training tracks to every row of `tracks`. The evaluation
# cells further down pass `tracks` itself to evaluate_model, so they currently
# work on unscaled features - confirm whether that is intended.
# all_scaled_values = scaler.transform(tracks[numeric_features])
# tracks[numeric_features] = all_scaled_values

## Data Modeling

* Get user's training tracks

In [10]:
def get_user_playlist_tracks(playlist_df, pid):
    """
    Return the rows of ``playlist_df`` whose 'pid' equals ``pid`` together
    with the distinct 'track_id' values found in those rows.
    """
    belongs_to_pid = playlist_df['pid'] == pid
    playlist_rows = playlist_df.loc[belongs_to_pid]
    return playlist_rows, playlist_rows['track_id'].unique()

* Build a user profile from the training tracks by averaging the feature vectors

In [11]:
def build_user_profile(tracks_df, user_track_ids, numeric_features):
    """
    Average the ``numeric_features`` columns over the rows of ``tracks_df``
    whose 'Track_ID' is in ``user_track_ids``.

    Returns a (1, len(numeric_features)) array, or None when no row matches.
    """
    is_user_track = tracks_df['Track_ID'].isin(user_track_ids)
    user_features = tracks_df.loc[is_user_track, numeric_features]
    if len(user_features) == 0:
        return None
    return user_features.mean(axis=0).to_numpy().reshape(1, -1)


* Compute content-based similarity scores for tracks not in the user's training set.

In [12]:
def compute_content_scores(tracks_df, user_profile, user_track_ids, numeric_features):
    """
    Score every track that is not in ``user_track_ids`` by the cosine
    similarity between its ``numeric_features`` and ``user_profile``.

    Returns a frame with the columns 'Track_ID' and 'content_score'; it has no
    rows when there is no profile or no candidate track.
    """
    already_known = tracks_df['Track_ID'].isin(user_track_ids)
    candidates = tracks_df.loc[~already_known].copy()

    nothing_to_score = candidates.empty or user_profile is None
    if nothing_to_score:
        return pd.DataFrame(columns=['Track_ID', 'content_score'])

    feature_matrix = candidates[numeric_features].to_numpy()
    # cosine_similarity returns one column per profile row; there is one profile.
    candidates['content_score'] = cosine_similarity(feature_matrix, user_profile)[:, 0]
    return candidates.loc[:, ['Track_ID', 'content_score']]

* Build a co-occurrence matrix from the training dataset. This prevents the model from "seeing" test co-occurrences.

In [14]:
def build_cooccurrence_df(playlist_df):
    """
    Count, for every ordered pair of tracks, how often the two appear in the
    same playlist.

    Returns a long-format frame with the columns 'track_id_1', 'track_id_2'
    and 'count'. Each unordered pair is stored in both directions.
    """
    tracks_per_playlist = playlist_df.groupby('pid')['track_id'].apply(list)

    # One record per ordered pair of distinct positions within a playlist.
    pair_records = [
        (first, second, 1)
        for playlist_tracks in tracks_per_playlist
        for i, first in enumerate(playlist_tracks)
        for j, second in enumerate(playlist_tracks)
        if i != j
    ]

    pairs = pd.DataFrame(pair_records, columns=['track_id_1', 'track_id_2', 'count'])
    return pairs.groupby(['track_id_1', 'track_id_2'], as_index=False)['count'].sum()

In [16]:
# Pair counts are built from train_df only, so held-out test tracks never contribute.
cooccurrence = build_cooccurrence_df(train_df)



In [38]:
# Spot check: build and print the profile of a single playlist (pid 679430).
user_playlist, user_track_ids = get_user_playlist_tracks(train_df, 679430)

# The profile is the mean of features taken from the scaled training tracks.
user_profile = build_user_profile(train_tracks_df, user_track_ids, numeric_features)
#np.set_printoptions(suppress=True)
print(user_profile)
print(numeric_features)

[[0.58888889 0.68036437 0.7018     0.84107911 0.12748958 0.2296247
  0.00000593 0.16032032 0.50530777 0.48692648 0.7       ]]
['Popularity', 'Danceability', 'Energy', 'Loudness', 'Speechiness', 'Acousticness', 'Instrumentalness', 'Liveness', 'Valence', 'Tempo', 'Explicit']


In [26]:
# Wide (track x track) view of the pair counts; pairs that never occur
# together are filled with 0. The result is written to disk for inspection.
matrix = (
    cooccurrence
    .pivot(index="track_id_1", columns="track_id_2", values="count")
    .fillna(0)
)
matrix.to_csv("cooccurrence_matrix.csv", index=True)

* Compute collaborative filtering scores from co-occurrence matrix for candidate tracks.

In [None]:
def compute_collaborative_scores_df(user_track_ids, cooccurrence_df, all_track_ids):
    """
    Score candidate tracks by how often they co-occur with the user's tracks.

    Rows of ``cooccurrence_df`` whose 'track_id_1' is a user track and whose
    'track_id_2' is not are summed per 'track_id_2'. The sums are divided by
    their maximum and only tracks contained in ``all_track_ids`` are returned.

    Returns a frame with the columns 'Track_ID' and 'collab_score'.
    """
    starts_at_user_track = cooccurrence_df['track_id_1'].isin(user_track_ids)
    ends_at_user_track = cooccurrence_df['track_id_2'].isin(user_track_ids)
    relevant_pairs = cooccurrence_df[starts_at_user_track & ~ends_at_user_track]

    scores = (
        relevant_pairs
        .groupby('track_id_2', as_index=False)['count']
        .sum()
        .rename(columns={'track_id_2': 'Track_ID', 'count': 'collab_score'})
    )

    if scores.empty:
        scores['collab_score'] = 0
    else:
        # Scale relative to the strongest candidate so the top score is 1.
        scores['collab_score'] = scores['collab_score'] / scores['collab_score'].max()

    return scores[scores['Track_ID'].isin(all_track_ids)]

* Generate hybrid recommendations by both content-based filtering and collaborative filtering.

In [None]:
def hybrid_recommendations(tracks_df, user_profile, user_track_ids, numeric_features, cooccurrence_df, top_n, alpha=0.5):
    """
    Rank candidate tracks by a weighted blend of content and collaborative scores.

    The blend is ``alpha * content_score + (1 - alpha) * collab_score``; a
    track missing from one of the two score tables gets 0 for that score.

    Returns the 'Track_ID' values of the ``top_n`` best-ranked tracks as a list.
    """
    catalogue_ids = set(tracks_df['Track_ID'].unique())
    by_content = compute_content_scores(tracks_df, user_profile, user_track_ids, numeric_features)
    by_collab = compute_collaborative_scores_df(user_track_ids, cooccurrence_df, catalogue_ids)

    # Outer join keeps tracks that were scored by only one of the two methods.
    blended = by_content.merge(by_collab, on='Track_ID', how='outer').fillna(0)
    blended['hybrid_score'] = alpha * blended['content_score'] + (1 - alpha) * blended['collab_score']

    ranked = blended.sort_values('hybrid_score', ascending=False)
    return ranked['Track_ID'].head(top_n).tolist()

## Model Evaluation

* Evaluate the model by comparing the recommended tracks to the test tracks. We only use co-occurrence and user profiles built from training dataset.

In [None]:
def evaluate_model(tracks_df, train_df, test_df, numeric_features, cooccurrence, alpha=0.5, top_n=None):
    """
    Evaluate hybrid recommendations per playlist against its held-out tracks.

    For every playlist in ``train_df`` that also has test tracks, a user
    profile is built from the train tracks only, recommendations are requested
    from ``hybrid_recommendations`` and compared with the test tracks.

    Parameters
    ----------
    tracks_df : pandas.DataFrame
        Track table with 'Track_ID' and the ``numeric_features`` columns.
    train_df, test_df : pandas.DataFrame
        Playlist splits with the columns 'pid' and 'track_id'.
    numeric_features : list of str
        Feature columns used for the content-based part.
    cooccurrence : pandas.DataFrame
        Co-occurrence table passed through to ``hybrid_recommendations``.
    alpha : float, default 0.5
        Weight of the content score; (1 - alpha) weights the collaborative score.
    top_n : int or None, default None
        Number of recommendations per playlist. None requests as many
        recommendations as the playlist has held-out tracks.

    Returns
    -------
    pandas.DataFrame
        One row per evaluated playlist with the columns 'pid', 'hit_rate'
        (hits / number of held-out tracks), 'playlist_length' and 'precision'
        (hits / number of recommended tracks). The columns are present even
        when no playlist could be evaluated.
    """
    result_columns = ['pid', 'hit_rate', 'playlist_length', 'precision']

    # Iterating the groups (instead of apply) also works for empty inputs.
    test_dict = {pid: set(ids) for pid, ids in test_df.groupby('pid')['track_id']}
    train_dict = {pid: set(ids) for pid, ids in train_df.groupby('pid')['track_id']}

    results = []
    for pid in train_df['pid'].unique():
        user_train_tracks = train_dict.get(pid, set())
        user_test_tracks = test_dict.get(pid, set())

        # Nothing was held out for this playlist, so there is nothing to hit.
        if not user_test_tracks:
            continue
        total_playlist_size = len(user_train_tracks) + len(user_test_tracks)

        # Build user profile from train tracks only
        user_profile = build_user_profile(tracks_df, list(user_train_tracks), numeric_features)
        if user_profile is None:
            continue

        n_recommendations = len(user_test_tracks) if top_n is None else top_n
        recommendations = hybrid_recommendations(
            tracks_df,
            user_profile,
            list(user_train_tracks),
            numeric_features,
            cooccurrence,
            n_recommendations,
            alpha=alpha
        )

        recommended_set = set(recommendations)
        # Only held-out tracks count as hits; a recommendation of an already
        # known train track must not be rewarded.
        hit_count = len(recommended_set & user_test_tracks)

        recall = hit_count / len(user_test_tracks)
        # Precision is relative to what was actually recommended.
        precision = hit_count / len(recommended_set) if recommended_set else 0.0

        results.append({
            'pid': pid,
            'hit_rate': recall,
            'playlist_length': total_playlist_size,
            'precision': precision
        })

    return pd.DataFrame(results, columns=result_columns)

* alpha = 1: recommend tracks using only content-based filtering

In [None]:
# NOTE(review): `tracks` is passed as loaded; the MinMaxScaler earlier in the
# notebook was only applied to train_tracks_df, so the features used here look
# unscaled - confirm this is intended.
content_results_df = evaluate_model(tracks, train_df, test_df, numeric_features, cooccurrence, alpha=1)

In [None]:
# Unweighted mean of the per-playlist hit rates (every playlist counts equally).
overall_content_hit_rate = content_results_df['hit_rate'].mean()
print(f"Content-Based Filtering Hit Rate: {overall_content_hit_rate}")

* alpha = 0: recommend tracks using only collaborative filtering

In [None]:
# Use the same recommendation-list size as the content-only run (one
# recommendation per held-out track) so the two hit rates are comparable.
colab_results_df = evaluate_model(tracks, train_df, test_df, numeric_features, cooccurrence, alpha=0)

In [None]:
# Unweighted mean of the per-playlist hit rates (every playlist counts equally).
overall_colab_hit_rate = colab_results_df['hit_rate'].mean()
print(f"Collaborative Filtering Hit Rate: {overall_colab_hit_rate}")

* Find the optimal alpha

In [None]:
alpha = []
mean_hit_rate = []

for param in np.arange(0.1, 1.1, 0.01):
    alpha.append(param)
    print(param)
    result_df = evaluate_model(tracks, train_df, test_df, numeric_features, cooccurrence, alpha=param)
    hit_rate = result_df['hit_rate'].mean()
    mean_hit_rate.append(hit_rate)
    print(mean_hit_rate)

df = pd.DataFrame({
    'alpha': alpha,
    'mean_hit_rate': mean_hit_rate
})

print(df)


* Calculate the average hit rate for various sizes of playlist

In [None]:
# alpha=0.60 is hard-coded; presumably the value picked from the alpha sweep - TODO confirm.
results_df = evaluate_model(tracks, train_df, test_df, numeric_features, cooccurrence, alpha=0.60)


In [None]:
# pd.cut with right=True builds the intervals (0, 5], (5, 10], ..., (35, 40],
# so each label names the inclusive range of playlist lengths in its interval.
# Playlists longer than 40 tracks fall outside every interval and are left out
# of the per-bin means.
bin_edges = [0, 5, 10, 15, 20, 25, 30, 35, 40]
bin_labels = ['1-5', '6-10', '11-15', '16-20', '21-25', '26-30', '31-35', '36-40']
results_df['length_bin'] = pd.cut(results_df['playlist_length'], bins=bin_edges, labels=bin_labels, right=True)
hit_rate_by_bin = results_df.groupby('length_bin', observed=False)['hit_rate'].mean().reset_index()

# bin_counts = results_df['length_bin'].value_counts().reset_index()
# bin_counts.columns = ['length_bin', 'playlist_count']

print(hit_rate_by_bin)
# print(bin_counts)


In [None]:
# Plot the per-bin hit rates held in hit_rate_by_bin rather than numbers
# pasted from an earlier run, so the figure always matches the current results.
# (matplotlib.pyplot is already imported as plt in the notebook's import cell.)
length_bin = hit_rate_by_bin['length_bin'].astype(str).tolist()
hit_rate = hit_rate_by_bin['hit_rate'].tolist()

# Create line chart
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(length_bin, hit_rate, marker='o', linestyle='-', linewidth=2)
ax.set_title("Hit Rate by Length Bin", fontsize=16)
ax.set_xlabel("Length Bin", fontsize=14)
ax.set_ylabel("Hit Rate", fontsize=14)
ax.grid(True, which='both', linestyle='--', linewidth=0.5)
ax.tick_params(axis='both', labelsize=12)
fig.tight_layout()

# Show plot
plt.show()
