In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
import matplotlib.pyplot as plt
import math
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, udf, collect_list, struct, avg, count, sum
from pyspark.sql.types import FloatType, ArrayType, StructType, StructField
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, split, expr, broadcast
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import count, expr, collect_list, col, sqrt, when, lit, rank, split, explode, sum as sql_sum
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import numpy as np

def create_spark_session(app_name="MovieLensALS"):
    """Create and return a Spark session with optimized memory configuration"""
    return SparkSession.builder \
        .appName(app_name) \
        .config("spark.local.dir", "/scratch/atewary/") \
        .config("spark.driver.memory", "12g") \
        .config("spark.executor.memory", "8g") \
        .config("spark.memory.offHeap.enabled", "true") \
        .config("spark.memory.offHeap.size", "8g") \
        .config("spark.sql.shuffle.partitions", "100") \
        .config("spark.default.parallelism", "100") \
        .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
        .config("spark.executor.cores", "4") \
        .config("spark.driver.maxResultSize", "4g") \
        .config("spark.kryoserializer.buffer.max", "1024m") \
        .config("spark.rdd.compress", "true") \
        .config("spark.shuffle.compress", "true") \
        .config("spark.memory.fraction", "0.8") \
        .config("spark.memory.storageFraction", "0.6") \
        .config("spark.executor.extraJavaOptions", "-Xss4m") \
        .config("spark.driver.extraJavaOptions", "-Xss4m") \
        .config("LogLevel", "ERROR") \
        .getOrCreate()

def load_ratings_data(spark, filepath):
    """Load and preprocess ratings data with caching"""
    ratings_df = spark.read.csv(
        filepath,
        header=True,
        inferSchema=True
    ).repartition(100)  # Increased partitions
    
    # Convert columns to appropriate types and cache the result
    processed_df = ratings_df.select(
        col("userId").cast("integer"),
        col("movieId").cast("integer"),
        col("rating").cast("float")
    ).cache()  # Cache the DataFrame
    
    # Force cache computation
    processed_df.count()
    
    return processed_df

def calculate_map(predictions_df, k=10):
    """Calculate Mean Average Precision at K"""
    
    # Create window spec for ranking predictions per user
    window = Window.partitionBy("userId").orderBy(F.col("prediction").desc())
    
    # Add rank and filter top K predictions per user
    predictions_with_rank = predictions_df.withColumn(
        "rank", F.row_number().over(window)
    ).filter(F.col("rank") <= k)
    
    # Calculate precision at each position for each user
    predictions_with_precision = predictions_with_rank.withColumn(
        "precision", 
        F.when(F.col("rating") >= 4.0, 1.0/F.col("rank")).otherwise(0.0)
    )
    
    # Calculate cumulative precision (AP) for each user
    user_ap = predictions_with_precision.groupBy("userId").agg(
        F.sum("precision").alias("ap"),
        F.count("precision").alias("num_predictions")
    )
    
    # Calculate MAP
    map_value = user_ap.select(
        (F.sum("ap") / F.sum("num_predictions")).alias("map")
    ).first()["map"]
    
    return map_value

def evaluate_metrics(predictions_df):
    """Calculate all evaluation metrics"""
    
    # RMSE
    rmse_evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating",
        predictionCol="prediction"
    )
    rmse = rmse_evaluator.evaluate(predictions_df)
    
    # MSE
    mse_evaluator = RegressionEvaluator(
        metricName="mse",
        labelCol="rating",
        predictionCol="prediction"
    )
    mse = mse_evaluator.evaluate(predictions_df)
    
    # MAP
    map_score = calculate_map(predictions_df)
    
    return {
        "RMSE": rmse,
        "MSE": mse,
        "MAP@10": map_score
    }

def train_als_model(training_data, validation_data):
    """Train ALS model with cross-validation for hyperparameter tuning"""
    
    # Initialize ALS model
    als = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        nonnegative=True,
        implicitPrefs=False,
        coldStartStrategy="drop",
        intermediateStorageLevel="MEMORY_AND_DISK",
        finalStorageLevel="MEMORY_AND_DISK"
    )
    
    # Create parameter grid for cross-validation
    param_grid = ParamGridBuilder() \
        .addGrid(als.rank, [5,10]) \
        .addGrid(als.maxIter, [6,12]) \
        .addGrid(als.regParam, [0.15]) \
        .build()
    
    # Define evaluator
    evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating",
        predictionCol="prediction"
    )
    
    # Create CrossValidator
    cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3,
        parallelism=2
    )
    
    # Fit the model using cross-validation
    print("Starting model training...")
    cv_model = cv.fit(training_data)
    
    # Get the best model
    best_model = cv_model.bestModel
    
    # Calculate validation metrics
    predictions = best_model.transform(validation_data)
    metrics = evaluate_metrics(predictions)
    
    # Print best parameters and validation RMSE
    print("\nBest Model Parameters:")
    print(f"Rank: {best_model._java_obj.parent().getRank()}")
    print(f"MaxIter: {best_model._java_obj.parent().getMaxIter()}")
    print(f"RegParam: {best_model._java_obj.parent().getRegParam()}")
    print("\nValidation Metrics:")
    print(f"RMSE: {metrics['RMSE']:.4f}")
    print(f"MSE: {metrics['MSE']:.4f}")
    print(f"MAP@10: {metrics['MAP@10']:.4f}")
    
    return best_model, metrics


