## Setup and Installation

In [None]:
# Install required packages (uncomment if running on Colab)
# !pip install recommenders
# !pip install pyspark
# !apt-get update -qq
# !apt-get install openjdk-8-jdk-headless -qq
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import time
import pandas as pd
import numpy as np
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

# Recommenders library imports
from recommenders.datasets import movielens
from recommenders.datasets.python_splitters import python_random_split
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.python_evaluation import map_at_k, ndcg_at_k
from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation
from recommenders.models.sar import SAR
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.timer import Timer

# Record the interpreter and Spark versions alongside the results.
print(
    f"System version: {sys.version}",
    f"Spark version: {pyspark.__version__}",
    sep="\n",
)

## Configuration and Constants

In [None]:
# Number of items recommended to (and evaluated for) each user.
TOP_K = 10

# Which MovieLens release to load.
MOVIELENS_DATA_SIZE = '20m'

# Column names shared by the pandas and Spark pipelines.
COL_USER, COL_ITEM = "UserId", "MovieId"
COL_RATING, COL_TIMESTAMP = "Rating", "Timestamp"
COL_PREDICTION = "prediction"

# Fraction of ratings used for training, and the seed for the random splits.
TRAIN_RATIO, RANDOM_SEED = 0.75, 42

# Echo the settings that most affect the reported metrics.
print(
    f"Configuration:",
    f"  Dataset: MovieLens-{MOVIELENS_DATA_SIZE}",
    f"  Top-K: {TOP_K}",
    f"  Train ratio: {TRAIN_RATIO}",
    sep="\n",
)

## 1. Data Loading and Preprocessing

In [None]:
# Read the ratings into pandas; the Popularity and SAR models are trained from this frame.
print("Loading MovieLens data...")
ratings_header = [COL_USER, COL_ITEM, COL_RATING, COL_TIMESTAMP]
df = movielens.load_pandas_df(size=MOVIELENS_DATA_SIZE, header=ratings_header)

# Basic size statistics; density is ratings / (users * items).
print(
    f"\nDataset shape: {df.shape}",
    f"Number of users: {df[COL_USER].nunique()}",
    f"Number of items: {df[COL_ITEM].nunique()}",
    f"Number of ratings: {len(df)}",
    f"Rating density: {len(df) / (df[COL_USER].nunique() * df[COL_ITEM].nunique()):.4%}",
    sep="\n",
)

df.head()

In [None]:
# Random train/test split of the pandas ratings (seeded for reproducibility).
train_df, test_df = python_random_split(
    df,
    ratio=TRAIN_RATIO,
    seed=RANDOM_SEED,
)

print(
    f"Train set: {len(train_df)} ratings",
    f"Test set: {len(test_df)} ratings",
    sep="\n",
)

## 2. Algorithm 1: Popularity-Based Recommender

This recommender suggests the most popular items based on rating counts and average ratings.

