In [0]:
spark

<pyspark.sql.connect.session.SparkSession at 0xffd8d3f1aab0>

In [0]:
df_movie = spark.read.csv("/Volumes/workspace/default/data/movies.csv", header=True, inferSchema=True)
df_rating = spark.read.csv("/Volumes/workspace/default/data/ratings.csv", header=True, inferSchema=True)

In [0]:
from pyspark.sql.functions import col

df = (
    df_rating.join(df_movie, on="movieId", how="inner")
             .orderBy(col("userId"))
)

In [0]:
from pyspark.sql import functions as F

user_movie_df = (
    df
        .groupBy("userId")
        .pivot("title")
        .agg(F.first("rating"))
)

In [0]:
import mlflow
import mlflow.spark  # for PySpark models
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Split data into training and test sets
train, test = df.randomSplit([0.8, 0.2], seed=42)

mlflow.set_experiment("/Users/samiasaeed0006@gmail.com/hybrid-recommender")

# Build recommendation model using ALS
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop"
)
model = als.fit(train)

# Save the model to UC volume path
# model.write().overwrite().save("/Volumes/workspace/default/models")

# Generate predictions
predictions = model.transform(test)

# Evaluate the model
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) = " + str(rmse))

# Take a small sample and convert to pandas
input_example = test.limit(3).toPandas()
output_example = model.transform(test.limit(3)).toPandas()

from mlflow.models.signature import infer_signature
signature = infer_signature(input_example, output_example)

with mlflow.start_run(run_name="HybridRecommender") as run:
    # Log parameters
    mlflow.log_param("rank", 10)
    mlflow.log_param("maxIter", 5)
    mlflow.log_param("regParam", 0.01)
    
    # Log metric
    mlflow.log_metric("rmse", rmse)
    
    mlflow.spark.log_model(
    spark_model=model,
    artifact_path="als_model",
    registered_model_name="HybridRecommender",
    dfs_tmpdir="/Volumes/workspace/default/my_uc_volume/tmp_mlflow",
    signature=signature,
    input_example=input_example
    )
    
    print(f"Model registered in run: {run.info.run_id}")

In [0]:
%fs mkdirs /Volumes/workspace/default/my_uc_volume/tmp_mlflow

True

In [0]:
# Item-Item CF - Similarity Computation
mean_ratings = df.groupBy("movieId").agg(F.avg("rating").alias("mean_rating"))
normalized_ratings_df = df.join(mean_ratings, "movieId")
normalized_ratings_df = normalized_ratings_df.withColumn("norm_rating", col("rating") - col("mean_rating"))

In [0]:
from pyspark.sql.functions import col, count, sqrt, sum as sql_sum, when, lit

from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, sqrt, lit, when

def compute_similarity(df):
    # Perform join on userId and ensure correct column names are used for movieId
    joined_df = df.alias("df1").join(df.alias("df2"), "userId")
    
    # Ensure we are comparing different movieId pairs (df1.movieId < df2.movieId)
    joined_df = joined_df.filter("df1.movieId < df2.movieId")
    
    # Perform aggregation to calculate similarity components
    joined_df = joined_df.groupBy("df1.movieId", "df2.movieId").agg(
        count(col("df1.movieId")).alias("numPairs"),
        F.sum(col("df1.norm_rating") * col("df2.norm_rating")).alias("sum_xy"),
        F.sum(col("df1.norm_rating") * col("df1.norm_rating")).alias("sum_xx"),
        F.sum(col("df2.norm_rating") * col("df2.norm_rating")).alias("sum_yy")
    )

    # Compute similarity (numerator and denominator)
    result_df = joined_df.withColumn("numerator", col("sum_xy"))
    result_df = result_df.withColumn("denominator", sqrt(col("sum_xx")) * sqrt(col("sum_yy")))
    result_df = result_df.withColumn("similarity",
                                     when(col("denominator") != 0, col("numerator") / col("denominator"))
                                     .otherwise(lit(0)))

    # Return the resulting DataFrame with movieId1, movieId2, and similarity
    return result_df.select(col("df1.movieId").alias("movieId1"), 
                            col("df2.movieId").alias("movieId2"), 
                            "similarity")

# Compute similarity and save to Parquet
movie_similarity_df = compute_similarity(normalized_ratings_df)
movie_similarity_df.write.mode("overwrite").parquet("/Volumes/workspace/default/models/item_item_similarities")


INFO:py4j.clientserver:Received command c on object id p0


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