class ItemItemCF:
    """Item-based Collaborative Filtering component with optimized implementation"""
    def __init__(self, k_neighbors=5):  # Reduced from 10 to 5 neighbors
        self.k_neighbors = k_neighbors
        self.item_similarities = None
        
    def fit(self, ratings_df):
        """
        Compute item-item similarities using Pearson correlation
        Optimized implementation with reduced shuffling and better caching
        """
        # Calculate mean rating for each movie and broadcast
        mean_ratings = ratings_df.groupBy("movieId").agg(
            F.avg("rating").alias("mean_rating")
        ).cache()
        
        # Normalize ratings in one efficient step with broadcast join
        normalized_ratings = ratings_df.join(
            broadcast(mean_ratings), 
            "movieId"
        ).withColumn(
            "norm_rating", 
            F.col("rating") - F.col("mean_rating")
        ).select(
            "userId", 
            "movieId", 
            "norm_rating"
        )
        
        # Calculate similarities with reduced shuffling
        self.item_similarities = normalized_ratings.alias("r1").join(
            normalized_ratings.alias("r2"),
            (F.col("r1.userId") == F.col("r2.userId")),  # Join on userId
            "inner"
        ).where(
            F.col("r1.movieId") < F.col("r2.movieId")  # Ensure unique pairs
        ).groupBy(
            F.col("r1.movieId"),
            F.col("r2.movieId")
        ).agg(
            F.count("*").alias("common_users"),
            F.sum(F.col("r1.norm_rating") * F.col("r2.norm_rating")).alias("dot_product"),
            F.sum(F.pow(F.col("r1.norm_rating"), 2)).alias("norm1_squared"),
            F.sum(F.pow(F.col("r2.norm_rating"), 2)).alias("norm2_squared")
        ).filter(
            F.col("common_users") >= 5
        ).select(
            F.col("r1.movieId").alias("movieId"),
            F.col("r2.movieId").alias("movieId2"),
            (F.col("dot_product") / (F.sqrt(F.col("norm1_squared")) * 
                                    F.sqrt(F.col("norm2_squared")))).alias("similarity")
        ).cache()
        
        return self
    
    def transform(self, test_data):
        """
        Transform method that generates predictions efficiently
        Replaces the individual predict method with batch processing
        """
        # Create window spec for top-K similar items
        window = Window.partitionBy("movieId").orderBy(F.col("similarity").desc())
        
        # Get top-K similar items for each movie
        top_similarities = self.item_similarities.withColumn(
            "rank", 
            F.row_number().over(window)
        ).filter(
            F.col("rank") <= self.k_neighbors
        )
        
        # Join with test data and calculate predictions - using aliases
        predictions = test_data.alias("test").join(
            broadcast(top_similarities).alias("sim"),
            F.col("test.movieId") == F.col("sim.movieId"),
            "left_outer"
        ).join(
            test_data.select(
                F.col("userId"),
                F.col("movieId").alias("movieId2"),
                F.col("rating")
            ).alias("ratings2"),
            ["userId", "movieId2"],
            "left_outer"
        ).groupBy(
            F.col("test.userId"),
            F.col("test.movieId")
        ).agg(
            (F.sum(F.col("similarity") * F.col("ratings2.rating")) / 
            F.sum(F.abs(F.col("similarity")))).alias("prediction")
        ).na.fill(0)
        
        return predictions.select(
            F.col("userId"),
            F.col("movieId"),
            F.col("prediction")
        )
class HybridRecommender:
    def __init__(self, als_model, weight_als=0.7):
        self.als_model = als_model
        self.item_cf = ItemItemCF(k_neighbors=5)  # Reduced neighbors
        self.weight_als = weight_als
        self.weight_cf = 1.0 - weight_als
    
    def fit(self, training_data):
        self.item_cf.fit(training_data)
        return self
    
    def transform(self, test_data):
        # Get predictions from both models
        als_predictions = self.als_model.transform(test_data)
        cf_predictions = self.item_cf.transform(test_data)
        
        # Combine predictions efficiently
        combined = als_predictions.join(
            cf_predictions.select(
                "userId", "movieId",
                F.col("prediction").alias("cf_prediction")
            ),
            ["userId", "movieId"],
            "left_outer"
        ).fillna(0, subset=['cf_prediction'])
        
        # Calculate weighted prediction
        return combined.withColumn(
            "prediction",
            (self.weight_als * F.col("prediction") + 
             self.weight_cf * F.col("cf_prediction"))
        ).select("userId", "movieId", "prediction", "rating")