In [None]:
class PopularityRecommender:
    """Non-personalised baseline that recommends the globally most popular items.

    Each item receives a popularity score that blends its min-max normalised
    rating count (70%) with its min-max normalised average rating (30%).
    Every user is offered the same ranking, optionally minus the items that
    user already rated in the training data.
    """

    def __init__(self, col_user=COL_USER, col_item=COL_ITEM, col_rating=COL_RATING):
        """Store the column names used to read the ratings frames."""
        self.col_user = col_user
        self.col_item = col_item
        self.col_rating = col_rating
        # Item statistics sorted by popularity; None until fit() is called.
        self.popular_items = None
        # user id -> array of item ids that user rated in the training data.
        self._seen_items = {}

    @staticmethod
    def _min_max(values):
        """Scale a Series to [0, 1]; return all zeros when every value is equal."""
        low = values.min()
        span = values.max() - low
        # `not span > 0` is also true for NaN (empty input); this avoids the
        # 0/0 division that would turn every popularity score into NaN.
        if not span > 0:
            return pd.Series(0.0, index=values.index)
        return (values - low) / span

    def fit(self, train_df):
        """Compute the popularity ranking and remember each user's training items.

        Args:
            train_df: pandas DataFrame of training ratings containing the
                user, item and rating columns configured in ``__init__``.

        Returns:
            self, so calls can be chained.
        """
        item_stats = train_df.groupby(self.col_item)[self.col_rating].agg(['count', 'mean'])
        item_stats.columns = ['count', 'avg_rating']

        # Normalize and combine metrics
        item_stats['count_norm'] = self._min_max(item_stats['count'])
        item_stats['rating_norm'] = self._min_max(item_stats['avg_rating'])

        # Popularity score: 70% count, 30% rating
        item_stats['popularity_score'] = 0.7 * item_stats['count_norm'] + 0.3 * item_stats['rating_norm']

        # A stable sort keeps the order of tied items deterministic between runs.
        self.popular_items = item_stats.sort_values(
            'popularity_score', ascending=False, kind='stable'
        )

        # "Seen" items come from the training data: filtering on the test set
        # would remove exactly the items the evaluation is looking for.
        self._seen_items = {
            user_id: items.to_numpy()
            for user_id, items in train_df.groupby(self.col_user, sort=False)[self.col_item]
        }
        return self

    def recommend_k_items(self, test_df, top_k=TOP_K, remove_seen=True):
        """Recommend the top-k most popular items to every user in ``test_df``.

        Args:
            test_df: pandas DataFrame whose distinct values in the user column
                identify the users to recommend for. Only that column is read.
            top_k: maximum number of items per user; values below 1 yield an
                empty result.
            remove_seen: when True, items the user rated in the training data
                passed to ``fit`` are excluded from that user's list.

        Returns:
            pandas DataFrame with the user column, the item column and
            ``COL_PREDICTION`` (the popularity score), users in order of first
            appearance in ``test_df`` and items in descending score order.

        Raises:
            RuntimeError: if ``fit`` has not been called.
        """
        if self.popular_items is None:
            raise RuntimeError("PopularityRecommender.fit() must be called before recommend_k_items().")

        columns = [self.col_user, self.col_item, COL_PREDICTION]
        users = np.asarray(pd.unique(test_df[self.col_user]))
        ranked_items = self.popular_items.index.to_numpy()
        ranked_scores = self.popular_items['popularity_score'].to_numpy()
        top_k = max(int(top_k), 0)

        if top_k == 0 or len(users) == 0 or len(ranked_items) == 0:
            return pd.DataFrame(columns=columns)

        item_chunks, score_chunks, counts = [], [], []
        for user_id in users.tolist():
            seen = self._seen_items.get(user_id) if remove_seen else None
            if seen is None:
                picked_items = ranked_items[:top_k]
                picked_scores = ranked_scores[:top_k]
            else:
                # At most len(seen) of the leading items can be filtered out, so
                # the first top_k + len(seen) ranked items always suffice.
                window = top_k + len(seen)
                unseen = ~np.isin(ranked_items[:window], seen)
                picked_items = ranked_items[:window][unseen][:top_k]
                picked_scores = ranked_scores[:window][unseen][:top_k]
            item_chunks.append(picked_items)
            score_chunks.append(picked_scores)
            counts.append(len(picked_items))

        return pd.DataFrame({
            self.col_user: np.repeat(users, counts),
            self.col_item: np.concatenate(item_chunks),
            COL_PREDICTION: np.concatenate(score_chunks),
        })

print("Popularity-based recommender defined.")

In [None]:
# Fit the popularity baseline on the pandas training split and time it.
print("Training Popularity-based recommender...")
with Timer() as train_time:
    # fit() returns the model itself, so construction and fitting can be chained.
    popularity_model = PopularityRecommender().fit(train_df)

print(
    f"Training completed in {train_time.interval:.2f} seconds",
    f"\nTop 10 most popular movies:",
    popularity_model.popular_items.head(10),
    sep="\n",
)

In [None]:
# Produce the top-K popularity recommendations for every test user and time it.
print("Generating recommendations...")
with Timer() as pred_time:
    popularity_predictions = popularity_model.recommend_k_items(
        test_df,
        top_k=TOP_K,
        remove_seen=True,
    )

