In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## a. Alternating Least Squares (ALS)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import expr, rank, col, collect_list
from pyspark.ml.feature import StringIndexer

In [None]:
df = spark.read.csv("sample-user-filtered-2023.csv", header=True, inferSchema=True)

In [None]:
df = df.orderBy("user_id", "anime_id")

In [None]:
df.show()

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      1|      19|     9|
|      1|      20|    10|
|      1|      21|     9|
|      1|     154|     8|
|      1|     199|    10|
|      1|     269|     7|
|      1|     442|     7|
|      1|     481|     8|
|      1|    1535|     9|
|      1|    1575|    10|
|      1|    1735|    10|
|      1|    2472|     7|
|      1|    4224|     9|
|      1|    5081|     8|
|      1|    5114|    10|
|      1|    6547|     6|
|      1|    7674|     7|
|      1|    9253|    10|
|      1|    9919|     8|
|      1|   10087|     7|
+-------+--------+------+
only showing top 20 rows



In [None]:
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [None]:
user_window = Window.partitionBy("user_id").orderBy(col("anime_id").desc())
df = df.withColumn("num_animes", expr("count(*) over (partition by user_id)"))

In [None]:
percent_animes_to_mask = 0.3

df_final = df.withColumn("num_animes_to_mask", (col("num_animes") * percent_animes_to_mask).cast("int"))
df_final = df_final.withColumn("anime_rank", rank().over(user_window))

indexer_user = StringIndexer(inputCol='user_id', outputCol='user_index').setHandleInvalid("keep")
indexer_anime = StringIndexer(inputCol='anime_id', outputCol='anime_index').setHandleInvalid("keep")

df_final = indexer_user.fit(df_final).transform(df_final)
df_final = indexer_anime.fit(df_final).transform(df_final)

df_final = df_final.withColumn('user_index', df_final['user_index'].cast('integer'))
df_final = df_final.withColumn('anime_index', df_final['anime_index'].cast('integer'))

train_data = df_final.filter(col("anime_rank") > col("num_animes_to_mask"))
test_data = df_final.filter(col("anime_rank") <= col("num_animes_to_mask"))

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator



als = ALS(userCol='user_index', itemCol='anime_index', ratingCol='rating',
          coldStartStrategy='drop', nonnegative=True)


param_grid = ParamGridBuilder().addGrid(
    als.rank, [1, 10, 20]
).addGrid(
    als.maxIter, [15]
).addGrid(
    als.regParam, [.05, .15]
).build()
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

cv = CrossValidator(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3)

model = cv.fit(train_data)

best_model = model.bestModel
print('rank: ', best_model.rank)
print('MaxIter: ', best_model._java_obj.parent().getMaxIter())
print('RegParam: ', best_model._java_obj.parent().getRegParam())

rank:  10
MaxIter:  15
RegParam:  0.15


In [35]:
final_als = ALS(
    userCol='user_index',
    itemCol='anime_index',
    ratingCol='rating',
    coldStartStrategy='drop',
    nonnegative=True,
    rank=10,
    maxIter=20,
    regParam=0.15
)

best_model = als.fit(train_data)

In [36]:
predictions = best_model.transform(test_data)
predictions = predictions.withColumn("prediction", expr("CASE WHEN prediction < 1 THEN 1 WHEN prediction > 10 THEN 10 ELSE prediction END"))
evaluator = RegressionEvaluator(labelCol='rating', predictionCol='prediction')

metrics = {}

metric_names = ['rmse', 'mae', 'mse', 'r2']

for metric in metric_names:
    evaluator.setMetricName(metric)
    metrics[metric] = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {metrics['rmse']}")
print(f"Mean Absolute Error (MAE): {metrics['mae']}")
print(f"Mean Squared Error (MSE): {metrics['mse']}")
print(f"R² (coefficient of determination): {metrics['r2']}")

Root Mean Squared Error (RMSE): 1.3548028972766433
Mean Absolute Error (MAE): 1.0377965388827288
Mean Squared Error (MSE): 1.8354908904691867
R² (coefficient of determination): 0.2703084700615269


In [None]:
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import functions as F

def evaluate_als_recommendations(model, test_data, k=10):
    user_subset = test_data.select('user_index').distinct()
    recommendations = model.recommendForUserSubset(user_subset, k)

    predicted_rankings = recommendations.rdd.map(
        lambda row: (row.user_index, [int(x.anime_index) for x in row.recommendations])
    )

    actual_rankings = test_data.groupBy('user_index').agg(
        F.collect_list('anime_index').alias('actual_items')
    ).rdd.map(
        lambda row: (row.user_index, [int(x) for x in row.actual_items])
    )

    prediction_truth = predicted_rankings.join(actual_rankings)

    metrics = RankingMetrics(prediction_truth.map(lambda x: (x[1][0], x[1][1])))

    # Calculate various metrics
    results = {
        'MAP': metrics.meanAveragePrecision,
        'NDCG at K': metrics.ndcgAt(k),
        'Precision at K': metrics.precisionAt(k),
        'Recall at K': metrics.recallAt(k)
    }

    return results

# Use the evaluation function
metrics_results = evaluate_als_recommendations(best_model, test_data, k=10)

# Print results
for metric_name, value in metrics_results.items():
    print(f"{metric_name}: {value:.4f}")

MAP: 0.0208
NDCG at K: 0.1272
Precision at K: 0.1207
Recall at K: 0.0420


In [38]:
userRecs = best_model.recommendForAllUsers(10)

In [None]:
from pyspark.sql import functions as F

def format_recommendations_for_export(userRecs):
    flattened = userRecs.select(
        'user_index',
        F.posexplode('recommendations').alias('position', 'rec')
    ).select(
        'user_index',
        'position',
        'rec.anime_index',
        'rec.rating'
    )

    pivoted = flattened.groupBy('user_index').pivot('position').agg(
        F.first('anime_index').alias('anime_index'),
        F.first('rating').alias('predicted_rating')
    )

    for i in range(10):
        pivoted = (pivoted
            .withColumnRenamed(f'{i}_anime_index', f'recommendation_{i+1}_anime_id')
            .withColumnRenamed(f'{i}_predicted_rating', f'recommendation_{i+1}_score'))

    final_df = pivoted.orderBy('user_index')

    return final_df

formatted_recs = format_recommendations_for_export(userRecs)

formatted_recs.toPandas().to_csv('anime_recommendations.csv', index=False)

formatted_recs.coalesce(1).write.mode('overwrite').option('header', 'true').csv('anime_recommendations_spark')

formatted_recs.show(5)