def train_hybrid_model(als_model, training_data, validation_data):
    """Simplified hybrid model training with fewer weight combinations"""
    print("\nTraining Hybrid Model...")
    
    # Try fewer weight combinations
    weight_combinations = [(0.3, 0.7), (0.5, 0.5), (0.7, 0.3)]
    best_rmse = float('inf')
    best_model = None
    
    evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating",
        predictionCol="prediction"
    )
    
    for als_weight in [w[0] for w in weight_combinations]:
        print(f"\nTrying ALS weight: {als_weight}")
        
        hybrid_model = HybridRecommender(
            als_model=als_model,
            weight_als=als_weight
        ).fit(training_data)
        
        predictions = hybrid_model.transform(validation_data)
        rmse = evaluator.evaluate(predictions)
        
        if rmse < best_rmse:
            best_rmse = rmse
            best_model = hybrid_model
    
    return best_model

class SupervisedPredictor:
    def __init__(self, max_depth=5, num_trees=10):
        self.model = None
        self.max_depth = max_depth
        self.num_trees = num_trees
        
    def prepare_features(self, ratings_df, movies_df):
        # 1. Extract user statistics with null handling
        user_stats = ratings_df.groupBy("userId").agg(
            F.coalesce(F.avg("rating"), F.lit(0.0)).alias("user_avg_rating"),
            F.coalesce(F.stddev("rating"), F.lit(0.0)).alias("user_rating_std"),
            F.count("rating").cast("double").alias("user_rating_count_double")
        ).cache()
        
        # 2. Extract movie statistics with null handling
        movie_stats = ratings_df.groupBy("movieId").agg(
            F.coalesce(F.avg("rating"), F.lit(0.0)).alias("movie_avg_rating"),
            F.coalesce(F.stddev("rating"), F.lit(0.0)).alias("movie_rating_std"),
            F.count("rating").cast("double").alias("movie_rating_count_double")
        ).cache()
        
        # 3. Extract and process genres
        genres_df = movies_df.select(
            "movieId",
            *[
                F.when(
                    F.array_contains(F.split("genres", "\\|"), genre), 
                    F.lit(1.0)
                ).otherwise(F.lit(0.0)).alias(genre)
                for genre in [
                    "Action", "Adventure", "Animation", "Children", 
                    "Comedy", "Crime", "Documentary", "Drama", "Fantasy",
                    "Film-Noir", "Horror", "IMAX", "Musical", "Mystery", 
                    "Romance", "Sci-Fi", "Thriller", "War", "Western"
                ]
            ]
        ).cache()
        
        # 3. Combine all features
        enriched_data = ratings_df \
            .join(broadcast(user_stats), "userId") \
            .join(broadcast(movie_stats), "movieId") \
            .join(broadcast(genres_df), "movieId")
        
        # 4. Create feature vector
        feature_cols = [col for col in enriched_data.columns if col not in 
                    ["userId", "movieId", "rating"]]
        
        feature_assembler = VectorAssembler(
            inputCols=feature_cols,
            outputCol="features",
            handleInvalid="skip"
        )
        
        # 5. Prepare final dataset
        final_data = feature_assembler.transform(enriched_data) \
            .select("userId", "movieId", "rating", "features")
        
        # Handle null values
        final_data = final_data.na.fill(0)
        # Unpersist cached DataFrames
        user_stats.unpersist()
        movie_stats.unpersist()
        genres_df.unpersist()
        
        return final_data
    
    def fit(self, ratings_df, movies_df):
        # Prepare training data
        training_data = self.prepare_features(ratings_df, movies_df)
        
        # Train Random Forest model
        rf = RandomForestRegressor(
            featuresCol="features",
            labelCol="rating",
            maxDepth=self.max_depth,
            numTrees=self.num_trees,
            seed=42
        )
        
        self.model = rf.fit(training_data)
        return self
    
    def transform(self, ratings_df, movies_df):
        # Prepare test data and make predictions
        test_data = self.prepare_features(ratings_df, movies_df)
        return self.model.transform(test_data)