print(
    f"Prediction completed in {pred_time.interval:.2f} seconds",
    f"Generated {len(popularity_predictions)} recommendations",
    sep="\n",
)
popularity_predictions.head(20)

## 3. Algorithm 2: Item-KNN (SAR - Simple Algorithm for Recommendation)

This section uses the SAR algorithm from the `recommenders` library, an item-based collaborative filtering approach.

In [None]:
# Configure and fit SAR on the pandas training split, timing the fit.
print("Training SAR (Item-KNN) recommender...")

sar_params = {
    "col_user": COL_USER,
    "col_item": COL_ITEM,
    "col_rating": COL_RATING,
    "col_timestamp": COL_TIMESTAMP,
    "similarity_type": "jaccard",  # Can also use 'lift' or 'cooccurrence'
    "time_decay_coefficient": 30,
    "timedecay_formula": True,
    "normalize": True,
}
sar_model = SAR(**sar_params)

with Timer() as train_time:
    sar_model.fit(train_df)

print(f"Training completed in {train_time.interval:.2f} seconds")

In [None]:
# Produce the top-K SAR recommendations for the test users and time it.
print("Generating recommendations...")
with Timer() as pred_time:
    sar_predictions = sar_model.recommend_k_items(test_df, top_k=TOP_K, remove_seen=True)

print(
    f"Prediction completed in {pred_time.interval:.2f} seconds",
    f"Generated {len(sar_predictions)} recommendations",
    sep="\n",
)
sar_predictions.head(20)

## 4. Algorithm 3: ALS (Alternating Least Squares)

Matrix factorization using PySpark's ALS implementation.

In [None]:
# Start a Spark session for the ALS part of the notebook, or reuse a running one.
spark = start_or_get_spark(
    "Movie Recommendation Demo",
    memory="8g",
)
# Turn off the analyzer check that rejects ambiguous self-joins, i.e. joins
# between a DataFrame and another DataFrame derived from it.
spark.conf.set(
    "spark.sql.analyzer.failAmbiguousSelfJoin",
    "false",
)
print("Spark session initialized")

In [None]:
# Explicit schema so Spark reads the ids as integers and the ratings as floats.
schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            (COL_USER, IntegerType()),
            (COL_ITEM, IntegerType()),
            (COL_RATING, FloatType()),
            (COL_TIMESTAMP, LongType()),
        )
    ]
)

# Load the same MovieLens release into Spark and split it with the shared ratio and seed.
data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema)
train_spark, test_spark = spark_random_split(
    data,
    ratio=TRAIN_RATIO,
    seed=RANDOM_SEED,
)

print(
    f"Train set: {train_spark.count()} ratings",
    f"Test set: {test_spark.count()} ratings",
    sep="\n",
)

In [None]:
# Configure explicit-feedback ALS and fit it on the Spark training split, timing the fit.
print("Training ALS recommender...")

als_params = {
    "rank": 10,
    "maxIter": 15,
    "implicitPrefs": False,
    "regParam": 0.05,
    "coldStartStrategy": 'drop',
    "nonnegative": False,
    "seed": RANDOM_SEED,
    "userCol": COL_USER,
    "itemCol": COL_ITEM,
    "ratingCol": COL_RATING,
}
als_model = ALS(**als_params)

with Timer() as train_time:
    als_fitted_model = als_model.fit(train_spark)

print(f"Training completed in {train_time.interval:.2f} seconds")

In [None]:
# Score every (user, item) pair known from training with ALS, then drop the
# pairs the user already rated so only unseen items remain as candidates.
print("Generating recommendations...")