def calculate_item_cf_predictions(user_movie_pairs, item_similarity_df, user_ratings_df, N=10):
    # Alias the DataFrames for clarity
    similarities = item_similarity_df.alias("sims")
    ratings = user_ratings_df.alias("ratings")

    # Join the user-item pairs with item similarities and user ratings
    user_item_sims = user_movie_pairs.alias("pairs").join(
        similarities, col("pairs.movieId1") == col("sims.movieId1")  # Corrected column name
    ).join(
        ratings, (col("ratings.userId") == col("pairs.userId")) & (col("ratings.movieId") == col("sims.movieId2"))
    )

    # Select necessary columns
    user_item_sims = user_item_sims.select(
        col("pairs.userId"),
        col("pairs.movieId1").alias("target_movieId"),  # Corrected column name
        col("sims.movieId2").alias("similar_movieId"),
        col("sims.similarity"),
        col("ratings.rating").alias("similar_movie_rating")
    )

    # Window specification for ranking top N similar movies for each user-target movie pair
    windowSpec = Window.partitionBy("userId", "target_movieId").orderBy(col("similarity").desc())
    top_n_similar = user_item_sims.withColumn("rank", rank().over(windowSpec)).filter(col("rank") <= N)

    # Aggregate weighted ratings and similarity sum
    weighted_ratings = top_n_similar.groupBy("userId", "target_movieId").agg(
        F.sum(col("similarity") * col("similar_movie_rating")).alias("weighted_sum"),
        F.sum("similarity").alias("similarity_sum")
    )

    # Calculate final predictions, handling division by zero with try_divide
    predictions = weighted_ratings.withColumn(
        "prediction", 
        F.when(col("similarity_sum") != 0, col("weighted_sum") / col("similarity_sum"))
         .otherwise(F.lit(None))  # Return NULL if similarity_sum is zero
    ).select("userId", "target_movieId", "prediction")

    return predictions


In [0]:
# Assuming 'test' is your test dataset, 'movie_similarity_df' contains movie similarities, 
# and 'train' is your training dataset containing ratings

# Generate user-movie pairs for prediction
user_movie_pairs = test.select("userId").distinct().crossJoin(movie_similarity_df.select("movieId1").distinct())

# Generate item CF predictions safely with division by zero handled
item_cf_predictions = calculate_item_cf_predictions(user_movie_pairs, movie_similarity_df, train)

# Show the first few predictions
item_cf_predictions.show(5)



+------+--------------+-----------------+
|userId|target_movieId|       prediction|
+------+--------------+-----------------+
|     1|            13|4.466666666666667|
|     1|            30|4.277777777777778|
|     1|            47|4.827372638646121|
|     1|            48|4.687406724897038|
|     1|            55|             NULL|
+------+--------------+-----------------+
only showing top 5 rows


In [0]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator

# Combine ALS and Item CF Predictions
als_predictions = predictions.withColumnRenamed("prediction", "als_prediction")
item_cf_predictions = item_cf_predictions.withColumnRenamed("prediction", "cf_prediction")
item_cf_predictions = item_cf_predictions.withColumnRenamed("target_movieId", "movieId")

# Join ALS and Item CF predictions on userId and movieId
combined_predictions = als_predictions.join(item_cf_predictions, ["userId", "movieId"], "inner")

# Calculate the hybrid prediction
combined_predictions = combined_predictions.withColumn(
    "hybrid_prediction",
    (F.col("als_prediction") + F.col("cf_prediction")) / 2
)

# Save the hybrid predictions
combined_predictions.write.mode("overwrite").parquet("/Volumes/workspace/default/models/hybrid_predictions")

# Optional: Evaluate the hybrid model using RMSE

# Rename rating in test to avoid ambiguity
test_renamed = test.withColumnRenamed("rating", "true_rating")

# Join test with combined predictions
evaluation_df = test_renamed.join(combined_predictions, ["userId", "movieId"], "inner")

# Filter out rows where either 'true_rating' or 'hybrid_prediction' is null
evaluation_df = evaluation_df.filter(
    evaluation_df["true_rating"].isNotNull() & evaluation_df["hybrid_prediction"].isNotNull()
)

# Evaluate RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="true_rating", predictionCol="hybrid_prediction")
hybrid_rmse = evaluator.evaluate(evaluation_df)

print(f"Hybrid Model RMSE: {hybrid_rmse}")


INFO:py4j.clientserver:Received command c on object id p0


Hybrid Model RMSE: 10.883646393091693


In [0]:
from pyspark.sql import Row