class EnhancedHybridRecommender:
    def __init__(self, hybrid_model, als_weight=0.4, item_cf_weight=0.3, supervised_weight=0.3):
        self.hybrid_model = hybrid_model  # Store the hybrid model
        self.als_weight = als_weight
        self.item_cf_weight = item_cf_weight
        self.supervised_weight = supervised_weight
        self.supervised_model = None
    
    def fit(self, training_data, movies_df):
        # Train only the supervised component since hybrid model is already trained
        self.supervised_model = SupervisedPredictor().fit(training_data, movies_df)
        return self
    
    def transform(self, test_data, movies_df):
        # Get predictions from hybrid model (this includes both ALS and Item-CF)
        hybrid_predictions = self.hybrid_model.transform(test_data)
        
        # Get predictions from supervised model
        supervised_predictions = self.supervised_model.transform(test_data, movies_df)
        
        # Combine predictions
        combined = hybrid_predictions.join(
            supervised_predictions.select(
                "userId", 
                "movieId",
                col("prediction").alias("supervised_prediction")
            ),
            ["userId", "movieId"],
            "outer"
        ).na.fill(0)
        
        # Calculate weighted prediction
        # Note: hybrid_predictions.prediction already contains combined ALS and Item-CF predictions
        final_predictions = combined.withColumn(
            "prediction",
            (col("prediction") * (self.als_weight + self.item_cf_weight) + 
             col("supervised_prediction") * self.supervised_weight)
        ).select("userId", "movieId", "prediction", "rating")
        
        return final_predictions

def train_enhanced_hybrid_model(training_data, validation_data, movies_df, hybrid_model):
    """Train enhanced hybrid model with all three components"""
    print("\nTraining Enhanced Hybrid Model...")

    weight_combinations = [
        (0.4, 0.3, 0.3),  # Equal-ish weights
        (0.6, 0.2, 0.2),  # ALS dominant
        (0.2, 0.6, 0.2),  # Item-CF dominant
        (0.2, 0.2, 0.6)   # Supervised dominant
    ]

    weight_performances = []
    for als_w, cf_w, sup_w in weight_combinations:
        print(f"\nTrying weights: ALS={als_w}, CF={cf_w}, Supervised={sup_w}")
        
        try:
            enhanced_model = EnhancedHybridRecommender(
                hybrid_model=hybrid_model,  # Pass the entire hybrid model
                als_weight=als_w,
                item_cf_weight=cf_w,
                supervised_weight=sup_w
            )
            
            # Train model (only trains the supervised component)
            enhanced_model.fit(training_data, movies_df)
            
            # Evaluate on validation set
            predictions = enhanced_model.transform(validation_data, movies_df)
            
            # Calculate metrics
            metrics = evaluate_metrics(predictions)
            weight_performances.append(((als_w, cf_w, sup_w), metrics))
            
            print(f"RMSE: {metrics['RMSE']:.4f}")
            print(f"MSE: {metrics['MSE']:.4f}")
            print(f"MAP@10: {metrics['MAP@10']:.4f}")
            
        except Exception as e:
            print(f"Error with weights {(als_w, cf_w, sup_w)}: {str(e)}")
            continue

    if weight_performances:
        best_weights, best_metrics = min(weight_performances, key=lambda x: x[1]['RMSE'])
        print("\nBest Configuration:")
        print(f"Weights (ALS, CF, Supervised): {best_weights}")
        print(f"RMSE: {best_metrics['RMSE']:.4f}")
        
        return EnhancedHybridRecommender(
            hybrid_model=hybrid_model,
            als_weight=best_weights[0],
            item_cf_weight=best_weights[1],
            supervised_weight=best_weights[2]
        ).fit(training_data, movies_df)
    else:
        raise Exception("No valid models were trained")

# Create Spark session
spark = create_spark_session()
# Load ratings data
ratings_df = load_ratings_data(spark, "data/ratings.csv")

# Take a smaller sample for initial testing
# sampled_df = ratings_df.sample(fraction=0.00001, seed=42)
#-------------------------------------------------------
# # Take only 2000 rows
# ratings_df_800 = ratings_df.limit(1300).cache()
# # Get unique movieIds from the sampled ratings
# sampled_movie_ids = ratings_df_800.select("movieId").distinct()

# # Load movies data and filter to only include movies in our ratings sample
# movies_df_full = spark.read.csv(
#     "data/movies.csv",
#     header=True,
#     inferSchema=True
# )
# movies_df = movies_df_full.join(
#     sampled_movie_ids,
#     "movieId",
#     "inner"
# ).cache()
#-------------------------------------------------------
training_data, validation_data = ratings_df.randomSplit([0.8, 0.2], seed=42)

print("Training data count:", training_data.count())
print("Validation data count:", validation_data.count())


# Train model and get validation results
als_model, als_metrics = train_als_model(training_data, validation_data)

# Save the model for later use
# best_model.save("models/best_als_model")
als_model.write().overwrite().save("models/best_als_model")

print("\nModel training completed and saved successfully!")
# Add this to your main code after training the ALS model
print("\nTraining Hybrid Model...")
hybrid_model = train_hybrid_model(als_model=als_model,training_data=training_data, validation_data=validation_data)

