<a href="https://colab.research.google.com/github/g23ai2026/mlbd_project/blob/main/baseline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Set your data path
data_path = '/content/drive/MyDrive/CSL7110/processed/'

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from collections import defaultdict
import pickle

In [None]:
playlist_tracks = pd.read_csv(f'{data_path}playlist_tracks.csv')
playlist_tracks.head()

Unnamed: 0,pid,track_id,position
0,0,1,0
1,0,2,1
2,0,3,2
3,0,4,3
4,0,5,4


In [None]:
# Configuration
WINDOW_SIZE = 5  # Look at last 5 tracks for prediction
TOP_K = 5       # Number of recommendations to generate

In [None]:


def build_cooccurrence_matrix(playlist_tracks):
    """Create weighted co-occurrence matrix with position decay"""
    cooccur = defaultdict(lambda: defaultdict(float))

    for _, group in playlist_tracks.groupby('pid'):
        tracks = group.sort_values('position')['track_id'].values
        for i in range(len(tracks)):
            for j in range(i+1, min(i+WINDOW_SIZE+1, len(tracks))):
                # Weight by position distance (closer = higher weight)
                weight = 1/(j-i)
                cooccur[tracks[i]][tracks[j]] += weight
                cooccur[tracks[j]][tracks[i]] += weight * 0.5  # Backward weight

    return cooccur





In [None]:
track_info = pd.read_csv(f'{data_path}tracks.csv')
track_names = dict(zip(track_info['track_id'], track_info['track_name']))

In [None]:
def generate_recommendations(cooccur, seed_tracks, k=10):
    """Generate recommendations with track names"""
    scores = defaultdict(float)
    recent_tracks = seed_tracks[-WINDOW_SIZE:]

    for i, track in enumerate(recent_tracks):
        weight = 1.0/(len(recent_tracks) - i)
        if track in cooccur:
            for neighbor, count in cooccur[track].items():
                if neighbor not in seed_tracks:
                    scores[neighbor] += count * weight

    # Get top recommendations with names
    top_tracks = sorted(scores.items(), key=lambda x: -x[1])[:k]
    return [(track_id, score, track_names.get(track_id, "Unknown Track"))
            for track_id, score in top_tracks]

def evaluate(recommendations, ground_truth):
    """Calculate evaluation metrics with track names"""
    relevant = set(ground_truth)
    retrieved = [t[0] for t in recommendations]

    # R-Precision
    r_prec = len(set(retrieved) & relevant) / len(relevant) if relevant else 0.0

    # NDCG
    relevance_scores = np.array([1.0 if t[0] in relevant else 0.0 for t in recommendations])
    ideal_scores = np.sort(relevance_scores)[::-1]
    ndcg = ndcg_score(ideal_scores, relevance_scores, k=TOP_K)

    # Click Depth
    click_depth = next((i for i, t in enumerate(recommendations, 1) if t[0] in relevant), TOP_K+1)

    return r_prec, ndcg, click_depth

In [None]:
def ndcg_score(true_relevance, pred_relevance, k=10):
    """Normalized Discounted Cumulative Gain"""
    # Convert lists to NumPy arrays for proper indexing
    true_relevance = np.asarray(true_relevance)
    pred_relevance = np.asarray(pred_relevance)

    # Sort predictions and true relevance
    pred_order = np.argsort(pred_relevance)[::-1][:k]
    true_order = np.argsort(true_relevance)[::-1][:k]

    # Calculate DCG
    dcg = np.sum(pred_relevance[pred_order] / np.log2(np.arange(2, len(pred_order)+2)))

    # Calculate IDCG
    idcg = np.sum(true_relevance[true_order] / np.log2(np.arange(2, len(true_order)+2)))

    return dcg / (idcg + 1e-8)  # Small epsilon to avoid division by zero

In [None]:
# filter those playlists where seeds and predicts are less than valid amount
def filter_playlists(df, n_seed, k_predict):
    """Keep only playlists with at least n_seed + k_predict tracks."""
    counts = df.groupby('pid')['track_id'].count()
    good = counts[counts >= n_seed + k_predict].index
    return df[df['pid'].isin(good)]
playlist_tracks_val = filter_playlists(playlist_tracks, WINDOW_SIZE, TOP_K)

In [None]:
playlist_tracks_val = playlist_tracks_val[playlist_tracks_val['pid'] <10000]

In [None]:
# Split by playlists (no track overlap)
train_pids, test_pids = train_test_split(playlist_tracks_val['pid'].unique(), test_size=0.2)
test_pids, val_pids = train_test_split(test_pids, test_size=0.5)

# Build co-occurrence matrix on training data
train_data = playlist_tracks_val[playlist_tracks_val['pid'].isin(train_pids)]
cooccur_matrix = build_cooccurrence_matrix(train_data)



