In [1]:
import pandas as pd
import pyspark
import findspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
# Let findspark set up the Spark environment before a session is requested.
findspark.init()

# Default configuration; getOrCreate() reuses a session if one already exists.
conf = pyspark.SparkConf()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Tunables for this cell, named so they are easy to find and change.
RATINGS_CSV = 'users_and_ratings.csv'
MAX_ROWS = 1500000      # row cap applied to the CSV
SPLIT_SEED = 5024       # seed for the train/test split
SPLIT_WEIGHTS = [0.8, 0.2]

ratings_raw = spark.read.csv(RATINGS_CSV, header=True, inferSchema=True).limit(MAX_ROWS)

# All columns except the first two and the last two are packed into one
# vector column (presumably the genre indicator columns -- verify against
# the CSV header).
genre_columns = ratings_raw.columns[2:-2]
assembler = VectorAssembler(inputCols=genre_columns, outputCol="features")

# Cache before splitting: limit() has no ordering, so without a cache every
# downstream action re-evaluates the read + limit + assemble pipeline. Caching
# pins one materialised set of rows for both splits and spares the repeated
# CSV scans that cross-validation would otherwise trigger.
df = assembler.transform(ratings_raw).cache()
train_df, test_df = df.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)

In [3]:

def als_algo(*, train=None, test=None):
    """Grid-search an ALS recommender with 5-fold cross-validation and report on it.

    The search covers 3 ranks x 3 iteration counts x 3 regularisation values
    and selects by RMSE. The best model is then scored on the held-out
    split, and the function prints:

    * the winning rank / maxIter / regParam,
    * RMSE and MSE on the test split,
    * ten sample predictions,
    * the top-10 recommendations for ten users.

    Args:
        train: DataFrame with ``user_id``, ``anime_id`` and ``rating``
            columns used for cross-validation. Defaults to the notebook-level
            ``train_df``.
        test: DataFrame with the same columns plus ``name``, used for the
            final evaluation. Defaults to the notebook-level ``test_df``.

    Returns:
        None. All results are printed / shown.
    """
    # Local import keeps the block self-contained; numpy is already a
    # dependency of pyspark.ml.
    import numpy as np

    # Fall back to the notebook globals so the zero-argument call keeps working.
    train = train_df if train is None else train
    test = test_df if test is None else test

    als = ALS(
        userCol="user_id",
        itemCol="anime_id",
        ratingCol="rating",
        # Drop rows whose user/item was unseen during fitting so the
        # evaluators never receive NaN predictions.
        coldStartStrategy="drop",
        seed=5024,
    )

    paramGrid = (
        ParamGridBuilder()
        .addGrid(als.rank, [5, 15, 47])
        .addGrid(als.maxIter, [10, 15, 19])
        .addGrid(als.regParam, [0.01, 0.1, 0.2])
        .build()
    )

    rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    mse_evaluator = RegressionEvaluator(metricName="mse", labelCol="rating", predictionCol="prediction")

    crossval = CrossValidator(
        estimator=als,
        estimatorParamMaps=paramGrid,
        evaluator=rmse_evaluator,
        numFolds=5,
        seed=5024,
    )

    model = crossval.fit(train)
    bestAlsModel = model.bestModel

    # Recover the winning hyper-parameters through the public API instead of
    # the private py4j handle (_java_obj.parent()). avgMetrics is aligned
    # index-for-index with paramGrid, and this selection rule is the one
    # CrossValidator itself applies when choosing bestModel.
    select = np.argmax if rmse_evaluator.isLargerBetter() else np.argmin
    best_params = paramGrid[int(select(model.avgMetrics))]

    print("Best rank: ", best_params[als.rank])
    print("Best maxIter: ", best_params[als.maxIter])
    print("Best regParam: ", best_params[als.regParam])

    predictions = bestAlsModel.transform(test)
    user_recommendations = bestAlsModel.recommendForAllUsers(10)

    rmse = rmse_evaluator.evaluate(predictions)
    mse = mse_evaluator.evaluate(predictions)

    print(f"Root Mean Squared Error (RMSE) = {rmse}\nMean Squarred Error (MSE) = {mse}")

    predictions.select("user_id", "name", "rating", "prediction").show(10)

    user_recommendations.show(10,truncate=False)

# Run the cross-validated search; the function prints the chosen
# hyper-parameters, the test metrics and the sample tables itself.
als_algo()
# Legacy call from an earlier fixed-hyper-parameter variant (presumably
# rank, maxIter, regParam); the current als_algo does not accept positional
# arguments, so it is kept only as a note:
# als_algo(47,19,0.1)

Best rank:  15
Best maxIter:  19
Best regParam:  0.2
Root Mean Squared Error (RMSE) = 1.0853725510422239
Mean Squarred Error (MSE) = 1.1780335745559047
+-------+------------+------+----------+
|user_id|        name|rating|prediction|
+-------+------------+------+----------+
|    833|Cowboy Bebop|    10|  9.070294|
|   4818|Cowboy Bebop|     7|  9.002375|
|   6654|Cowboy Bebop|     8|  8.488494|
|  16339|Cowboy Bebop|     8|  9.365178|
|  24171|Cowboy Bebop|    10|  8.584796|
|  28146|Cowboy Bebop|     7|  9.133361|
|  29285|Cowboy Bebop|     9| 8.6937685|
|  29993|Cowboy Bebop|     9|  8.384263|
|  42635|Cowboy Bebop|     8| 7.5446134|
|  51393|Cowboy Bebop|    10|  8.908477|
+-------+------------+------+----------+
only showing top 10 rows

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                      