# Get metrics from hybrid model predictions
two_comp_predictions = hybrid_model.transform(validation_data)
two_comp_metrics = evaluate_metrics(two_comp_predictions)
# Add to your main code:
movies_df = spark.read.csv(
    "data/movies.csv",
    header=True,
    inferSchema=True
)

# After training three-component hybrid
print("\nTraining Enhanced Hybrid Model...")
enhanced_hybrid_model = train_enhanced_hybrid_model(training_data, validation_data, movies_df,hybrid_model)
# Get predictions and evaluate final model
final_predictions = enhanced_hybrid_model.transform(validation_data, movies_df)
three_comp_metrics = evaluate_metrics(final_predictions)
print("\nFinal Enhanced Hybrid Model Performance:")
print(f"RMSE: {three_comp_metrics['RMSE']:.4f}")
print(f"MSE: {three_comp_metrics['MSE']:.4f}")
print(f"MAP@10: {three_comp_metrics['MAP@10']:.4f}")


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/02 14:53:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/02 14:53:37 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
                                                                                

Training data count: 19999186


                                                                                

Validation data count: 5000909
Starting model training...


24/12/02 14:54:45 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                


Best Model Parameters:
Rank: 10
MaxIter: 12
RegParam: 0.15

Validation Metrics:
RMSE: 0.8377
MSE: 0.7017
MAP@10: 0.2383


                                                                                


Model training completed and saved successfully!

Training Hybrid Model...

Training Hybrid Model...

Trying ALS weight: 0.3


24/12/02 15:07:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:19 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/12/02 15:07:24 WARN RowBasedKeyValueBatch: Calling spill() on


Trying ALS weight: 0.5


24/12/02 15:38:48 WARN CacheManager: Asked to cache already cached data.        
24/12/02 15:38:48 WARN CacheManager: Asked to cache already cached data.



Trying ALS weight: 0.7


                                                                                


Training Enhanced Hybrid Model...

Training Enhanced Hybrid Model...

Trying weights: ALS=0.4, CF=0.3, Supervised=0.3


24/12/02 15:53:27 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

RMSE: 1.1846
MSE: 1.4032
MAP@10: 0.2369

Trying weights: ALS=0.6, CF=0.2, Supervised=0.2


[Stage 3412:(56 + 8) / 100][Stage 3414:>(0 + 0) / 33][Stage 3416:>(0 + 0) / 16] 

In [2]:
# Check for null values in ratings_df
print("Ratings DataFrame")
print(f"Total rows: {ratings_df.count()}")
print(f"Rows with null userId: {ratings_df.where(ratings_df.userId.isNull()).count()}")
print(f"Rows with null movieId: {ratings_df.where(ratings_df.movieId.isNull()).count()}")
print(f"Rows with null rating: {ratings_df.where(ratings_df.rating.isNull()).count()}")

# Check for null values in movies_df
print("\nMovies DataFrame")
print(f"Total rows: {movies_df.count()}")
print(f"Rows with null movieId: {movies_df.where(movies_df.movieId.isNull()).count()}")
print(f"Rows with null title: {movies_df.where(movies_df.title.isNull()).count()}")
print(f"Rows with null genres: {movies_df.where(movies_df.genres.isNull()).count()}")

Ratings DataFrame


                                                                                

Total rows: 25000095


                                                                                

Rows with null userId: 0


                                                                                

Rows with null movieId: 0


                                                                                

Rows with null rating: 0

Movies DataFrame


                                                                                

Total rows: 1319


                                                                                

Rows with null movieId: 0


                                                                                

Rows with null title: 0




Rows with null genres: 0


                                                                                