In [None]:
# Test evaluation
test_data = playlist_tracks_val[playlist_tracks_val['pid'].isin(test_pids)]
metrics = {'r_precision': [], 'ndcg': [], 'click_depth': []}

for pid, group in test_data.groupby('pid'):
    tracks = group.sort_values('position')['track_id'].values
    if len(tracks) > WINDOW_SIZE + 5:  # Ensure enough tracks for test
        seed = tracks[:WINDOW_SIZE]
        ground_truth = tracks[WINDOW_SIZE:WINDOW_SIZE+5]

        recs = generate_recommendations(cooccur_matrix, seed, TOP_K)
        r_prec, ndcg, depth = evaluate(recs, ground_truth)

        metrics['r_precision'].append(r_prec)
        metrics['ndcg'].append(ndcg)
        if depth:
            metrics['click_depth'].append(depth)
                # Print example recommendations
        if len(metrics['r_precision']) == 1:  # First test case
            print("\nExample Recommendation:")
            print(f"Seed Tracks: {[track_names.get(t, 'Unknown') for t in seed]}")
            print("Recommended Tracks:")
            for i, (track_id, score, name) in enumerate(recs, 1):
                print(f"{i}. {name} (score: {score:.2f})")
# Print results
print(f"Average R-Precision: {np.mean(metrics['r_precision']):.3f}")
print(f"Average NDCG@10: {np.mean(metrics['ndcg']):.3f}")
print(f"Average Click Depth: {np.mean(metrics['click_depth']):.1f}")


Example Recommendation:
Seed Tracks: ['Twice', '7', 'Whole Wide World - Unpeeled', 'Goodbye Angels', 'Too Much To Think']
Recommended Tracks:
1. Same Old Situation (score: 1.00)
2. Do for Love (score: 1.00)
3. Five of Everything (score: 1.00)
4. Bom Bidi Bom (score: 1.00)
5. And Now I'm Nothing (score: 1.00)
Average R-Precision: 0.064
Average NDCG@10: 0.228
Average Click Depth: 5.1


In [None]:

sample_seed = [123, 456, 789]
recommendations = generate_recommendations(cooccur_matrix, sample_seed)
print("Top recommendations:", [t[0] for t in recommendations])


Top recommendations: [np.int64(792), np.int64(791), np.int64(787), np.int64(790), np.int64(461), np.int64(7115), np.int64(14401), np.int64(806), np.int64(4659), np.int64(449)]


In [None]:
print("\nExample Recommendation:")
print(f"Seed Tracks: {[track_names.get(t, 'Unknown') for t in seed]}")
print("Recommended Tracks:")
for i, (track_id, score, name) in enumerate(recommendations, 1):
    print(f"{i}. {name} (score: {score:.2f})")


Example Recommendation:
Seed Tracks: ['Hope', 'Blind', 'Disconnected', 'No Cigar', 'Penguins & Polarbears']
Recommended Tracks:
1. Come As You Are (score: 20.20)
2. Lithium (score: 10.89)
3. Heart-Shaped Box (score: 10.30)
4. In Bloom - Nevermind Version (score: 7.91)
5. Hey Jude - Remastered 2015 (score: 5.86)
6. Alive (score: 4.33)
7. Give It Away (score: 3.75)
8. Black Hole Sun (score: 3.72)
9. Here Comes The Sun - Remastered (score: 3.50)
10. Come Together - Remastered (score: 3.35)


In [None]:

# Save the model
with open(f'{data_path}cooccur_model_1kp.pkl', 'wb') as f:
    pickle.dump(cooccur_matrix, f)

# Load the model
with open(f'{data_path}cooccur_model_1kp.pkl', 'rb') as f:
    loaded_model = pickle.load(f)

In [None]:
from scipy.sparse import dok_matrix, csr_matrix

In [None]:



# Configuration
WINDOW_SIZE = 5
TOP_K = 10
MIN_PLAYLIST_LENGTH = 10

