In [1]:
#!/usr/bin/env python
# Spark-only implementation of recommendation models for MovieLens 32M

import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession, Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, lit, expr, udf, collect_list, sum, avg, desc, array, explode, first
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType, IntegerType, ArrayType, StructType, StructField, StringType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import BucketedRandomProjectionLSH
import pickle
import joblib
import gc
import logging
import json
import math

# Logging: every record at INFO or above is written both to recommender.log
# (in the current working directory) and to the console stream.
_log_handlers = [
    logging.FileHandler("recommender.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("MovieLensRecommender")

# Initialize Spark session
def init_spark(app_name="MovieLens_Recommender", memory="5g"):
    """Create (or reuse, via getOrCreate) the SparkSession for the pipeline.

    Args:
        app_name: Spark application name.
        memory: Value used for both ``spark.driver.memory`` and
            ``spark.executor.memory`` (e.g. "5g").

    Returns:
        The SparkSession, with the SparkContext log level set to ERROR.
    """
    # Applied in this order; the session time zone is pinned to UTC, Arrow
    # conversion is disabled and collected results are capped at 4g.
    settings = {
        "spark.driver.memory": memory,
        "spark.executor.memory": memory,
        "spark.sql.session.timeZone": "UTC",
        "spark.sql.execution.arrow.pyspark.enabled": "false",
        "spark.driver.maxResultSize": "4g",
    }

    builder = SparkSession.builder.appName(app_name)
    for key, value in settings.items():
        builder = builder.config(key, value)
    session = builder.getOrCreate()

    # Quieten Spark's own logging
    session.sparkContext.setLogLevel("ERROR")

    logger.info(f"Spark session initialized with {memory} memory")
    return session

# Load data using PySpark
def load_data(spark, data_dir="ml-32m", min_ratings=5):
    """Load the MovieLens dataset into Spark DataFrames.

    Reads ``ratings.csv`` and ``movies.csv`` from *data_dir* and keeps only the
    ratings whose user AND whose movie each have at least *min_ratings* ratings
    in the unfiltered ratings file.

    Args:
        spark: Active SparkSession.
        data_dir: Directory containing the MovieLens CSV files.
        min_ratings: Minimum number of raw ratings a user and a movie need in
            order to be kept.

    Returns:
        Tuple ``(filtered_ratings, movies_df, n_users, n_movies, active_users,
        active_movies)``. ``filtered_ratings`` is cached. ``n_users`` and
        ``n_movies`` count the users/movies that pass the threshold on the raw
        data; because both filters are applied together, some of them may be
        left with fewer than *min_ratings* rows (or none) in ``filtered_ratings``.
    """
    logger.info("Loading data with PySpark...")
    start_time = time.time()

    # MovieLens CSVs quote fields that contain commas and write a literal double
    # quote inside a quoted field as "" (RFC 4180). Spark's CSV reader uses a
    # backslash as its escape character by default, so escape='"' is required
    # for such titles to be parsed into the right columns.
    csv_options = {"header": True, "inferSchema": True, "quote": '"', "escape": '"'}

    ratings_df = spark.read.csv(os.path.join(data_dir, 'ratings.csv'), **csv_options)
    movies_df = spark.read.csv(os.path.join(data_dir, 'movies.csv'), **csv_options)

    # Users and movies with at least `min_ratings` ratings in the raw data
    active_users = (ratings_df.groupBy("userId").count()
                    .filter(col("count") >= min_ratings)
                    .select("userId"))
    active_movies = (ratings_df.groupBy("movieId").count()
                     .filter(col("count") >= min_ratings)
                     .select("movieId"))

    # Cached: otherwise every later action (the count below, the split, each
    # model) would re-read and re-parse the CSV and redo both joins.
    filtered_ratings = (ratings_df.join(active_users, "userId")
                        .join(active_movies, "movieId")
                        .cache())

    # Get stats about the filtered dataset
    n_users = active_users.count()
    n_movies = active_movies.count()
    n_ratings = filtered_ratings.count()

    logger.info(f"Data loaded in {time.time() - start_time:.2f} seconds")
    logger.info(f"Filtered dataset: {n_users:,} users, {n_movies:,} movies, {n_ratings:,} ratings")

    return filtered_ratings, movies_df, n_users, n_movies, active_users, active_movies

# Process movie features
def process_movie_features(spark, movies_df):
    """Extract and process movie features including genres.

    Adds to *movies_df*:
      * ``year``: int from the first "(YYYY)" in the title, null if there is none;
      * ``clean_title``: the title with a trailing "(YYYY)" removed;
      * one 0/1 ``genre_<name>`` column per genre (exact match on the
        pipe-separated ``genres`` field; a null ``genres`` gives all zeros);
      * ``genre_vector``: those columns assembled in sorted-genre order.

    Args:
        spark: Unused; kept for interface compatibility.
        movies_df: DataFrame with ``title`` and ``genres`` columns.

    Returns:
        ``(movies_df, all_genres)`` where ``all_genres`` is the sorted list of
        genre names, without the "(no genres listed)" placeholder.
    """
    from pyspark.sql.functions import (array_contains, coalesce, regexp_extract,
                                       regexp_replace, split, when)

    logger.info("Processing movie features...")

    # Patterns go through the Column API as raw Python strings, so they reach the
    # Java regex engine unchanged. Inside expr("...") they would first pass the
    # SQL string-literal parser, which unescapes backslashes ('\d' -> 'd',
    # '\(' -> '(') and silently turns the pattern into something else.
    year_text = regexp_extract(col("title"), r"\((\d{4})\)", 1)
    # regexp_extract returns "" when nothing matches; map that to null rather
    # than casting "" (which raises under ANSI SQL mode).
    movies_df = movies_df.withColumn(
        "year",
        when(year_text != "", year_text.cast(IntegerType()))
    )

    # Clean title (remove a trailing year, tolerating trailing whitespace)
    movies_df = movies_df.withColumn(
        "clean_title",
        regexp_replace(col("title"), r"\s*\(\d{4}\)\s*$", "")
    )

    # Get all unique genres (null/empty genre strings contribute nothing)
    genres_list = (movies_df.select("genres").distinct().rdd
                   .flatMap(lambda row: row[0].split('|') if row[0] else [])
                   .collect())
    all_genres = sorted(set(genres_list))

    if '(no genres listed)' in all_genres:
        all_genres.remove('(no genres listed)')

    # Exact token match on the split list: substring matching with `contains`
    # would also fire when one genre name is contained in another. Nulls become
    # 0 so VectorAssembler (handleInvalid="error" by default) does not fail.
    genre_tokens = split(col("genres"), r"\|")
    genre_columns = [f"genre_{genre}" for genre in all_genres]
    for genre, column_name in zip(all_genres, genre_columns):
        movies_df = movies_df.withColumn(
            column_name,
            coalesce(array_contains(genre_tokens, genre).cast(IntegerType()), lit(0))
        )

    # Create vector of genre features for content-based filtering
    assembler = VectorAssembler(inputCols=genre_columns, outputCol="genre_vector")
    movies_df = assembler.transform(movies_df)

    logger.info(f"Created {len(all_genres)} genre features and genre vectors")
    return movies_df, all_genres

# Split data into train, validation, and test sets
def train_val_test_split(ratings_df, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, seed=42):
    """Split ratings into training, validation, and test sets.

    Each row is assigned independently by a single three-way ``randomSplit``,
    so the resulting sizes are only approximately proportional to the ratios.
    All three parts are cached and counted.

    Args:
        ratings_df: Ratings DataFrame to split.
        train_ratio, val_ratio, test_ratio: Non-negative fractions summing to 1.
        seed: Seed for ``randomSplit``.

    Returns:
        ``(train_data, val_data, test_data)``.

    Raises:
        ValueError: if a ratio is negative or the ratios do not sum to 1.
    """
    # Explicit check rather than `assert` (asserts are stripped under -O).
    # The builtin `sum` is avoided because pyspark's `sum` is imported in this module.
    ratio_total = train_ratio + val_ratio + test_ratio
    if min(train_ratio, val_ratio, test_ratio) < 0 or not abs(ratio_total - 1.0) < 1e-6:
        raise ValueError(
            f"Split ratios must be non-negative and sum to 1, got "
            f"{train_ratio}, {val_ratio}, {test_ratio}"
        )

    logger.info(f"Splitting data into train({train_ratio:.1%})/val({val_ratio:.1%})/test({test_ratio:.1%})...")

    # One three-way split: no second split of an uncached remainder, no reused
    # seed, and no division by (val_ratio + test_ratio), which may be 0.
    train_data, val_data, test_data = ratings_df.randomSplit(
        [train_ratio, val_ratio, test_ratio], seed=seed
    )

    # Cache the datasets
    for part in (train_data, val_data, test_data):
        part.cache()

    # Display split sizes
    train_count = train_data.count()
    val_count = val_data.count()
    test_count = test_data.count()
    total = train_count + val_count + test_count

    def percent(n):
        # Guard against an empty input DataFrame
        return n / total * 100 if total else 0.0

    logger.info(f"Training set: {train_count:,} ratings ({percent(train_count):.2f}%)")
    logger.info(f"Validation set: {val_count:,} ratings ({percent(val_count):.2f}%)")
    logger.info(f"Test set: {test_count:,} ratings ({percent(test_count):.2f}%)")

    return train_data, val_data, test_data

# Function to evaluate with PySpark
def evaluate_spark_model(predictions, truth_col="rating", pred_col="prediction"):
    """Calculate RMSE, MAE and R2 for model predictions using Spark.

    All statistics come from one aggregation over the rows where both the
    label and the prediction are non-null. NaN values are not filtered out and
    propagate into the metrics.

    Args:
        predictions: DataFrame containing the label and prediction columns.
        truth_col: Name of the label column.
        pred_col: Name of the prediction column.

    Returns:
        ``{"RMSE": float, "MAE": float, "R2": float}``. All three are NaN when
        there is no row to evaluate; R2 is 0.0 when the labels have zero variance.
    """
    from pyspark.sql import functions as F

    pairs = predictions.select(
        F.col(truth_col).cast("double").alias("y"),
        F.col(pred_col).cast("double").alias("y_hat"),
    ).where(F.col("y").isNotNull() & F.col("y_hat").isNotNull())

    # A single Spark job instead of two RegressionEvaluator passes plus a
    # separate count() and a further aggregation.
    residual = F.col("y") - F.col("y_hat")
    stats = pairs.agg(
        F.count(F.lit(1)).alias("n"),
        F.sum(residual * residual).alias("sse"),
        F.sum(F.abs(residual)).alias("sae"),
        # Population variance of the labels; n * var_pop is the total sum of
        # squares without the cancellation of sum(y^2) - sum(y)^2 / n.
        F.var_pop("y").alias("var_y"),
    ).first()

    n = stats["n"]
    if n == 0:
        logger.warning("No non-null (label, prediction) pairs to evaluate")
        nan = float("nan")
        return {"RMSE": nan, "MAE": nan, "R2": nan}

    sum_squared_error = stats["sse"]
    rmse = math.sqrt(sum_squared_error / n)
    mae = stats["sae"] / n

    total_sum_squares = stats["var_y"] * n
    r2 = 1.0 - (sum_squared_error / total_sum_squares) if total_sum_squares != 0 else 0.0

    logger.info(f"RMSE: {rmse:.4f}, MAE: {mae:.4f}, R2: {r2:.4f}")

    return {"RMSE": rmse, "MAE": mae, "R2": r2}

# Baseline Models
def train_baseline_models(train_data, val_data):
    """Fit and score the two reference predictors on the validation ratings.

    * Global mean: every rating is predicted as the mean training rating.
    * Bias model: prediction = global mean + (user's mean - global mean)
      + (movie's mean - global mean); an unseen user or movie contributes 0.

    Args:
        train_data: Training ratings (``userId``, ``movieId``, ``rating``).
        val_data: Validation ratings with the same columns.

    Returns:
        ``(global_mean_params, bias_model_params, global_mean_metrics,
        bias_model_metrics)``; the params hold plain Python values/dicts.
    """
    logger.info("Training baseline models...")
    began = time.time()

    mu = train_data.select("rating").agg({"rating": "avg"}).first()[0]
    logger.info(f"Global mean rating: {mu:.4f}")

    logger.info("Global Mean Model Performance:")
    global_mean_metrics = evaluate_spark_model(val_data.withColumn("prediction", lit(mu)))

    def offsets_from_mean(key, mean_col, bias_col):
        # Per-key average training rating and its offset from the global mean
        return (train_data.groupBy(key)
                .agg({"rating": "avg"})
                .withColumnRenamed("avg(rating)", mean_col)
                .withColumn(bias_col, col(mean_col) - mu))

    user_biases = offsets_from_mean("userId", "user_mean", "user_bias")
    item_biases = offsets_from_mean("movieId", "item_mean", "item_bias")

    scored = (val_data
              .join(user_biases, "userId", "left")
              .join(item_biases, "movieId", "left")
              .na.fill({"user_bias": 0.0, "item_bias": 0.0})
              .withColumn("prediction", mu + col("user_bias") + col("item_bias")))

    logger.info("Bias Model Performance:")
    bias_model_metrics = evaluate_spark_model(scored)

    logger.info(f"Baseline models trained and evaluated in {time.time() - began:.2f} seconds")

    # Materialize the biases on the driver for later (non-Spark) use
    user_bias_lookup = {r['userId']: r['user_bias'] for r in user_biases.collect()}
    item_bias_lookup = {r['movieId']: r['item_bias'] for r in item_biases.collect()}

    return (
        {
            'name': 'GlobalMean',
            'global_mean': mu,
        },
        {
            'name': 'BiasModel',
            'global_mean': mu,
            'user_biases': user_bias_lookup,
            'item_biases': item_bias_lookup,
        },
        global_mean_metrics,
        bias_model_metrics,
    )


def train_item_similarity_model_with_lsh(spark, train_data, val_data, n_neighbors=50, hash_tables=10, hash_length=4,
                                         max_users=10000, distance_threshold=2.0):
    """Train an item-based collaborative filtering model using Spark and LSH for efficient similarity search.

    Each movie becomes a sparse vector of mean-centred ratings (rating minus
    that user's mean training rating), one dimension per user. When there are
    more than *max_users* users, only the *max_users* users with the most
    training ratings are used as dimensions. BucketedRandomProjectionLSH
    (Euclidean distance) proposes candidate pairs; pairs farther apart than
    *distance_threshold* are discarded. For each movie the *n_neighbors*
    closest others are kept with similarity ``1 / (1 + distance)``.

    All neighbourhoods come from a single self ``approxSimilarityJoin`` plus a
    window top-k, not one join and count() per movie. The vectors come from a
    group-by rather than a SQL query with one CASE WHEN column per user.

    Side effect: registers the temp view ``train_normalized`` (normalized
    training ratings of the selected users), which ``evaluate_lsh_model`` reads.

    Args:
        spark: Active SparkSession.
        train_data: Training ratings (``userId``, ``movieId``, ``rating``).
        val_data: Unused; kept for interface compatibility.
        n_neighbors: Neighbours kept per movie.
        hash_tables: ``numHashTables`` of the LSH model.
        hash_length: ``bucketLength`` of the LSH model.
        max_users: Maximum number of users used as vector dimensions.
        distance_threshold: Maximum Euclidean distance for a neighbour pair.

    Returns:
        dict with ``name``, ``n_neighbors``, ``user_means`` ({userId: mean
        training rating} for ALL training users) and ``item_neighbors``
        ({movieId: [(neighbor_id, similarity), ...]} sorted by decreasing
        similarity; every vectorised movie has an entry, possibly empty).
    """
    logger.info(f"Training item-based CF model with LSH (hash_tables={hash_tables}, hash_length={hash_length})...")
    start_time = time.time()

    # Normalize ratings by subtracting user means
    user_means = train_data.groupBy("userId").agg({"rating": "avg"}).withColumnRenamed("avg(rating)", "user_mean")
    train_normalized = train_data.join(user_means, "userId") \
                                 .withColumn("normalized_rating", col("rating") - col("user_mean"))

    user_ids = _select_lsh_users(train_normalized, max_users)
    indexed = _attach_user_index(spark, train_normalized, user_ids)

    # View content: normalized ratings restricted to the selected users
    indexed.drop("user_idx").createOrReplaceTempView("train_normalized")

    logger.info("Creating sparse item-user vectors...")
    item_vectors = _build_item_vectors(indexed, len(user_ids)).cache()

    # Apply LSH
    logger.info("Applying LSH for similarity search...")
    brp = BucketedRandomProjectionLSH(
        inputCol="features",
        outputCol="hashes",
        numHashTables=hash_tables,
        bucketLength=hash_length
    )
    model = brp.fit(item_vectors)
    item_hashed = model.transform(item_vectors).cache()

    logger.info("Finding approximate nearest neighbors...")
    item_neighbors = {row.movieId: [] for row in item_vectors.select("movieId").collect()}
    # Rows arrive ordered by (movieId, rank), so each list ends up sorted by similarity
    for row in _top_item_neighbors(model, item_hashed, n_neighbors, distance_threshold).collect():
        item_neighbors[row.movieId].append((row.neighbor_id, row.similarity))

    item_hashed.unpersist()
    item_vectors.unpersist()

    # Create user means dictionary
    user_means_dict = {row.userId: row.user_mean for row in user_means.collect()}

    # Create model parameters
    item_cf_params = {
        'name': 'ItemCF_LSH',
        'n_neighbors': n_neighbors,
        'user_means': user_means_dict,
        'item_neighbors': item_neighbors
    }

    logger.info(f"Item-based CF model with LSH completed in {time.time() - start_time:.2f} seconds")
    logger.info(f"Generated neighborhoods for {len(item_neighbors)} items")

    return item_cf_params


def _select_lsh_users(train_normalized, max_users):
    """Return the userIds used as vector dimensions: all users, or the max_users most active."""
    user_counts = train_normalized.groupBy("userId").count()
    if user_counts.count() <= max_users:
        return [row.userId for row in user_counts.select("userId").collect()]
    top_users = user_counts.orderBy(col("count").desc()).limit(max_users).collect()
    logger.info(f"Limited to top {max_users:,} most active users for LSH processing")
    return [row.userId for row in top_users]


def _attach_user_index(spark, train_normalized, user_ids):
    """Inner-join ratings with a (userId -> user_idx) table; users not in user_ids are dropped."""
    from pyspark.sql.types import IntegerType, StructField, StructType

    # Explicit schema so the join key has exactly the ratings' userId type
    schema = StructType([
        StructField("userId", train_normalized.schema["userId"].dataType, False),
        StructField("user_idx", IntegerType(), False),
    ])
    index_df = spark.createDataFrame([(user_id, idx) for idx, user_id in enumerate(user_ids)], schema)
    return train_normalized.join(index_df, "userId")


def _build_item_vectors(indexed, n_dims):
    """Aggregate normalized ratings into one sparse (movieId, features) vector per movie."""
    from pyspark.ml.linalg import Vectors, VectorUDT
    from pyspark.sql import functions as F

    def to_sparse(entries):
        # Vectors.sparse requires ascending indices
        cells = sorted((int(e[0]), float(e[1])) for e in entries if e[1] is not None)
        return Vectors.sparse(n_dims, [i for i, _ in cells], [v for _, v in cells])

    to_sparse_udf = F.udf(to_sparse, VectorUDT())
    cells = indexed.groupBy("movieId", "user_idx").agg(F.sum("normalized_rating").alias("value"))
    return (cells.groupBy("movieId")
            .agg(F.collect_list(F.struct("user_idx", "value")).alias("entries"))
            .select("movieId", to_sparse_udf("entries").alias("features")))


def _top_item_neighbors(model, item_hashed, n_neighbors, distance_threshold):
    """Return (movieId, neighbor_id, similarity, rank) rows: the n_neighbors closest per movie."""
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    pairs = model.approxSimilarityJoin(item_hashed, item_hashed, threshold=distance_threshold, distCol="distance")
    candidates = pairs.select(
        F.col("datasetA.movieId").alias("movieId"),
        F.col("datasetB.movieId").alias("neighbor_id"),
        (1.0 / (1.0 + F.col("distance"))).alias("similarity"),  # Convert distance to similarity
    ).where(F.col("movieId") != F.col("neighbor_id"))  # drop self-matches

    ranking = Window.partitionBy("movieId").orderBy(F.desc("similarity"), F.asc("neighbor_id"))
    return (candidates.withColumn("rank", F.row_number().over(ranking))
            .where(F.col("rank") <= n_neighbors)
            .orderBy("movieId", "rank"))


def generate_item_cf_recommendations(item_cf_params, user_id, user_ratings, movies_df, n=10):
    """Generate recommendations using item-based CF model.

    For every movie *m* the user has NOT rated that appears in the neighbour
    list of a movie *i* they did rate, the score is::

        user_mean + sum(sim(i, m) * (rating_i - user_mean)) / sum(sim(i, m))

    clipped to [0.5, 5.0]. The *n* highest-scoring movies found in *movies_df*
    are returned.

    Args:
        item_cf_params: dict with ``user_means`` and ``item_neighbors`` as
            produced by ``train_item_similarity_model_with_lsh``.
        user_id: User to recommend for.
        user_ratings: DataFrame with ``userId``, ``movieId``, ``rating``.
        movies_df: DataFrame with ``movieId``, ``title``, ``genres``.
        n: Number of recommendations.

    Returns:
        List of dicts with ``movieId``, ``title``, ``genres`` and
        ``predicted_rating``, best first. Empty for users unknown to the model.
    """
    user_means = item_cf_params['user_means']
    if user_id not in user_means:
        logger.warning(f"User {user_id} not found in model, returning no recommendations")
        return []

    user_mean = user_means[user_id]
    item_neighbors = item_cf_params['item_neighbors']

    # Extract user's ratings (a later row for the same movie overwrites an earlier one)
    user_item_ratings = {
        row.movieId: row.rating
        for row in user_ratings.filter(col("userId") == user_id).select("movieId", "rating").collect()
    }

    # candidate movieId -> [similarity sum, similarity-weighted normalized rating sum]
    candidate_scores = {}
    for item_id, rating in user_item_ratings.items():
        normalized_rating = rating - user_mean
        for similar_id, similarity in item_neighbors.get(item_id, ()):
            # Skip items the user has already rated
            if similar_id in user_item_ratings:
                continue
            sums = candidate_scores.setdefault(similar_id, [0.0, 0.0])
            sums[0] += similarity
            sums[1] += similarity * normalized_rating

    # Compute final predictions, clipped to the valid rating range
    predictions = [
        (movie_id, max(0.5, min(5.0, user_mean + weighted_sum / sim_sum)))
        for movie_id, (sim_sum, weighted_sum) in candidate_scores.items()
        if sim_sum > 0
    ]
    top_predictions = sorted(predictions, key=lambda x: x[1], reverse=True)[:n]
    if not top_predictions:
        return []

    # One Spark query for all movie details instead of one query per movie
    top_ids = [movie_id for movie_id, _ in top_predictions]
    details = {}
    for movie in movies_df.filter(col("movieId").isin(top_ids)).select("movieId", "title", "genres").collect():
        details.setdefault(movie["movieId"], movie)

    return [
        {
            "movieId": movie_id,
            "title": details[movie_id]["title"],
            "genres": details[movie_id]["genres"],
            "predicted_rating": score
        }
        for movie_id, score in top_predictions
        if movie_id in details
    ]

def evaluate_lsh_model(item_cf_params, val_data, movies_df, sample_fraction=0.1, seed=42):
    """Evaluate the LSH-based item similarity model on a sample of validation users.

    For each sampled user, every held-out validation movie the user has not
    rated in training is predicted directly. The prediction uses the user's
    training ratings (read from the ``train_normalized`` temp view registered
    by ``train_item_similarity_model_with_lsh``) and the stored neighbour lists,
    with the same formula as ``generate_item_cf_recommendations``:
    ``user_mean + sum(sim * (rating - user_mean)) / sum(sim)``, clipped to
    [0.5, 5.0]. Validation pairs with no rated neighbour get no prediction and
    are not scored.

    Previously only validation items that landed in the user's top-1000
    recommendations were scored (a biased subset), at the cost of 1000+ Spark
    jobs per user.

    Args:
        item_cf_params: dict with ``user_means`` and ``item_neighbors``.
        val_data: Validation ratings (``userId``, ``movieId``, ``rating``).
        movies_df: Unused; kept for interface compatibility.
        sample_fraction: Fraction of distinct validation users to evaluate.
        seed: Sampling seed (for reproducible evaluations).

    Returns:
        ``{"RMSE": float, "MAE": float}``; both ``inf`` when nothing could be scored.
    """
    logger.info("Evaluating LSH-based item similarity model...")

    # Reuse the running session instead of relying on a global `spark` variable
    session = SparkSession.builder.getOrCreate()
    user_means = item_cf_params['user_means']
    item_neighbors = item_cf_params['item_neighbors']

    # Sample users for evaluation (for speed)
    user_sample = val_data.select("userId").distinct().sample(fraction=sample_fraction, seed=seed).collect()
    sample_user_ids = [row.userId for row in user_sample]
    if not sample_user_ids:
        logger.warning("No predictions could be generated for evaluation")
        return {"RMSE": float('inf'), "MAE": float('inf')}

    in_sample = col("userId").isin(sample_user_ids)

    # Held-out ratings and training ratings of the sampled users, one query each
    val_by_user = {}
    for row in val_data.filter(in_sample).select("userId", "movieId", "rating").collect():
        val_by_user.setdefault(row.userId, []).append((row.movieId, row.rating))
    train_by_user = {}
    for row in session.table("train_normalized").filter(in_sample).select("userId", "movieId", "rating").collect():
        train_by_user.setdefault(row.userId, {})[row.movieId] = row.rating

    prediction_count = 0
    squared_error_sum = 0.0
    abs_error_sum = 0.0

    for user_id, held_out in val_by_user.items():
        if user_id not in user_means:
            continue
        user_mean = user_means[user_id]
        rated = train_by_user.get(user_id, {})
        targets = {movie_id for movie_id, _ in held_out if movie_id not in rated}

        # target movieId -> [similarity sum, similarity-weighted normalized rating sum]
        sums = {}
        for item_id, rating in rated.items():
            normalized_rating = rating - user_mean
            for neighbor_id, similarity in item_neighbors.get(item_id, ()):
                if neighbor_id in targets:
                    acc = sums.setdefault(neighbor_id, [0.0, 0.0])
                    acc[0] += similarity
                    acc[1] += similarity * normalized_rating

        for movie_id, actual_rating in held_out:
            acc = sums.get(movie_id)
            if acc is None or acc[0] <= 0:
                continue
            pred_rating = max(0.5, min(5.0, user_mean + acc[1] / acc[0]))
            squared_error_sum += (pred_rating - actual_rating) ** 2
            abs_error_sum += abs(pred_rating - actual_rating)
            prediction_count += 1

    # Calculate final metrics
    if prediction_count > 0:
        rmse = math.sqrt(squared_error_sum / prediction_count)
        mae = abs_error_sum / prediction_count

        logger.info(f"LSH Model Evaluation - RMSE: {rmse:.4f}, MAE: {mae:.4f}, Count: {prediction_count}")

        return {"RMSE": rmse, "MAE": mae}

    logger.warning("No predictions could be generated for evaluation")
    return {"RMSE": float('inf'), "MAE": float('inf')}



def save_lsh_model(item_cf_params, output_dir="streamlit_models", batch_size=1000):
    """Save LSH model parameters as several small pickle files plus JSON metadata.

    Files written to *output_dir* (created if missing):
      * ``item_cf_user_means.pkl``: the ``user_means`` dict;
      * ``item_cf_neighbors_batch_<k>.pkl``: consecutive slices of at most
        *batch_size* entries of ``item_neighbors`` (k = 0, 1, ...);
      * ``item_cf_metadata.json``: name, n_neighbors, counts and ``num_batches``.

    Args:
        item_cf_params: dict with ``name``, ``n_neighbors``, ``user_means`` and
            ``item_neighbors``.
        output_dir: Target directory.
        batch_size: Maximum number of items per neighbour pickle.

    Raises:
        ValueError: if batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # exist_ok avoids the race between an existence check and makedirs
    os.makedirs(output_dir, exist_ok=True)

    logger.info(f"Saving LSH model parameters to {output_dir}...")

    # 1. Save user means
    with open(os.path.join(output_dir, "item_cf_user_means.pkl"), 'wb') as f:
        pickle.dump(item_cf_params['user_means'], f)

    # 2. Save item neighbors in batches (in dict iteration order)
    item_neighbors = item_cf_params['item_neighbors']
    item_ids = list(item_neighbors)
    num_batches = math.ceil(len(item_ids) / batch_size)

    for batch_index in range(num_batches):
        batch_ids = item_ids[batch_index * batch_size:(batch_index + 1) * batch_size]
        batch_neighbors = {item_id: item_neighbors[item_id] for item_id in batch_ids}

        with open(os.path.join(output_dir, f"item_cf_neighbors_batch_{batch_index}.pkl"), 'wb') as f:
            pickle.dump(batch_neighbors, f)

    # 3. Save metadata
    metadata = {
        'name': item_cf_params['name'],
        'n_neighbors': item_cf_params['n_neighbors'],
        'num_items': len(item_ids),
        'num_users': len(item_cf_params['user_means']),
        'num_batches': num_batches
    }

    with open(os.path.join(output_dir, "item_cf_metadata.json"), 'w') as f:
        json.dump(metadata, f, indent=4)

    logger.info(f"LSH model saved in {num_batches} batches")

# ALS model training
def train_als_model(train_data, val_data, rank=100, regParam=0.1, maxIter=15, seed=42, implicit=False):
    """Fit a Spark ALS matrix-factorisation model and score it on the validation ratings.

    Args:
        train_data: Training ratings (``userId``, ``movieId``, ``rating``).
        val_data: Validation ratings with the same columns.
        rank: Number of latent factors.
        regParam: Regularization strength.
        maxIter: Number of ALS iterations.
        seed: Random seed for ALS.
        implicit: Passed to ALS as ``implicitPrefs``.

    Returns:
        ``(fitted ALSModel, metrics dict from evaluate_spark_model)``.
    """
    logger.info(f"Training ALS model (rank={rank}, regParam={regParam}, maxIter={maxIter}, implicit={implicit})...")
    t_start = time.time()

    # Non-negative factors; NaN (cold-start) predictions are dropped
    # at transform time because coldStartStrategy="drop".
    estimator = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        rank=rank,
        maxIter=maxIter,
        regParam=regParam,
        coldStartStrategy="drop",
        nonnegative=True,
        implicitPrefs=implicit,
        seed=seed,
    )
    fitted = estimator.fit(train_data)
    val_predictions = fitted.transform(val_data)

    logger.info("ALS Model Performance:")
    als_metrics = evaluate_spark_model(val_predictions)

    logger.info(f"ALS model trained in {time.time() - t_start:.2f} seconds")

    return fitted, als_metrics

# Function to tune ALS parameters
def tune_als_model(train_data, val_data, ranks=(10, 50, 100), regParams=(0.01, 0.1, 1.0), maxIters=(10, 15),
                   results_path="als_tuning_results.csv"):
    """Find the best ALS parameters through grid search.

    Trains one model per (rank, regParam, maxIter) combination with
    ``train_als_model`` (rank outermost, maxIter innermost) and keeps the one
    with the lowest validation RMSE. A NaN RMSE never wins.

    Args:
        train_data: Training ratings.
        val_data: Validation ratings.
        ranks, regParams, maxIters: Iterables of candidate values.
        results_path: CSV path for the per-configuration results; falsy to skip.

    Returns:
        ``(best_model, results_df)`` where results_df has columns rank,
        regParam, maxIter, RMSE, MAE, R2.

    Raises:
        ValueError: if the grid is empty or no configuration produced a
            usable (non-NaN) RMSE.
    """
    import itertools

    logger.info("Tuning ALS model parameters...")
    start_time = time.time()

    best_rmse = float('inf')
    best_params = None
    best_model = None

    results = []

    # Grid search (same order as nested rank / regParam / maxIter loops)
    for rank, regParam, maxIter in itertools.product(ranks, regParams, maxIters):
        logger.info(f"Trying rank={rank}, regParam={regParam}, maxIter={maxIter}")

        model, metrics = train_als_model(
            train_data, val_data,
            rank=rank, regParam=regParam, maxIter=maxIter
        )

        rmse = metrics['RMSE']

        results.append({
            'rank': rank,
            'regParam': regParam,
            'maxIter': maxIter,
            'RMSE': rmse,
            'MAE': metrics['MAE'],
            'R2': metrics['R2']
        })

        # NaN compares False here, so a NaN RMSE is never selected
        if rmse < best_rmse:
            best_rmse = rmse
            best_params = (rank, regParam, maxIter)
            best_model = model

    results_df = pd.DataFrame(results)

    # Save tuning results (also useful for diagnosing a failed search)
    if results_path:
        results_df.to_csv(results_path, index=False)

    if best_params is None:
        raise ValueError(
            "ALS tuning produced no usable model: the parameter grid is empty or every RMSE was NaN"
        )

    best_rank, best_regParam, best_maxIter = best_params
    logger.info(f"Best parameters: rank={best_rank}, regParam={best_regParam}, maxIter={best_maxIter}")
    logger.info(f"Best RMSE: {best_rmse:.4f}")

    logger.info(f"ALS parameter tuning completed in {time.time() - start_time:.2f} seconds")

    return best_model, results_df

# Extract ALS model parameters for Streamlit
def extract_als_parameters(als_model, original_params=None):
    """Copy the latent factors of a fitted ALS model into plain Python dicts.

    Args:
        als_model: Fitted ALS model exposing ``userFactors``, ``itemFactors``
            (DataFrames with ``id`` and ``features``) and ``rank``.
        original_params: Optional dict; when non-empty, its ``maxIter``,
            ``regParam``, ``nonnegative`` and ``implicitPrefs`` entries are
            copied (defaults 10, 0.1, True, False when absent).

    Returns:
        dict with ``name``, ``rank``, ``user_factors`` ({id: features}) and
        ``item_factors`` ({id: features}), plus the optional training settings.
    """
    logger.info("Extracting ALS model parameters...")

    def to_lookup(factor_frame):
        # Collect a factor DataFrame into {id: features}
        return {entry.id: entry.features for entry in factor_frame.collect()}

    user_factor_dict = to_lookup(als_model.userFactors)
    item_factor_dict = to_lookup(als_model.itemFactors)

    als_params = {
        'name': 'ALS',
        'rank': als_model.rank,
        'user_factors': user_factor_dict,
        'item_factors': item_factor_dict,
    }

    if original_params:
        fallbacks = (
            ('maxIter', 10),
            ('regParam', 0.1),
            ('nonnegative', True),
            ('implicitPrefs', False),
        )
        for key, fallback in fallbacks:
            als_params[key] = original_params.get(key, fallback)

    logger.info(f"Extracted factors for {len(user_factor_dict)} users and {len(item_factor_dict)} items")

    return als_params

# Content-based filtering using genre information
def train_content_based_model(spark, movies_df, train_data, all_genres):
    """Train a content-based filtering model using movie genres.

    User profile: for each genre, the sum of the user's ratings on movies of
    that genre, scaled to unit Euclidean length. Users whose rated movies carry
    no genre flag get no profile.
    Movie vector: the 0/1 genre flags scaled to unit length (all-zero stays zero).

    Args:
        spark: Unused; kept for interface compatibility.
        movies_df: Movies with one ``genre_<name>`` 0/1 column per genre.
        train_data: Training ratings (``userId``, ``movieId``, ``rating``).
        all_genres: Genre names, in the order used for all vectors.

    Returns:
        dict with ``name``, ``user_profiles`` ({userId: [float]}),
        ``movie_genre_vectors`` ({movieId: [float]}) and ``genres``.
    """
    logger.info("Training content-based filtering model...")
    start_time = time.time()

    genre_columns = [f"genre_{genre}" for genre in all_genres]

    user_profiles = _build_user_genre_profiles(movies_df, train_data, genre_columns)
    movie_genre_vectors = _build_movie_genre_vectors(movies_df, genre_columns)

    content_params = {
        'name': 'ContentBased',
        'user_profiles': user_profiles,
        'movie_genre_vectors': movie_genre_vectors,
        'genres': all_genres
    }

    logger.info(f"Content-based model trained in {time.time() - start_time:.2f} seconds")
    logger.info(f"Created profiles for {len(user_profiles)} users")

    return content_params


def _build_user_genre_profiles(movies_df, train_data, genre_columns):
    """Return {userId: unit-length per-genre rating sums} from a single Spark aggregation."""
    from pyspark.sql import functions as F

    if not genre_columns:
        return {}

    # One group-by for all users and genres (instead of several Spark jobs per
    # user and genre). rating * flag sums the ratings of that genre's movies.
    rated = train_data.join(movies_df.select("movieId", *genre_columns), "movieId")
    per_user = rated.groupBy("userId").agg(
        *[F.sum(F.col("rating") * F.col(gc)).alias(gc) for gc in genre_columns]
    )

    profiles = {}
    for row in per_user.collect():
        rating_sums = [float(row[gc] or 0.0) for gc in genre_columns]
        # math.fsum, not sum: `sum` is pyspark.sql.functions.sum in this module
        total = math.fsum(rating_sums)
        if total <= 0:
            continue
        profile = [s / total for s in rating_sums]
        norm = math.sqrt(math.fsum(p * p for p in profile))
        if norm > 0:
            profile = [p / norm for p in profile]
        profiles[row["userId"]] = profile
    return profiles


def _build_movie_genre_vectors(movies_df, genre_columns):
    """Return {movieId: unit-length 0/1 genre vector}; movies without genres keep a zero vector."""
    vectors = {}
    for movie_row in movies_df.select("movieId", *genre_columns).collect():
        flags = [float(movie_row[gc] or 0) for gc in genre_columns]
        norm = math.sqrt(math.fsum(f * f for f in flags))
        vectors[movie_row["movieId"]] = [f / norm for f in flags] if norm > 0 else flags
    return vectors

# Hybrid model - combine ALS and bias model
def create_hybrid_model(bias_model_params, als_model, val_data, weight_als=0.7):
    """Create a hybrid model combining baseline bias and ALS predictions.

    bias_prediction = global_mean + user_bias + item_bias (a missing bias counts
    as 0); hybrid_prediction = (1 - weight_als) * bias_prediction
    + weight_als * ALS prediction. The blend is evaluated on the rows that
    ``als_model.transform`` returns for *val_data*.

    Args:
        bias_model_params: dict with ``global_mean``, ``user_biases``
            ({userId: bias}) and ``item_biases`` ({movieId: bias}).
        als_model: Fitted ALS model.
        val_data: Validation ratings (``userId``, ``movieId``, ``rating``).
        weight_als: Weight of the ALS prediction in the blend.

    Returns:
        ``(hybrid_params, hybrid_metrics)``.
    """
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType, StructField, StructType

    logger.info(f"Creating hybrid model with ALS weight={weight_als}")

    global_mean = bias_model_params['global_mean']
    session = SparkSession.builder.getOrCreate()

    def bias_table(key_col, biases, bias_col):
        # (key_col, bias_col) DataFrame from an {id: bias} dict, marked for broadcast;
        # the key type is taken from val_data so the join needs no casting.
        schema = StructType([
            StructField(key_col, val_data.schema[key_col].dataType, False),
            StructField(bias_col, DoubleType(), True),
        ])
        rows = [(key, float(bias)) for key, bias in biases.items()]
        return F.broadcast(session.createDataFrame(rows, schema))

    # Broadcast joins replace a row-by-row Python UDF that shipped both bias dicts
    # to every task and truncated the prediction to 32-bit FloatType.
    val_with_bias = (
        val_data
        .join(bias_table("userId", bias_model_params['user_biases'], "_hybrid_user_bias"), "userId", "left")
        .join(bias_table("movieId", bias_model_params['item_biases'], "_hybrid_item_bias"), "movieId", "left")
        .withColumn(
            "bias_prediction",
            F.lit(global_mean)
            + F.coalesce(F.col("_hybrid_user_bias"), F.lit(0.0))
            + F.coalesce(F.col("_hybrid_item_bias"), F.lit(0.0)),
        )
        .drop("_hybrid_user_bias", "_hybrid_item_bias")
    )

    # Add ALS predictions
    val_with_hybrid = als_model.transform(val_with_bias)

    # Create hybrid prediction
    val_with_hybrid = val_with_hybrid.withColumn(
        "hybrid_prediction",
        (1 - weight_als) * col("bias_prediction") + weight_als * col("prediction")
    )

    # Evaluate hybrid model
    logger.info("Hybrid Model Performance:")
    hybrid_metrics = evaluate_spark_model(
        val_with_hybrid,
        truth_col="rating",
        pred_col="hybrid_prediction"
    )

    # Create hybrid model parameters
    hybrid_params = {
        'name': 'HybridModel',
        'bias_model': bias_model_params,
        'als_model_name': 'ALS',  # Reference to ALS model
        'weight_als': weight_als
    }

    return hybrid_params, hybrid_metrics

# Create popularity-based recommendations
def create_popularity_recommendations(train_data, movies_df, all_genres, n=100, genre_n=20):
    """Build popularity-based recommendation lists from the training ratings.

    Produces:

    * ``most_popular``: top ``n`` movies by number of ratings;
    * ``highest_rated``: top ``n`` movies by mean rating;
    * ``genre_lists``: for every genre, the top ``genre_n`` movies by number of
      ratings among movies flagged with that genre.

    The first two lists only consider movies whose rating count reaches the
    approximate median count. Genre lists use half that threshold. Ties are
    broken by ``movieId`` so the lists are deterministic.

    Args:
        train_data: Spark DataFrame with ``movieId`` and ``rating`` columns.
        movies_df: Spark DataFrame with ``movieId``, ``title``, ``genres`` and a
            ``genre_<name>`` indicator column (1 = movie has the genre) for
            every name in ``all_genres``.
        all_genres: iterable of genre names.
        n: length of the overall lists.
        genre_n: length of each per-genre list (default 20).

    Returns:
        dict with keys 'name', 'most_popular', 'highest_rated' (pandas
        DataFrames) and 'genre_lists' (genre name -> pandas DataFrame).
    """
    from pyspark.sql import functions as F

    logger.info("Creating popularity-based recommendation lists...")

    genres = list(all_genres)  # iterated twice below; materialise generators
    genre_cols = [f"genre_{genre}" for genre in genres]
    list_cols = ["movieId", "title", "genres", "rating_count", "avg_rating"]

    # Explicit aggregate expressions. The dict form {"rating": "count",
    # "rating": "avg"} has a duplicate key, keeps only "avg", and leaves
    # "rating_count" undefined. Cached because every list below reuses it.
    pop_metrics = train_data.groupBy("movieId").agg(
        F.count("rating").alias("rating_count"),
        F.avg("rating").alias("avg_rating"),
    ).cache()

    # Join movie info (including genre flags) once instead of once per genre
    pop_with_info = pop_metrics.join(
        movies_df.select("movieId", "title", "genres", *genre_cols), "movieId"
    ).cache()

    try:
        # Approximate median (10% relative error) of per-movie rating counts;
        # approxQuantile returns [] for an empty input.
        quantiles = pop_metrics.approxQuantile("rating_count", [0.5], 0.1)
        min_interactions = quantiles[0] if quantiles else 0
        logger.info(f"Minimum interactions threshold: {min_interactions}")

        eligible = pop_with_info.filter(col("rating_count") >= min_interactions)

        # 1. Most Popular (by number of ratings)
        most_popular_df = (
            eligible.orderBy(col("rating_count").desc(), col("movieId"))
            .select(*list_cols)
            .limit(n)
            .toPandas()
        )

        # 2. Highest Rated (with minimum interactions)
        highest_rated_df = (
            eligible.orderBy(col("avg_rating").desc(), col("rating_count").desc(), col("movieId"))
            .select(*list_cols)
            .limit(n)
            .toPandas()
        )

        # 3. Per-genre popularity lists with a relaxed threshold
        genre_threshold = min_interactions / 2
        genre_lists = {}
        for genre in genres:
            genre_lists[genre] = (
                pop_with_info
                .filter(col(f"genre_{genre}") == 1)
                .filter(col("rating_count") >= genre_threshold)
                .orderBy(col("rating_count").desc(), col("movieId"))
                .select(*list_cols)
                .limit(genre_n)
                .toPandas()
            )
    finally:
        # Everything needed has been collected to pandas; release the cache
        pop_with_info.unpersist()
        pop_metrics.unpersist()

    # Create popularity model parameters
    popularity_params = {
        'name': 'Popularity',
        'most_popular': most_popular_df,
        'highest_rated': highest_rated_df,
        'genre_lists': genre_lists
    }

    return popularity_params



# Function to generate recommendations using ALS model parameters
def generate_als_recommendations(als_params, user_id, movies_df, n=10):
    """Generate top-``n`` movie recommendations from extracted ALS factors.

    Every movie is scored by the dot product of the user's latent factor vector
    with the movie's factor vector, which is the rating ALS predicts. The ``n``
    highest-scoring movies are returned. Ties keep the iteration order of
    ``als_params['item_factors']``.

    Args:
        als_params: dict with 'user_factors' and 'item_factors', each mapping an
            id to a numeric sequence. All vectors must have the same length
            (the ALS rank).
        user_id: key looked up in ``als_params['user_factors']``.
        movies_df: Spark DataFrame with ``movieId``, ``title`` and ``genres``.
        n: number of recommendations to return.

    Returns:
        List of dicts with keys 'movieId', 'title', 'genres' and
        'predicted_rating', in descending score order. Movies absent from
        ``movies_df`` are skipped, so fewer than ``n`` entries may be returned.
        Returns ``[]`` (after logging a warning) when the user has no factors.
    """
    user_factors = als_params['user_factors']
    if user_id not in user_factors:
        logger.warning(f"No factors found for user {user_id}")
        return []

    item_factors = als_params['item_factors']
    movie_ids = list(item_factors)
    if not movie_ids:
        return []

    # Score all movies with one matrix-vector product. NOTE: the builtin `sum`
    # is shadowed at module level by pyspark.sql.functions.sum, which rejects a
    # Python generator, so numpy does the reduction here.
    factor_matrix = np.asarray([item_factors[m] for m in movie_ids], dtype=float)
    scores = factor_matrix @ np.asarray(user_factors[user_id], dtype=float)

    # Stable descending order keeps ties in dict order, matching sorted(..., reverse=True)
    top_idx = np.argsort(-scores, kind="stable")[:n]
    top_items = [(movie_ids[i], float(scores[i])) for i in top_idx]
    if not top_items:
        return []

    # Fetch details for all selected movies in one Spark job instead of one
    # filter().collect() per movie.
    wanted_ids = [movie_id for movie_id, _ in top_items]
    details = {}
    for row in (movies_df.filter(col("movieId").isin(wanted_ids))
                .select("movieId", "title", "genres")
                .collect()):
        details.setdefault(row["movieId"], row)

    recommendations = []
    for movie_id, score in top_items:
        movie = details.get(movie_id)
        if movie is not None:
            recommendations.append({
                "movieId": movie_id,
                "title": movie["title"],
                "genres": movie["genres"],
                "predicted_rating": score
            })

    return recommendations

# Generate content-based recommendations
def generate_content_recommendations(content_params, user_id, movies_df, n=10):
    """Generate the top-``n`` movies most similar to a user's content profile.

    Each movie's genre vector is scored against the user's profile vector with
    a plain dot product. This equals cosine similarity only if both vectors are
    unit-normalised. NOTE(review): whether the training step normalises them is
    not visible here, so confirm before interpreting the score's scale. Ties
    keep the iteration order of ``content_params['movie_genre_vectors']``.

    Args:
        content_params: dict with 'user_profiles' (user id -> numeric vector)
            and 'movie_genre_vectors' (movie id -> numeric vector of the same
            length).
        user_id: key looked up in ``content_params['user_profiles']``.
        movies_df: Spark DataFrame with ``movieId``, ``title`` and ``genres``.
        n: number of recommendations to return.

    Returns:
        List of dicts with keys 'movieId', 'title', 'genres' and 'similarity',
        in descending score order. Movies absent from ``movies_df`` are
        skipped. Returns ``[]`` (after logging a warning) when the user has no
        profile.
    """
    user_profiles = content_params['user_profiles']
    if user_id not in user_profiles:
        logger.warning(f"No profile found for user {user_id}")
        return []

    genre_vectors = content_params['movie_genre_vectors']
    movie_ids = list(genre_vectors)
    if not movie_ids:
        return []

    # Dot product of every movie vector with the profile in one operation.
    # NOTE: the builtin `sum` is shadowed at module level by
    # pyspark.sql.functions.sum, which rejects a Python generator.
    vector_matrix = np.asarray([genre_vectors[m] for m in movie_ids], dtype=float)
    similarities = vector_matrix @ np.asarray(user_profiles[user_id], dtype=float)

    # Stable descending order keeps ties in dict order, matching sorted(..., reverse=True)
    top_idx = np.argsort(-similarities, kind="stable")[:n]
    top_items = [(movie_ids[i], float(similarities[i])) for i in top_idx]
    if not top_items:
        return []

    # One Spark job for all movie details instead of one per movie
    wanted_ids = [movie_id for movie_id, _ in top_items]
    details = {}
    for row in (movies_df.filter(col("movieId").isin(wanted_ids))
                .select("movieId", "title", "genres")
                .collect()):
        details.setdefault(row["movieId"], row)

    recommendations = []
    for movie_id, similarity in top_items:
        movie = details.get(movie_id)
        if movie is not None:
            recommendations.append({
                "movieId": movie_id,
                "title": movie["title"],
                "genres": movie["genres"],
                "similarity": similarity
            })

    return recommendations

# Generate hybrid recommendations
def generate_hybrid_recommendations(hybrid_params, als_params, user_id, movies_df, n=10):
    """Re-rank ALS candidates for ``user_id`` with the hybrid ALS + bias blend.

    Fetches ``n * 2`` candidates from ``generate_als_recommendations`` and
    scores each one as::

        weight_als * als_score + (1 - weight_als) * (global_mean + user_bias + item_bias)

    A missing user or item bias counts as 0.0. Only ALS candidates are scored,
    so a movie outside the ALS top ``n * 2`` can never appear.

    Returns:
        Up to ``n`` dicts with keys 'movieId', 'title', 'genres' and
        'predicted_rating' (the hybrid score), highest score first.
    """
    # Over-fetch ALS candidates so the re-ranking has room to reorder them
    candidates = generate_als_recommendations(als_params, user_id, movies_df, n=n*2)

    bias_params = hybrid_params['bias_model']
    baseline = bias_params['global_mean']
    per_user = bias_params['user_biases']
    per_item = bias_params['item_biases']

    als_weight = hybrid_params['weight_als']
    bias_weight = 1.0 - als_weight

    # The user's bias is the same for every candidate, so look it up once
    this_user_bias = per_user.get(user_id, 0.0)

    ranked = []
    for candidate in candidates:
        movie_id = candidate["movieId"]
        bias_score = baseline + this_user_bias + per_item.get(movie_id, 0.0)
        blended = als_weight * candidate["predicted_rating"] + bias_weight * bias_score
        ranked.append({
            "movieId": movie_id,
            "title": candidate["title"],
            "genres": candidate["genres"],
            "predicted_rating": blended,
        })

    # Stable sort: equal scores keep their ALS order
    ranked.sort(key=lambda entry: entry["predicted_rating"], reverse=True)
    return ranked[:n]

# Save models for Streamlit app
def save_models_for_streamlit(model_params, output_dir="streamlit_models"):
    """Pickle each model's parameters and write an index file for Streamlit.

    For every ``name -> params`` entry in ``model_params`` this writes
    ``<output_dir>/<name>_params.pkl``. It then writes
    ``<output_dir>/model_info.json`` listing the saved model names and a
    local-time timestamp.

    Security note: unpickling executes arbitrary code, so the Streamlit side
    should only load these files from a trusted location.

    Args:
        model_params: mapping of model name -> picklable parameter object.
        output_dir: destination directory; created if missing.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs
    os.makedirs(output_dir, exist_ok=True)

    logger.info(f"Saving model parameters to {output_dir}...")

    # Save all model parameters
    for model_name, params in model_params.items():
        model_file = os.path.join(output_dir, f"{model_name}_params.pkl")

        # For large model parameters, consider saving in chunks or processing
        # before saving to reduce memory footprint
        with open(model_file, 'wb') as f:
            pickle.dump(params, f)

        logger.info(f"Saved {model_name} parameters to {model_file}")

    # Create model info file
    model_info = {
        'available_models': list(model_params.keys()),
        'saved_date': time.strftime("%Y-%m-%d %H:%M:%S")
    }

    with open(os.path.join(output_dir, "model_info.json"), 'w', encoding="utf-8") as f:
        json.dump(model_info, f, indent=4)

    logger.info(f"All model parameters saved successfully to {output_dir}")



# Evaluation visualization
def create_evaluation_visualizations(metrics_dict, output_path="model_evaluation.png"):
    """Plot per-model RMSE and MAE as two stacked bar charts and save the figure.

    Args:
        metrics_dict: mapping of model display name -> metrics mapping; each
            inner mapping must provide 'RMSE' and 'MAE' entries.
        output_path: file the figure is written to with ``plt.savefig``.

    Returns:
        The pandas DataFrame that was plotted (one row per model).

    The figure is left open; it is not closed after saving.
    """
    summary = pd.DataFrame.from_dict(metrics_dict, orient='index')

    # (metric column, bar colour) for each stacked panel, top to bottom
    panels = [('RMSE', 'skyblue'), ('MAE', 'lightgreen')]

    plt.figure(figsize=(12, 8))
    for position, (metric, bar_colour) in enumerate(panels, start=1):
        plt.subplot(len(panels), 1, position)
        summary[metric].plot(kind='bar', color=bar_colour)
        plt.title(f'Model Comparison - {metric} (lower is better)')
        plt.ylabel(metric)
        plt.xticks(rotation=45)
        plt.grid(axis='y', linestyle='--', alpha=0.7)

    plt.tight_layout()
    plt.savefig(output_path)
    logger.info(f"Model comparison visualization saved to {output_path}")

    return summary


def main():
    """Run the end-to-end MovieLens training, evaluation and export pipeline.

    Steps:
        1. Load the ratings (``min_ratings=5``) and process movie features.
        2. Split the data.
        3. Train the baseline, ALS, hybrid, content-based and popularity models.
        4. Plot validation metrics.
        5. Pickle all model parameters for the Streamlit app.
        6. Log example recommendations for one active user.

    The Spark session is always stopped on exit.

    Raises:
        Exception: any pipeline failure is logged with its traceback and then
            re-raised. A failed run therefore exits non-zero (or shows the
            error in a notebook cell) instead of looking successful.
    """
    # Initialize Spark
    spark = init_spark(memory="8g")  # Adjust memory based on your system
    
    try:
        # Load and prepare data
        ratings_df, movies_df, n_users, n_movies, active_users, active_movies = load_data(
            spark, "ml-32m", min_ratings=5
        )
        
        # Process movie features
        movies_processed_df, all_genres = process_movie_features(spark, movies_df)
        
        # Split data
        train_data, val_data, test_data = train_val_test_split(ratings_df)
        
        # Train baseline models
        global_mean_params, bias_model_params, global_mean_metrics, bias_model_metrics = train_baseline_models(
            train_data, val_data
        )
        
        # Train ALS model with updated parameter extraction
        rank, regParam, maxIter = 50, 0.1, 10
        original_params = {
            'rank': rank,
            'regParam': regParam,
            'maxIter': maxIter,
            'nonnegative': True,
            'implicitPrefs': False
        }
        
        als_model, als_metrics = train_als_model(
            train_data, val_data, 
            rank=rank, regParam=regParam, maxIter=maxIter
        )
        
        # Extract ALS parameters for Streamlit with original params
        als_params = extract_als_parameters(als_model, original_params)
        
        # NOTE: item-based CF with LSH is currently disabled; re-enable these
        # together with the 'item_cf' / 'Item-CF-LSH' entries and save_lsh_model below.
        # # Train item-based CF model with LSH
        # item_cf_params = train_item_similarity_model_with_lsh(
        #     spark, train_data, val_data, n_neighbors=30, hash_tables=10, hash_length=4.0
        # )
        
        # # Evaluate LSH model
        # lsh_metrics = evaluate_lsh_model(item_cf_params, val_data, movies_processed_df)
        # Create hybrid model
        hybrid_params, hybrid_metrics = create_hybrid_model(
            bias_model_params, als_model, val_data, weight_als=0.7
        )
        
        # Train content-based model
        content_params = train_content_based_model(
            spark, movies_processed_df, train_data, all_genres
        )
        
        # Create popularity recommendations
        popularity_params = create_popularity_recommendations(
            train_data, movies_processed_df, all_genres, n=100
        )

        # Collect all model parameters
        all_model_params = {
            'global_mean': global_mean_params,
            'bias': bias_model_params,
            'als': als_params,
            # 'item_cf': item_cf_params,
            'hybrid': hybrid_params,
            'content': content_params,
            'popularity': popularity_params
        }
        # Collect all metrics (only rating-prediction models are comparable here)
        all_metrics = {
            'Global Mean': global_mean_metrics,
            'Bias Model': bias_model_metrics,
            'ALS': als_metrics,
            # 'Item-CF-LSH': lsh_metrics,
            'Hybrid': hybrid_metrics
        }
        
        # Create evaluation visualizations
        create_evaluation_visualizations(all_metrics)
        
        # Save model parameters for Streamlit
        save_models_for_streamlit(all_model_params)
        
        # Save LSH model separately due to size
        # save_lsh_model(item_cf_params)
        # Generate example recommendations
        sample_user_id = active_users.first()['userId']
        
        logger.info(f"\nExample recommendations for user {sample_user_id}:")
        
        # ALS recommendations
        als_recs = generate_als_recommendations(als_params, sample_user_id, movies_processed_df, n=5)
        logger.info("ALS Recommendations:")
        for i, rec in enumerate(als_recs):
            logger.info(f"{i+1}. {rec['title']} - Predicted rating: {rec['predicted_rating']:.2f}")
        
        # Content-based recommendations
        content_recs = generate_content_recommendations(content_params, sample_user_id, movies_processed_df, n=5)
        logger.info("\nContent-Based Recommendations:")
        for i, rec in enumerate(content_recs):
            logger.info(f"{i+1}. {rec['title']} - Similarity: {rec['similarity']:.4f}")
        
        # Hybrid recommendations
        hybrid_recs = generate_hybrid_recommendations(hybrid_params, als_params, sample_user_id, movies_processed_df, n=5)
        logger.info("\nHybrid Recommendations:")
        for i, rec in enumerate(hybrid_recs):
            logger.info(f"{i+1}. {rec['title']} - Predicted rating: {rec['predicted_rating']:.2f}")
        
        logger.info("\nRecommendation model pipeline completed successfully!")
        
    except Exception as e:
        logger.error(f"Error in recommendation pipeline: {str(e)}", exc_info=True)
        # Propagate so callers / the process exit status see the failure;
        # the finally block still stops Spark first.
        raise
    finally:
        # Stop Spark session
        spark.stop()
        logger.info("Spark session stopped")

        
if __name__ == "__main__":
    # Run the full pipeline only when this file is executed directly
    # (or as a notebook cell), not when it is imported as a module.
    main()

25/04/14 23:48:03 WARN Utils: Your hostname, lenovo-server resolves to a loopback address: 127.0.1.1; using 192.168.100.30 instead (on interface eno1)
25/04/14 23:48:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/14 23:48:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-04-14 23:48:04,746 - MovieLensRecommender - INFO - Spark session initialized with 8g memory
2025-04-14 23:48:04,747 - MovieLensRecommender - INFO - Loading data with PySpark...
2025-04-14 23:48:49,776 - MovieLensRecommender - INFO - Data loaded in 45.03 seconds
2025-04-14 23:48:49,777 - MovieLensRecommender - INFO - Filtered dataset: 200,948 users, 43,884 movies, 31,921,467 ratings
2025-04-14 23:48:49,777 - MovieLensRecommender - INFO - Processing movie features...
2025-04-14 23