In [3]:
# For ratings DataFrame
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [None]:
def visualize_model_performance(als_metrics, two_comp_metrics, three_comp_metrics):
    """
    Create comprehensive visualizations comparing performance of all three models:
    1. ALS
    2. Two-Component Hybrid (ALS + Item-CF)
    3. Three-Component Hybrid (ALS + Item-CF + Supervised)
    """
    # Set up Seaborn style for better visuals
    sns.set_style("whitegrid")
    
    # Prepare data in format suitable for Seaborn
    models = ['Base ALS', 'Hybrid (ALS+CF)', 'Enhanced Hybrid']
    metrics_data = []
    
    # Organize metrics data
    for model, metrics in zip(models, [als_metrics, two_comp_metrics, three_comp_metrics]):
        for metric_name, value in metrics.items():
            metrics_data.append({
                'Model': model,
                'Metric': metric_name,
                'Value': value
            })
    
    df = pd.DataFrame(metrics_data)
    
    # 1. Error Metrics Comparison Plot
    plt.figure(figsize=(12, 6))
    error_metrics = df[df['Metric'].isin(['RMSE', 'MSE'])]
    error_plot = sns.barplot(
        data=error_metrics,
        x='Model',
        y='Value',
        hue='Metric'
    )
    
    plt.title('Prediction Error Comparison', pad=20)
    plt.ylabel('Error Value (Lower is Better)')
    
    # Add value labels
    for container in error_plot.containers:
        error_plot.bar_label(container, fmt='%.4f', padding=3)
    
    plt.legend(title='Error Metric')
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.savefig('error_comparison.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 2. MAP@10 Comparison Plot
    plt.figure(figsize=(10, 6))
    map_data = df[df['Metric'] == 'MAP@10']
    map_plot = sns.barplot(
        data=map_data,
        x='Model',
        y='Value',
        color='skyblue'
    )
    
    plt.title('MAP@10 Comparison', pad=20)
    plt.ylabel('MAP@10 Score (Higher is Better)')
    
    # Add value labels
    map_plot.bar_label(map_plot.containers[0], fmt='%.4f', padding=3)
    
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.savefig('map_comparison.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 3. Overall Performance Heatmap
    plt.figure(figsize=(10, 6))
    heatmap_data = df.pivot(index='Model', columns='Metric', values='Value')
    sns.heatmap(
        heatmap_data,
        annot=True,
        fmt='.4f',
        cmap='YlOrRd',
        center=0,
        cbar_kws={'label': 'Metric Value'}
    )
    
    plt.title('Model Performance Heatmap', pad=20)
    plt.tight_layout()
    plt.savefig('performance_heatmap.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 4. Relative Improvement Plot
    base_values = {metric: als_metrics[metric] for metric in als_metrics.keys()}
    improvement_data = []
    
    for model, metrics in [("Hybrid", two_comp_metrics), 
                          ("Enhanced Hybrid", three_comp_metrics)]:
        for metric, value in metrics.items():
            improvement = ((value - base_values[metric]) / base_values[metric]) * 100
            improvement_data.append({
                "Model": model,
                "Metric": metric,
                "Improvement %": improvement
            })
    
    improvement_df = pd.DataFrame(improvement_data)
    
    plt.figure(figsize=(10, 6))
    improvement_plot = sns.barplot(
        data=improvement_df,
        x="Metric",
        y="Improvement %",
        hue="Model",
        palette="Set2"
    )
    
    plt.title("Percentage Improvement Over Base ALS Model")
    plt.axhline(y=0, color='r', linestyle='--', alpha=0.3)
    plt.xticks(rotation=30)
    
    # Add value labels
    for container in improvement_plot.containers:
        improvement_plot.bar_label(container, fmt='%.2f%%', padding=3)
    
    plt.tight_layout()
    plt.savefig('improvement_analysis.png', dpi=300, bbox_inches='tight')
    plt.close()

# Usage:
# First ensure required imports
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

# Create visualizations
visualize_model_performance(als_metrics, two_comp_metrics, three_comp_metrics)

In [3]:
# Unpersist cached DataFrames
if 'training_data' in locals():
    training_data.unpersist()
if 'validation_data' in locals():
    validation_data.unpersist()
# Stop Spark session
spark.stop()

In [None]:
plt.figure(figsize=(10, 6))
plt.bar(als_metrics.keys(), als_metrics.values())
plt.title('Model Performance Metrics')
plt.ylabel('Score')
plt.savefig('model_metrics.png')
plt.close()
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

def create_performance_comparison_plots_sns(als_metrics, two_comp_metrics, three_comp_metrics):
    """
    Create visualizations comparing all three models using seaborn
    """
    # Prepare data in long format for seaborn
    models = ['Base ALS', 'Two-Component', 'Three-Component']
    metrics_data = []
    
    # Collect data for each model
    for model, metrics in zip(models, [als_metrics, two_comp_metrics, three_comp_metrics]):
        for metric_name, value in metrics.items():
            metrics_data.append({
                'Model': model,
                'Metric': metric_name,
                'Value': value
            })
    
    # Convert to DataFrame
    df = pd.DataFrame(metrics_data)
    
    # Set style
    sns.set_style("whitegrid")
    sns.set_palette("husl")
    
    # 1. Error Metrics Plot (RMSE and MSE)
    plt.figure(figsize=(15, 6))
    
    # Filter for RMSE and MSE
    error_metrics = df[df['Metric'].isin(['RMSE', 'MSE'])]
    
    # Create grouped bar plot
    error_plot = sns.barplot(
        data=error_metrics,
        x='Model',
        y='Value',
        hue='Metric'
    )
    
    # Customize plot
    plt.title('Error Metrics Comparison Across Models', pad=20, size=14)
    plt.ylabel('Value (lower is better)')
    
    # Add value labels on bars
    for container in error_plot.containers:
        error_plot.bar_label(container, fmt='%.4f', padding=3)
    
    plt.legend(title='Metric')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('error_metrics_comparison_sns.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 2. MAP@10 Plot
    plt.figure(figsize=(10, 6))
    
    # Filter for MAP@10
    map_data = df[df['Metric'] == 'MAP@10']
    
    # Create bar plot
    map_plot = sns.barplot(
        data=map_data,
        x='Model',
        y='Value',
        palette="husl"
    )
    
    # Customize plot
    plt.title('MAP@10 Comparison Across Models', pad=20, size=14)
    plt.ylabel('MAP@10 (higher is better)')
    
    # Add value labels on bars
    map_plot.bar_label(map_plot.containers[0], fmt='%.4f', padding=3)
    
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('map_comparison_sns.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 3. Heatmap of all metrics
    plt.figure(figsize=(12, 8))
    
    # Prepare data for heatmap
    heatmap_data = df.pivot(index='Model', columns='Metric', values='Value')
    
    # Create heatmap
    sns.heatmap(
        heatmap_data,
        annot=True,
        fmt='.4f',
        cmap='YlOrRd',
        center=0,
        cbar_kws={'label': 'Value'}
    )
    
    plt.title('Performance Metrics Heatmap', pad=20, size=14)
    plt.tight_layout()
    plt.savefig('metrics_heatmap_sns.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 4. Point plot with confidence intervals
    plt.figure(figsize=(12, 6))
    
    sns.pointplot(
        data=df,
        x='Model',
        y='Value',
        hue='Metric',
        dodge=True,
        markers=['o', 's', 'D'],
        linestyles=['-', '--', ':']
    )
    
    plt.title('Performance Metrics with Confidence Intervals', pad=20, size=14)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('metrics_pointplot_sns.png', dpi=300, bbox_inches='tight')
    plt.close()

# Create visualizations with actual metrics
create_performance_comparison_plots_sns(als_metrics, two_comp_metrics, three_comp_metrics)
def radar_comparison(als_metrics, two_comp_metrics, three_comp_metrics):
    """
    Create radar chart and comprehensive model comparison visualizations
    """
    import matplotlib.pyplot as plt
    import numpy as np
    
    # 1. Comprehensive Model Comparison
    plt.figure(figsize=(15, 6))
    
    # Set up data
    models = ['Base ALS', 'Two-Component', 'Three-Component']
    metrics = {
        'RMSE': [als_metrics['RMSE'], two_comp_metrics['RMSE'], three_comp_metrics['RMSE']],
        'MSE': [als_metrics['MSE'], two_comp_metrics['MSE'], three_comp_metrics['MSE']],
        'MAP@10': [als_metrics['MAP@10'], two_comp_metrics['MAP@10'], three_comp_metrics['MAP@10']]
    }
    
    # Position of bars on x-axis
    x = np.arange(len(models))
    width = 0.25  # Width of bars
    
    # Create grouped bars
    plt.bar(x - width, metrics['RMSE'], width, label='RMSE', color='#2ecc71')
    plt.bar(x, metrics['MSE'], width, label='MSE', color='#3498db')
    plt.bar(x + width, metrics['MAP@10'], width, label='MAP@10', color='#9b59b6')
    
    # Customize plot
    plt.xlabel('Models')
    plt.ylabel('Metric Values')
    plt.title('Comprehensive Model Comparison')
    plt.xticks(x, models)
    plt.legend()
    
    # Add value labels on bars
    for i in range(len(models)):
        plt.text(i - width, metrics['RMSE'][i], f'{metrics["RMSE"][i]:.4f}', 
                ha='center', va='bottom')
        plt.text(i, metrics['MSE'][i], f'{metrics["MSE"][i]:.4f}', 
                ha='center', va='bottom')
        plt.text(i + width, metrics['MAP@10'][i], f'{metrics["MAP@10"][i]:.4f}', 
                ha='center', va='bottom')
    
    plt.tight_layout()
    plt.savefig('model_comparison.png', dpi=300, bbox_inches='tight')
    plt.close()

    # 2. Radar Chart
    plt.figure(figsize=(10, 10))
    
    # Normalize metrics for radar chart
    max_rmse = max(metrics['RMSE'])
    max_mse = max(metrics['MSE'])
    max_map = max(metrics['MAP@10'])
    
    # Invert RMSE and MSE because lower is better
    normalized_metrics = {
        'Base ALS': [1 - (als_metrics['RMSE']/max_rmse),
                    1 - (als_metrics['MSE']/max_mse),
                    als_metrics['MAP@10']/max_map],
        'Two-Component': [1 - (two_comp_metrics['RMSE']/max_rmse),
                         1 - (two_comp_metrics['MSE']/max_mse),
                         two_comp_metrics['MAP@10']/max_map],
        'Three-Component': [1 - (three_comp_metrics['RMSE']/max_rmse),
                          1 - (three_comp_metrics['MSE']/max_mse),
                          three_comp_metrics['MAP@10']/max_map]
    }
    
    # Set up the angles of the radar chart
    labels = ['RMSE\n(inverted)', 'MSE\n(inverted)', 'MAP@10']
    angles = np.linspace(0, 2*np.pi, len(labels), endpoint=False).tolist()
    angles += angles[:1]  # complete the circle
    
    # Plot for each model
    ax = plt.subplot(111, polar=True)
    colors = ['#2ecc71', '#3498db', '#9b59b6']
    
    for model, color in zip(normalized_metrics.keys(), colors):
        values = normalized_metrics[model]
        values += values[:1]  # complete the circle
        ax.plot(angles, values, 'o-', linewidth=2, label=model, color=color)
        ax.fill(angles, values, alpha=0.25, color=color)
    
    # Set the labels and title
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(labels)
    plt.title('Relative Performance Across Metrics', pad=20)
    plt.legend(loc='upper right', bbox_to_anchor=(0.1, 0.1))

    plt.tight_layout()
    plt.savefig('radar_comparison.png', dpi=300, bbox_inches='tight')
    plt.close()

# Usage example:
radar_comparison(als_metrics, two_comp_metrics, three_comp_metrics)
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

def additional_insights(als_metrics, two_comp_metrics, three_comp_metrics):
    """Create additional visualizations for deeper insights"""
    
    # 1. Performance Distribution Plot
    # Prepare data
    data = []
    for model, metrics in [("Base ALS", als_metrics), 
                          ("Two-Component", two_comp_metrics), 
                          ("Three-Component", three_comp_metrics)]:
        for metric, value in metrics.items():
            data.append({
                "Model": model,
                "Metric": metric,
                "Value": value,
                "Normalized Value": (value - min(als_metrics[metric], 
                                               two_comp_metrics[metric], 
                                               three_comp_metrics[metric])) / 
                                  (max(als_metrics[metric], 
                                      two_comp_metrics[metric], 
                                      three_comp_metrics[metric]) - 
                                   min(als_metrics[metric], 
                                       two_comp_metrics[metric], 
                                       three_comp_metrics[metric]))
            })
    
    df = pd.DataFrame(data)
    
    # Create violin plot
    plt.figure(figsize=(12, 6))
    sns.violinplot(data=df, x="Model", y="Normalized Value", hue="Metric")
    plt.title("Distribution of Normalized Metrics Across Models")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('metric_distribution.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 2. Correlation Heatmap
    plt.figure(figsize=(10, 8))
    correlation_matrix = pd.pivot_table(df, 
                                      values='Value', 
                                      index='Model', 
                                      columns='Metric').corr()
    sns.heatmap(correlation_matrix, 
                annot=True, 
                cmap='coolwarm', 
                center=0, 
                fmt='.2f')
    plt.title("Correlation Between Metrics")
    plt.tight_layout()
    plt.savefig('metric_correlation.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 3. Relative Improvement Plot
    # Calculate improvement percentages relative to base ALS
    improvement_data = []
    base_values = {metric: als_metrics[metric] for metric in als_metrics.keys()}
    
    for model, metrics in [("Two-Component", two_comp_metrics), 
                          ("Three-Component", three_comp_metrics)]:
        for metric, value in metrics.items():
            improvement = ((value - base_values[metric]) / base_values[metric]) * 100
            improvement_data.append({
                "Model": model,
                "Metric": metric,
                "Improvement %": improvement
            })
    
    improvement_df = pd.DataFrame(improvement_data)
    
    plt.figure(figsize=(10, 6))
    sns.barplot(data=improvement_df, 
                x="Metric", 
                y="Improvement %", 
                hue="Model", 
                palette="Set2")
    plt.title("Percentage Improvement Over Base ALS Model")
    plt.axhline(y=0, color='r', linestyle='--', alpha=0.3)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig('improvement_analysis.png', dpi=300, bbox_inches='tight')
    plt.close()
    
    # 4. Performance Trade-off Plot
    plt.figure(figsize=(10, 6))
    sns.scatterplot(data=df[df['Metric'].isin(['RMSE', 'MAP@10'])],
                    x='Value',
                    y='Normalized Value',
                    hue='Model',
                    style='Metric',
                    s=100)
    plt.title("Performance Trade-offs Between RMSE and MAP@10")
    plt.tight_layout()
    plt.savefig('tradeoff_analysis.png', dpi=300, bbox_inches='tight')
    plt.close()

# Example usage:
additional_insights(als_metrics, two_comp_metrics, three_comp_metrics)
# Unpersist cached DataFrames
if 'training_data' in locals():
    training_data.unpersist()
if 'validation_data' in locals():
    validation_data.unpersist()
# Stop Spark session
spark.stop()