In [1]:
from sklearn.neighbors import NearestNeighbors
from scipy import sparse
from sklearn.model_selection import train_test_split
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as npy
from scipy.stats import linregress
from sklearn.model_selection import GroupShuffleSplit
import time
from pathlib import Path
import traceback
import os
import gc
plt.rcParams['figure.dpi'] = 400

In [3]:
bg = pd.read_feather("boardgamegeek.feather")
np = pd.read_feather("netflix_prize.feather")
m = pd.read_feather("movielens_25m.feather") 
y = pd.read_feather("yahoo_r2_songs.subsampled.feather")

In [None]:
bg = bg[bg['user_id'].notnull()]


unique_items = bg['item_id'].unique()
item_id_map = {old: new for new, old in enumerate(unique_items)}
bg['item_id'] = bg['item_id'].map(item_id_map)
n_items = len(unique_items)

groups = bg['user_id']
gss = GroupShuffleSplit(n_splits=1,test_size=0.25,random_state=42)

train_idx,test_idx = next(gss.split(bg,groups=groups))

train_data = bg.iloc[train_idx]
test_data = bg.iloc[test_idx]

unique_items = train_data['user_id'].unique()
item_id_map = {old: new for new, old in enumerate(unique_items)}
train_data['user_id'] = train_data['user_id'].map(item_id_map)

n_training_users = len(unique_items)

training_X = sparse.csr_matrix((train_data["rating"],
                                (train_data["user_id"],
                                train_data["item_id"])),
                                shape=(n_training_users, n_items))

user_counts = test_data['user_id'].value_counts()
valid_users = user_counts[user_counts >= 2].index
filtered_data = test_data[test_data['user_id'].isin(valid_users)]

unique_items = filtered_data['user_id'].unique()
item_id_map = {old: new for new, old in enumerate(unique_items)}
filtered_data['user_id'] = filtered_data['user_id'].map(item_id_map)

test_seen, test_unseen = train_test_split(
    filtered_data,
    test_size=0.25,
    stratify=filtered_data['user_id'],
    random_state=42
)

n_testing_users = test_seen['user_id'].nunique()

testing_X = sparse.csr_matrix((test_seen["rating"],
                                (test_seen["user_id"],
                                test_seen["item_id"])),
                                shape=(n_testing_users, n_items))

# try k for 5, 10, 25, 50, 100, 150, 250, 350, 500
k = 5  
nn_model = NearestNeighbors(n_neighbors=k, metric='cosine',n_jobs=-1)
nn_model.fit(training_X)


distances, indices = nn_model.kneighbors(testing_X)

test_users = []
neighbor_users = []


for test_user_idx, neighbor_indices in enumerate(indices):
    test_users.extend([test_user_idx] * len(neighbor_indices))
    neighbor_users.extend(neighbor_indices)
neighbors_df = pd.DataFrame({
    'test_user_id': test_users,
    'neighbor_user_id': neighbor_users
})

merged = neighbors_df.merge(train_data, left_on='neighbor_user_id', right_on='user_id', how='inner')

result = (merged.groupby(['test_user_id', 'item_id'])['rating']
            .mean()
            .reset_index()
            .rename(columns={'test_user_id': 'user_id', 'rating': 'avg_rating'}))

k_avg_ratings = test_unseen.merge(result, how = 'left', on = ['item_id','user_id'])

k_avg_ratings.to_feather("knn/results/bg/5_nn.feather")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_data['user_id'] = train_data['user_id'].map(item_id_map)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_data['user_id'] = filtered_data['user_id'].map(item_id_map)


0,1,2
,n_neighbors,5
,radius,1.0
,algorithm,'auto'
,leaf_size,30
,metric,'cosine'
,p,2
,metric_params,
,n_jobs,-1


