In [1]:
from google.cloud import bigquery

client = bigquery.Client(project="virtualization-and-cloud")

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.driver.memory", "16384m") \
    .config("spark.executor.memory", "16384m") \
    .getOrCreate()


In [6]:
query = """
SELECT userId, movieId, rating
FROM `virtualization-and-cloud.movies.ratings`
"""
ratings_df = client.query(query).to_dataframe()

In [8]:
# Convert to Spark DataFrame
ratings_sdf = spark.createDataFrame(ratings_df)

In [9]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Convert userId and movieId to integer if needed
from pyspark.sql.functions import col
ratings_sdf = ratings_sdf.select(
    col("userId").cast("int"),
    col("movieId").cast("int"),
    col("rating").cast("float")
)

In [8]:
(training, test) = ratings_sdf.randomSplit([0.8, 0.2], seed=42)

In [11]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
)

In [12]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(als.maxIter, [5, 10]) \
    .build()

In [13]:
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

In [14]:
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,         # 3-fold cross-validation
    parallelism=4       # Speeds up grid search using parallelism
)

In [16]:
cv_model = cv.fit(training)

In [22]:
best_model = cv_model.bestModel

In [23]:
predictions = best_model.transform(test)

In [17]:
cv_model.bestModel.save("als_best_model")

In [3]:
from pyspark.ml.recommendation import ALSModel
loaded_model = ALSModel.load("als_best_model")

In [14]:
k = 10
user_recommendations = loaded_model.recommendForAllUsers(k)

In [15]:
from pyspark.sql.functions import col
from pyspark.sql.functions import collect_set

# Assume ratings_df contains userId, movieId, rating
# Define "relevant" items as those rated >= 4.0 (you can tweak this threshold)
relevant_ratings = ratings_sdf.filter(col("rating") >= 4.0) \
                              .groupBy("userId") \
                              .agg(collect_set("movieId").alias("relevant_movies"))

In [27]:
from pyspark.sql.functions import expr

# Join recommendations with ground truth
recommendations_with_truth = user_recommendations.join(relevant_ratings, on="userId")

# Convert recommendation list to just movieIds
recommendations_with_truth = recommendations_with_truth.withColumn(
    "recommended_movies",
    expr("transform(recommendations, x -> x.movieId)")
)

In [19]:
k = 10  # or whatever value you want for @K

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import math

def precision_at_k(recommended, relevant):
    if not relevant: return 0.0
    recommended_k = recommended[:k]
    hits = len(set(recommended_k) & set(relevant))
    return hits / float(k)

def recall_at_k(recommended, relevant):
    if not relevant: return 0.0
    recommended_k = recommended[:k]
    hits = len(set(recommended_k) & set(relevant))
    return hits / float(len(relevant))

def make_apk_udf(k):
    def apk(recommended, relevant):
        if not relevant: return 0.0
        score = 0.0
        hits = 0
        for i, p in enumerate(recommended[:k]):
            if p in relevant:
                hits += 1
                score += hits / (i + 1.0)
        return score / min(len(relevant), k)
    return udf(apk, FloatType())

def make_ndcg_udf(k):
    def ndcg(recommended, relevant):
        if not relevant: return 0.0
        dcg = 0.0
        for i, p in enumerate(recommended[:k]):
            if p in relevant:
                dcg += 1.0 / math.log2(i + 2)
        idcg = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
        return dcg / idcg if idcg > 0 else 0.0
    return udf(ndcg, FloatType())

# Register UDFs with fixed k
precision_udf = udf(precision_at_k, FloatType())
recall_udf = udf(recall_at_k, FloatType())
apk_udf = make_apk_udf(k)
ndcg_udf = make_ndcg_udf(k)

In [20]:
metrics_df = recommendations_with_truth \
    .withColumn("precision", precision_udf("recommended_movies", "relevant_movies")) \
    .withColumn("recall", recall_udf("recommended_movies", "relevant_movies")) \
    .withColumn("apk", apk_udf("recommended_movies", "relevant_movies")) \
    .withColumn("ndcg", ndcg_udf("recommended_movies", "relevant_movies"))

final_metrics = metrics_df.select("precision", "recall", "apk", "ndcg").agg(
    {"precision": "avg", "recall": "avg", "apk": "avg", "ndcg": "avg"}
).collect()[0]

print(f"Precision@{k}: {final_metrics['avg(precision)']:.4f}")
print(f"Recall@{k}: {final_metrics['avg(recall)']:.4f}")
print(f"MAP@{k}: {final_metrics['avg(apk)']:.4f}")
print(f"NDCG@{k}: {final_metrics['avg(ndcg)']:.4f}")

Precision@10: 0.0001
Recall@10: 0.0003
MAP@10: 0.0002
NDCG@10: 0.0003


In [30]:
# Check the columns in both DataFrames
print("Columns in user_recommendations:", user_recommendations.columns)
print("Columns in relevant_ratings:", relevant_ratings.columns)

Columns in user_recommendations: ['userId', 'recommendations']
Columns in relevant_ratings: ['userId', 'relevant_movies']


In [24]:
from pyspark.sql.functions import when
predictions = predictions.withColumn(
    "prediction",
    when(col("prediction") > 5, 5).when(col("prediction") < 1, 1).otherwise(col("prediction"))
)

rmse = evaluator.evaluate(predictions)
print(f"Best model RMSE after tuning: {rmse:.4f}")

Best model RMSE after tuning: 0.8138


In [25]:
best_rank = cv_model.bestModel._java_obj.parent().getRank()
best_regParam = cv_model.bestModel._java_obj.parent().getRegParam()
best_maxIter = cv_model.bestModel._java_obj.parent().getMaxIter()

print(f"Best Rank: {best_rank}")
print(f"Best RegParam: {best_regParam}")
print(f"Best MaxIter: {best_maxIter}")

Best Rank: 30
Best RegParam: 0.1
Best MaxIter: 10


In [20]:
import json

best_params = {
    "rank": 30,
    "regParam": 0.1,
    "maxIter": 10
}

with open("best_als_params.json", "w") as f:
    json.dump(best_params, f)

In [26]:
user_recs = best_model.recommendForAllUsers(100)

In [27]:
from pyspark.sql.functions import explode, col, current_timestamp, row_number
from pyspark.sql.window import Window

# Flatten the nested recommendations
flattened_recs = user_recs.select(
    col("userId"),
    explode(col("recommendations")).alias("rec")
).select(
    col("userId"),
    col("rec.movieId").alias("movieId"),
    col("rec.rating").alias("predicted_rating")
)

# Add ranking per user
window = Window.partitionBy("userId").orderBy(col("predicted_rating").desc())
flattened_recs = flattened_recs.withColumn("rank", row_number().over(window))

# Add timestamp column
flattened_recs = flattened_recs.withColumn("generated_at", current_timestamp())

In [50]:
import pandas as pd
df = pd.read_parquet("flattened_recs_parquet")

In [53]:
table_ref = client.dataset("movies").table("recommendationsALS")

In [55]:
client.load_table_from_dataframe(df, table_ref, job_config=bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)).result()

LoadJob<project=virtualization-and-cloud, location=US, id=a3394507-fcaa-4c11-a9bc-130f0568a330>