with Timer() as pred_time:
    # Get all user-item pairs
    users = train_spark.select(COL_USER).distinct()
    items = train_spark.select(COL_ITEM).distinct()
    user_item = users.crossJoin(items)

    # Get predictions
    dfs_pred = als_fitted_model.transform(user_item)

    # Remove seen items. A left anti join keeps exactly the prediction rows
    # that have no matching (user, item) pair in the training data. Joining on
    # column *names* avoids comparing column references of two DataFrames that
    # share the same lineage, so the result does not depend on how Spark
    # resolves an ambiguous self-join.
    seen_pairs = train_spark.select(COL_USER, COL_ITEM)
    als_predictions = dfs_pred.join(
        seen_pairs,
        on=[COL_USER, COL_ITEM],
        how='left_anti'
    ).select(COL_USER, COL_ITEM, COL_PREDICTION)

    # Force execution (and cache the result) so the timer measures real work.
    als_predictions.cache().count()

print(f"Prediction completed in {pred_time.interval:.2f} seconds")
als_predictions.show(20)

## 5. Evaluation: MAP@K and NDCG@K

Now let's evaluate all three algorithms using ranking metrics.

In [None]:
# Score the popularity recommendations against the pandas test split.
print("Evaluating Popularity-based Recommender...")

# Column names and cut-off shared by both ranking metrics.
popularity_eval_kwargs = dict(
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction=COL_PREDICTION,
    k=TOP_K,
)

popularity_map = map_at_k(test_df, popularity_predictions, **popularity_eval_kwargs)
popularity_ndcg = ndcg_at_k(test_df, popularity_predictions, **popularity_eval_kwargs)

print(
    f"Popularity - MAP@{TOP_K}: {popularity_map:.4f}",
    f"Popularity - NDCG@{TOP_K}: {popularity_ndcg:.4f}",
    sep="\n",
)

In [None]:
# Score the SAR recommendations against the pandas test split.
print("Evaluating SAR (Item-KNN) Recommender...")

# Column names and cut-off shared by both ranking metrics.
sar_eval_kwargs = dict(
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction=COL_PREDICTION,
    k=TOP_K,
)

sar_map = map_at_k(test_df, sar_predictions, **sar_eval_kwargs)
sar_ndcg = ndcg_at_k(test_df, sar_predictions, **sar_eval_kwargs)

print(
    f"SAR - MAP@{TOP_K}: {sar_map:.4f}",
    f"SAR - NDCG@{TOP_K}: {sar_ndcg:.4f}",
    sep="\n",
)

In [None]:
# Score the ALS candidates against the Spark test split.
print("Evaluating ALS Recommender...")

als_eval_columns = dict(
    col_user=COL_USER,
    col_item=COL_ITEM,
    col_rating=COL_RATING,
    col_prediction=COL_PREDICTION,
)
als_eval = SparkRankingEvaluation(
    test_spark,
    als_predictions,
    k=TOP_K,
    relevancy_method="top_k",
    **als_eval_columns,
)

als_map, als_ndcg = als_eval.map_at_k(), als_eval.ndcg_at_k()

print(
    f"ALS - MAP@{TOP_K}: {als_map:.4f}",
    f"ALS - NDCG@{TOP_K}: {als_ndcg:.4f}",
    sep="\n",
)

## 6. Results Comparison

In [None]:
# Build the comparison table. Metric column names follow TOP_K so the labels
# always match the cut-off that was actually evaluated.
map_col, ndcg_col = f"MAP@{TOP_K}", f"NDCG@{TOP_K}"

results = (
    pd.DataFrame({
        'Algorithm': ['Popularity', 'Item-KNN (SAR)', 'ALS'],
        map_col: [popularity_map, sar_map, als_map],
        ndcg_col: [popularity_ndcg, sar_ndcg, als_ndcg],
    })
    # Best MAP first, so row 0 is the winning model.
    .sort_values(map_col, ascending=False)
    .reset_index(drop=True)
)

banner = "=" * 60
print("\n" + banner)
# The dataset label is derived from the configured size instead of being hard-coded.
print(f"FINAL RESULTS - MovieLens {MOVIELENS_DATA_SIZE.upper()} Recommendation Algorithms")
print(banner)
print(results.to_string(index=False))
print(banner)

# Highlight best performing model
best_model = results.iloc[0]['Algorithm']
print(f"\n🏆 Best performing model: {best_model}")

In [None]:
# Visualize results: one bar chart per ranking metric, one bar per algorithm.
import matplotlib.pyplot as plt