# Save metrics to file
metrics_data = [
    Row(metric="ALS RMSE", value=float(rmse)),
    Row(metric="Hybrid Model RMSE", value=float(hybrid_rmse)),
]

# Create a DataFrame from the metrics data
metrics_df = spark.createDataFrame(metrics_data)

# Apply coalesce before writing to reduce the number of output files (to 1)
metrics_df.coalesce(1).write.format("csv").option("header", "true").save("/Volumes/workspace/default/models/metrics.csv")


In [0]:
# Join ALS, Item CF predictions
combined_predictions = als_predictions.join(item_cf_predictions, ["userId", "movieId"], "inner")

In [0]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator

# Define weight combinations to try
weight_values = [
    (0.2, 0.3),
    (0.3, 0.3),
    (0.4, 0.3)
]

rmse_values = []

# Rename rating to avoid ambiguity
test_renamed = test.withColumnRenamed("rating", "true_rating")

for weight_als, weight_cf in weight_values:
    # Calculate hybrid prediction
    hybrid_df = combined_predictions.withColumn(
        "hybrid_prediction",
        F.col("als_prediction") * weight_als + F.col("cf_prediction") * weight_cf
    )

    # Join with test data
    evaluation_df = hybrid_df.join(test_renamed, ["userId", "movieId"], "inner")

    # Remove rows with nulls in label or prediction
    evaluation_df = evaluation_df.filter(
        (F.col("true_rating").isNotNull()) & (F.col("hybrid_prediction").isNotNull())
    )

    # Evaluate RMSE
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="true_rating", predictionCol="hybrid_prediction")
    h2_rmse = evaluator.evaluate(evaluation_df)
    rmse_values.append(h2_rmse)

# Find best weights
best_index = rmse_values.index(min(rmse_values))
best_weights = weight_values[best_index]

# Save best hybrid predictions
best_hybrid_df = combined_predictions.withColumn(
    "hybrid_prediction",
    F.col("als_prediction") * best_weights[0] + F.col("cf_prediction") * best_weights[1]
)
best_hybrid_df.write.mode("overwrite").parquet("/Volumes/workspace/default/models/best_hybrid_model")

print(f"Best hybrid model weights: {best_weights}")
print(f"Best Hybrid Model RMSE: {min(rmse_values)}")


Best hybrid model weights: (0.4, 0.3)
Best Hybrid Model RMSE: 6.6516218576976


In [0]:
import mlflow
import mlflow.pyfunc
from pyspark.sql import functions as F
from mlflow.models.signature import infer_signature

# Define a custom hybrid model class inheriting from mlflow.pyfunc.PythonModel
class HybridRecommendationModel(mlflow.pyfunc.PythonModel):
    def __init__(self, als_weight=0.5, cf_weight=0.5):
        self.als_weight = als_weight
        self.cf_weight = cf_weight

    def predict(self, context, model_input):
        # model_input is a DataFrame with ALS and CF predictions
        als_predictions = model_input['als_prediction']
        cf_predictions = model_input['cf_prediction']
        
        # Calculate hybrid prediction
        hybrid_prediction = (als_predictions * self.als_weight) + (cf_predictions * self.cf_weight)
        return hybrid_prediction

# Log the hybrid model to MLflow

with mlflow.start_run():
    # Set the weights for the hybrid model
    hybrid_model = HybridRecommendationModel(als_weight=0.4, cf_weight=0.6)
    
    # Log the model using mlflow.pyfunc
    mlflow.pyfunc.log_model("hybrid_model", python_model=hybrid_model)

    # Optionally log parameters if needed
    mlflow.log_param("als_weight", 0.4)
    mlflow.log_param("cf_weight", 0.6)

    # Take a small sample of input and output for signature
    input_example = test.limit(3).toPandas()
    output_example = model.transform(test.limit(3)).toPandas()

    # Infer signature of the model
    signature = infer_signature(input_example, output_example)

    # Log the model with signature and example input
    mlflow.pyfunc.log_model(
        "hybrid_model_with_signature",
        python_model=hybrid_model,
        signature=signature,
        input_example=input_example
    )

    print(f"Model registered in run: {mlflow.active_run().info.run_id}")


In [0]:
# Log predictions (optional)
best_hybrid_df = combined_predictions.withColumn(
    "hybrid_prediction",
    F.col("als_prediction") * 0.4 + F.col("cf_prediction") * 0.6
)
# Assuming you want to log predictions for reference
mlflow.log_artifact("/Volumes/workspace/default/models/best_hybrid_model")