class PlaylistRecommender:
    def __init__(self):
        self.cooccur_matrix = None
        self.track_names = {}
        self.track_artists = {}
        self.track_popularity = {}
        self.num_tracks = 0

    def load_data(self, playlist_path: str, tracks_path: str, artists_path: str):
        """Load and preprocess all data"""
        # Load raw data
        playlist_tracks = pd.read_csv(playlist_path)
        tracks_df = pd.read_csv(tracks_path)
        artists_df = pd.read_csv(artists_path)

        # Create mappings
        self.track_names = dict(zip(tracks_df['track_id'], tracks_df['track_name']))
        self.track_artists = dict(zip(tracks_df['track_id'], tracks_df['artist_id']))

        # Calculate basic popularity (number of playlist appearances)
        self.track_popularity = playlist_tracks['track_id'].value_counts().to_dict()
        self.num_tracks = len(self.track_names)

        return playlist_tracks

    def build_sparse_cooccurrence(self, playlist_tracks: pd.DataFrame) -> csr_matrix:
        """Build optimized sparse co-occurrence matrix"""
        cooccur = dok_matrix((self.num_tracks + 1, self.num_tracks + 1), dtype=np.float32)

        for _, group in playlist_tracks.groupby('pid'):
            tracks = group.sort_values('position')['track_id'].values
            if len(tracks) < MIN_PLAYLIST_LENGTH:
                continue

            for i in range(len(tracks)):
                for j in range(i+1, min(i+WINDOW_SIZE+1, len(tracks))):
                    weight = 1.0 / (j - i)
                    cooccur[tracks[i], tracks[j]] += weight
                    cooccur[tracks[j], tracks[i]] += weight * 0.5

        return cooccur.tocsr()

    def train(self, playlist_path: str, tracks_path: str, artists_path: str):
        """Full training pipeline"""
        playlist_tracks = self.load_data(playlist_path, tracks_path, artists_path)
        self.cooccur_matrix = self.build_sparse_cooccurrence(playlist_tracks)

    def save_model(self, path: str):
        """Save trained model"""
        with open(path, 'wb') as f:
            pickle.dump({
                'matrix': self.cooccur_matrix,
                'track_names': self.track_names,
                'track_artists': self.track_artists,
                'track_popularity': self.track_popularity
            }, f)

    def load_model(self, path: str):
        """Load pretrained model"""
        with open(path, 'rb') as f:
            data = pickle.load(f)
            self.cooccur_matrix = data['matrix']
            self.track_names = data['track_names']
            self.track_artists = data['track_artists']
            self.track_popularity = data['track_popularity']
            self.num_tracks = len(self.track_names)

    def _get_artist_variety_score(self, track_id: int, selected_artists: set) -> float:
        """Penalize tracks from overrepresented artists"""
        artist_id = self.track_artists.get(track_id)
        artist_count = sum(1 for a in selected_artists if a == artist_id)
        return 1.0 / (1.0 + artist_count)

    def recommend_enhanced(self, seed_tracks: List[int], k: int = TOP_K) -> List[Tuple[int, float, str]]:
        """Generate optimized recommendations with diversity"""
        if not seed_tracks:
            return []

        # Get base recommendations with scores
        recent_seeds = seed_tracks[-WINDOW_SIZE:]
        scores = np.zeros(self.num_tracks + 1)

        for i, track in enumerate(recent_seeds):
            weight = 1.0 / (len(recent_seeds) - i)
            if track < self.cooccur_matrix.shape[0]:  # Check matrix bounds
                scores += self.cooccur_matrix[track].toarray()[0] * weight

        # Apply popularity boost (less popular tracks get small boost)
        for track in range(1, self.num_tracks + 1):
            if track not in seed_tracks:
                popularity = self.track_popularity.get(track, 0)
                scores[track] *= (1.0 + 0.5 * np.log1p(1.0 / (popularity + 1)))

        # Get top candidates
        top_candidates = np.argsort(-scores)[:k*3]  # Get 3x more candidates for diversity

        # Apply artist diversity
        final_recs = []
        selected_artists = set()
        for track in top_candidates:
            if track == 0:  # Skip padding index
                continue

            if track not in seed_tracks:
                artist = self.track_artists.get(track)
                diversity_score = self._get_artist_variety_score(track, selected_artists)
                final_score = scores[track] * diversity_score

                final_recs.append((
                    track,
                    final_score,
                    self.track_names.get(track, "Unknown Track"),
                    artist
                ))
                selected_artists.add(artist)

                if len(final_recs) >= k:
                    break

        # Sort by final score and return
        return sorted(final_recs, key=lambda x: -x[1])[:k]

# Example Usage
if __name__ == "__main__":
    # Initialize and train
    recommender = PlaylistRecommender()
    recommender.train(
        playlist_path="playlist_tracks.csv",
        tracks_path="tracks.csv",
        artists_path="artists.csv"
    )
    recommender.save_model("recommender_model.pkl")

    # Load pre-trained (alternative to training)
    # recommender = PlaylistRecommender()
    # recommender.load_model("recommender_model.pkl")

    # Generate recommendations
    sample_seed = [123, 456, 789]  # Replace with real track IDs
    recommendations = recommender.recommend_enhanced(sample_seed)

    # Display results
    print("Seed Tracks:")
    for i, track_id in enumerate(sample_seed, 1):
        print(f"{i}. {recommender.track_names.get(track_id, 'Unknown')} (ID: {track_id})")

    print("\nEnhanced Recommendations:")
    for i, (track_id, score, name, artist_id) in enumerate(recommendations, 1):
        print(f"{i}. {name} (Score: {score:.2f}, Artist: {artist_id})")

NameError: name 'List' is not defined