# Metric columns are read from `results` (everything except the algorithm name),
# so the chart follows whatever cut-off the table was built with.
metric_cols = [col for col in results.columns if col != 'Algorithm']
metric_names = {
    'MAP': 'Mean Average Precision',
    'NDCG': 'Normalized Discounted Cumulative Gain',
}
bar_colors = ['#FF6B6B', '#4ECDC4', '#45B7D1']

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

for ax, col in zip(axes, metric_cols):
    # Column names look like "MAP@10": split them into metric name and cut-off.
    metric, _, cutoff = col.partition('@')
    ax.bar(results['Algorithm'], results[col], color=bar_colors)
    ax.set_ylabel(col, fontsize=12)
    ax.set_title(f"{metric_names.get(metric, metric)} @ {cutoff}", fontsize=14, fontweight='bold')
    # Leave 20% headroom for the value labels; skip when every score is 0,
    # because an empty [0, 0] range is not a valid axis limit.
    upper = results[col].max() * 1.2
    if upper > 0:
        ax.set_ylim([0, upper])
    for i, v in enumerate(results[col]):
        ax.text(i, v + 0.001, f'{v:.4f}', ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.savefig('recommendation_comparison.png', dpi=300, bbox_inches='tight')
plt.show()

print("\nVisualization saved as 'recommendation_comparison.png'")

## 7. Sample Recommendations

Let's look at some actual recommendations for a sample user.

In [None]:
# Use the user with the most ratings in the test split as the example.
test_user_counts = test_df[COL_USER].value_counts()
sample_user = test_user_counts.index[0]
print(f"Sample User ID: {sample_user}")
print(f"\nUser's actual ratings in test set:")

# That user's held-out ratings, highest rated first.
is_sample_user = test_df[COL_USER] == sample_user
user_test_ratings = test_df.loc[is_sample_user].sort_values(by=COL_RATING, ascending=False)
print(user_test_ratings.head(10))

In [None]:
# Show the sample user's recommendations from each of the three models.
print(f"\n{'='*60}")
print(f"RECOMMENDATIONS FOR USER {sample_user}")
print(f"{'='*60}\n")

# Popularity recommendations
pop_recs = popularity_predictions[popularity_predictions[COL_USER] == sample_user].head(TOP_K)
print("Popularity-based Recommendations:")
print(pop_recs)

# SAR recommendations
sar_recs = sar_predictions[sar_predictions[COL_USER] == sample_user].head(TOP_K)
print("\nItem-KNN (SAR) Recommendations:")
print(sar_recs)

# ALS recommendations. als_predictions holds a score for every unseen item, in
# no particular order, so rank by predicted rating and keep the best TOP_K in
# Spark before collecting; only TOP_K rows are then brought to the driver.
# int() converts the numpy scalar taken from the pandas index into a plain
# Python int, which every PySpark version accepts as a literal.
als_recs_pd = (
    als_predictions
    .filter(F.col(COL_USER) == int(sample_user))
    .orderBy(F.col(COL_PREDICTION).desc())
    .limit(TOP_K)
    .toPandas()
)
print("\nALS Recommendations:")
print(als_recs_pd)

## 8. Conclusion

### Key Findings:

1. **Popularity-based Recommender**: Simple baseline that recommends globally popular items. Fast but lacks personalization.

2. **Item-KNN (SAR)**: Item-based collaborative filtering that finds similar items. Good balance of performance and interpretability.

3. **ALS**: Matrix factorization approach that learns latent factors. Often provides the best personalization but requires more computational resources.

### Metrics:
- **MAP@K**: Measures precision of recommendations considering order
- **NDCG@K**: Measures ranking quality with position-based discounting

The results show the trade-offs between algorithmic complexity, computational cost, and recommendation quality.

In [None]:
# Cleanup: stop the Spark session created in the "Initialize Spark" cell.
# Keep this as the last cell that touches Spark: cells that evaluate Spark
# DataFrames (e.g. als_predictions) are expected to fail once the session is stopped.
spark.stop()
print("Spark session stopped. Demo complete!")