In [2]:
def process_dataset(dataset_name, data, k_values, base_output_dir="knn/results"):
    """
    Process a single dataset with multiple k values
    Uses chunking for memory-efficient average rating calculation
    """
    print(f"\n{'='*60}")
    print(f"Processing dataset: {dataset_name}")
    print(f"{'='*60}")
    
    dataset_start_time = time.time()
    
    try:
        # Data preprocessing
        print(f"Preprocessing {dataset_name}...")
        data = data[data['user_id'].notnull()]
        
        # Map item IDs to continuous range
        unique_items = data['item_id'].unique()
        item_id_map = {old: new for new, old in enumerate(unique_items)}
        data['item_id'] = data['item_id'].map(item_id_map)
        n_items = len(unique_items)
        
        # Split data
        groups = data['user_id']
        gss = GroupShuffleSplit(n_splits=1, test_size=0.25, random_state=42)
        train_idx, test_idx = next(gss.split(data, groups=groups))
        
        train_data = data.iloc[train_idx]
        test_data = data.iloc[test_idx]
        
        # Map user IDs for training data
        unique_users = train_data['user_id'].unique()
        user_id_map = {old: new for new, old in enumerate(unique_users)}
        train_data['user_id'] = train_data['user_id'].map(user_id_map)
        n_training_users = len(unique_users)
        
        # Create training matrix
        training_X = sparse.csr_matrix((train_data["rating"],
                                      (train_data["user_id"],
                                       train_data["item_id"])),
                                      shape=(n_training_users, n_items))
        
        # Filter test data for users with at least 2 ratings
        user_counts = test_data['user_id'].value_counts()
        valid_users = user_counts[user_counts >= 2].index
        filtered_data = test_data[test_data['user_id'].isin(valid_users)]
        
        # Map user IDs for test data
        unique_test_users = filtered_data['user_id'].unique()
        test_user_id_map = {old: new for new, old in enumerate(unique_test_users)}
        filtered_data['user_id'] = filtered_data['user_id'].map(test_user_id_map)
        
        # Split test data into seen and unseen
        test_seen, test_unseen = train_test_split(
            filtered_data,
            test_size=0.25,
            stratify=filtered_data['user_id'],
            random_state=42
        )
        
        n_testing_users = test_seen['user_id'].nunique()
        
        # Create testing matrix
        testing_X = sparse.csr_matrix((test_seen["rating"],
                                     (test_seen["user_id"],
                                      test_seen["item_id"])),
                                     shape=(n_testing_users, n_items))
        
        print(f"Dataset {dataset_name} preprocessed successfully")
        print(f"Training users: {n_training_users}, Testing users: {n_testing_users}, Items: {n_items}")
        
        # Process each k value
        for k in k_values:
            # Check if output file already exists
            output_dir = Path(base_output_dir) / dataset_name
            output_file = output_dir / f"{k}_nn.feather"
            
            if output_file.exists():
                print(f"Skipping k={k} for {dataset_name} - file already exists: {output_file}")
                continue
            
            k_start_time = time.time()
            try:
                print(f"\nProcessing k={k} for {dataset_name}...")
                
                # Ensure output directory exists
                output_dir.mkdir(parents=True, exist_ok=True)
                
                # Fit KNN model
                nn_model = NearestNeighbors(n_neighbors=k, metric='cosine', n_jobs=-1)
                nn_model.fit(training_X)
                
                # Find neighbors
                distances, indices = nn_model.kneighbors(testing_X)
                
                # Memory-efficient processing with chunking by test users
                print(f"Processing neighbors in chunks by test users to manage memory...")
                
               # Initialize result accumulator
                all_results = []
                initial_chunk_size = 5000  # Starting chunk size
                min_chunk_size = 100       # Minimum chunk size before giving up
                chunk_size = initial_chunk_size
                n_test_users = len(indices)

                # Process test users in chunks with dynamic sizing
                chunk_start = 0
                while chunk_start < n_test_users:
                    try:
                        chunk_end = min(chunk_start + chunk_size, n_test_users)
                        
                        # Create neighbors dataframe for this chunk of test users
                        chunk_test_users = []
                        chunk_neighbor_users = []
                        
                        for test_user_idx in range(chunk_start, chunk_end):
                            neighbor_indices = indices[test_user_idx]
                            chunk_test_users.extend([test_user_idx] * len(neighbor_indices))
                            chunk_neighbor_users.extend(neighbor_indices)
                        
                        if not chunk_test_users:  # Skip empty chunks
                            chunk_start = chunk_end
                            continue
                        
                        # Create neighbors dataframe for this chunk
                        chunk_neighbors_df = pd.DataFrame({
                            'test_user_id': chunk_test_users,
                            'neighbor_user_id': chunk_neighbor_users
                        })
                        
                        # Merge with training data
                        chunk_merged = chunk_neighbors_df.merge(
                            train_data, 
                            left_on='neighbor_user_id', 
                            right_on='user_id', 
                            how='inner'
                        )
                        
                        # Calculate average ratings for this chunk of test users
                        chunk_result = (chunk_merged.groupby(['test_user_id', 'item_id'])['rating']
                                    .mean()
                                    .reset_index()
                                    .rename(columns={'test_user_id': 'user_id', 'rating': 'avg_rating'}))
                        
                        all_results.append(chunk_result)
                        
                        # Clean up chunk data to free memory
                        del chunk_neighbors_df, chunk_merged, chunk_result
                        
                        # Successfully processed chunk, move to next
                        chunk_start = chunk_end
                        
                        # Reset chunk size to initial size after successful processing
                        # (in case it was reduced due to previous memory errors)
                        if chunk_size < initial_chunk_size:
                            chunk_size = min(initial_chunk_size, chunk_size * 2)  # Gradually increase back
                        
                        if chunk_end % 5000 == 0 or chunk_end == n_test_users:
                            print(f"Processed {chunk_end}/{n_test_users} test users (chunk_size={chunk_size})")
                    
                    except MemoryError as e:
                        print(f"MEMORY ERROR with chunk_size={chunk_size}: {str(e)}")
                        
                        # Clean up any partially created variables
                        if 'chunk_neighbors_df' in locals():
                            del chunk_neighbors_df
                        if 'chunk_merged' in locals():
                            del chunk_merged
                        if 'chunk_result' in locals():
                            del chunk_result
                        
                        # Force garbage collection
                        gc.collect()
                        
                        # Reduce chunk size
                        new_chunk_size = max(chunk_size // 2, min_chunk_size)
                        
                        if new_chunk_size < min_chunk_size:
                            print(f"Chunk size would be below minimum ({min_chunk_size}). Cannot continue processing.")
                            break
                        
                        print(f"Reducing chunk_size from {chunk_size} to {new_chunk_size}")
                        chunk_size = new_chunk_size
                        
                        # Don't increment chunk_start - retry the same chunk with smaller size
                        continue

                # Combine all chunk results (no additional aggregation needed)
                if all_results:
                    result = pd.concat(all_results, ignore_index=True)
                    del all_results  # Free memory
                else:
                    result = pd.DataFrame(columns=['user_id', 'item_id', 'avg_rating'])

                # return test_unseen,result,all_results
                # Merge with test_unseen
                k_avg_ratings = test_unseen.merge(result, how='left', on=['item_id', 'user_id'])

                # Save results
                k_avg_ratings.to_feather(output_file)

                # Force garbage collection after each k value
                gc.collect()

                k_end_time = time.time()
                k_duration = k_end_time - k_start_time
                print(f"k={k} completed in {k_duration:.2f} seconds")
                
            except Exception as e:
                print(f"ERROR processing k={k} for {dataset_name}: {str(e)}")
                print(f"Traceback: {traceback.format_exc()}")
                continue
        
        dataset_end_time = time.time()
        dataset_duration = dataset_end_time - dataset_start_time
        print(f"\nDataset {dataset_name} completed in {dataset_duration:.2f} seconds ({dataset_duration/60:.2f} minutes)")
        
    except Exception as e:
        print(f"CRITICAL ERROR processing dataset {dataset_name}: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        dataset_end_time = time.time()
        dataset_duration = dataset_end_time - dataset_start_time
        print(f"Dataset {dataset_name} failed after {dataset_duration:.2f} seconds")



In [3]:

"""
Main function to process all datasets
"""
total_start_time = time.time()

# Define datasets
datasets = {
    'bg': 'boardgamegeek.feather',
    'np': 'netflix_prize.feather',
    'm': 'movielens_25m.feather',
    'y': 'yahoo_r2_songs.subsampled.feather'
}

# Define k values to try
k_values = [5, 10, 25, 50, 100, 150, 250, 350, 500]

print("Starting KNN processing for all datasets")
print(f"K values to process: {k_values}")
print(f"Total datasets: {len(datasets)}")
print("Note: Will skip processing if output file already exists")

# Process each dataset
for dataset_name, filename in datasets.items():
    try:
        print(f"\nLoading dataset: {filename}")
        data = pd.read_feather(filename)
        print(f"Dataset loaded successfully. Shape: {data.shape}")
        
        test_unseen,result,all_results = process_dataset(dataset_name, data, k_values)
        # break
        # Force garbage collection between datasets
        del data
        gc.collect()
        
    except Exception as e:
        print(f"CRITICAL ERROR loading dataset {filename}: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        gc.collect()
        continue
    # break

total_end_time = time.time()
total_duration = total_end_time - total_start_time
print(f"\n{'='*60}")
print(f"ALL PROCESSING COMPLETED")
print(f"Total runtime: {total_duration:.2f} seconds ({total_duration/60:.2f} minutes, {total_duration/3600:.2f} hours)")
print(f"{'='*60}")

Starting KNN processing for all datasets
K values to process: [5, 10, 25, 50, 100, 150, 250, 350, 500]
Total datasets: 4
Note: Will skip processing if output file already exists

Loading dataset: boardgamegeek.feather
Dataset loaded successfully. Shape: (18942215, 3)

Processing dataset: bg
Preprocessing bg...


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data['item_id'] = data['item_id'].map(item_id_map)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_data['user_id'] = train_data['user_id'].map(user_id_map)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_data['user_id'] = filtered_data['user_id'].map(test_user_id_map)


Dataset bg preprocessed successfully
Training users: 308530, Testing users: 83026, Items: 21925

Processing k=5 for bg...
Processing neighbors in chunks by test users to manage memory...
Processed 5000/83026 test users (chunk_size=5000)
Processed 10000/83026 test users (chunk_size=5000)
Processed 15000/83026 test users (chunk_size=5000)
Processed 20000/83026 test users (chunk_size=5000)
Processed 25000/83026 test users (chunk_size=5000)
Processed 30000/83026 test users (chunk_size=5000)
Processed 35000/83026 test users (chunk_size=5000)
Processed 40000/83026 test users (chunk_size=5000)
Processed 45000/83026 test users (chunk_size=5000)
Processed 50000/83026 test users (chunk_size=5000)
Processed 55000/83026 test users (chunk_size=5000)
Processed 60000/83026 test users (chunk_size=5000)
Processed 65000/83026 test users (chunk_size=5000)
Processed 70000/83026 test users (chunk_size=5000)
Processed 75000/83026 test users (chunk_size=5000)
Processed 80000/83026 test users (chunk_size=5000

: 