## Set up environment

In [1]:
import os
os.environ['SPARK_HOME'] = "C:\Spark\spark-3.5.3-bin-hadoop3"

In [2]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# set up Spark config
conf = SparkConf() \
    .setAppName("ALS") \
    .setMaster("spark://192.168.0.136:7077") \
    .set("spark.driver.memory", "8g") \
    .set("spark.executor.memory", "8g")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

df = spark.read.csv('dataset.csv', header=True, inferSchema=True)

## Split data & training

In [4]:
from pyspark.sql.functions import col

df = df.withColumn("User_Id", col("User_Id").cast("integer"))
df = df.withColumn("Rating", col("Rating").cast("float"))

In [5]:
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [6]:
training = training.na.drop(subset=["Movie_Id", "User_Id"])

In [7]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="User_Id",
    itemCol="Movie_Id",
    ratingCol="Rating",
    coldStartStrategy="drop"
)

model = als.fit(training)

## Evaluate model

In [8]:
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction"
)

predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")


Root-mean-square error = 0.8178553423768055


In [9]:
from pyspark.sql.functions import col, lit

# test with user 1 
user_id = 1

user_movies = (
    training.select('Movie_Id').distinct()
    .withColumn('User_Id', lit(user_id))
)

predictions = model.transform(user_movies)

rated_movies = training.filter(col('User_Id') == user_id).select('Movie_Id')
predictions = predictions.join(rated_movies, 'Movie_Id', 'left_anti')

top_recommendations = predictions.orderBy(col('prediction').desc()).limit(15)

top_10 = top_recommendations.join(training.select('Movie_Id', 'Movie_Name', 'Year').distinct(), on='Movie_Id')

top_10.select('Movie_Name', 'Year', 'prediction').show()

+--------------------+------+----------+
|          Movie_Name|  Year|prediction|
+--------------------+------+----------+
|           Two Women|2000.0|   5.02423|
|              Hamoun|1990.0|   5.02423|
|            Bob Funk|2009.0| 5.1732206|
|Peppermint Soda (...|1977.0|   5.02423|
|Class Trip, The (...|1998.0|   5.02423|
|Doggiewoggiez! Po...|2012.0| 5.1070547|
|Notebook, The (A ...|2013.0|   5.02423|
|   Myra Breckinridge|1970.0| 5.3298855|
|          Bad Ronald|1974.0|  5.478671|
|In the Year of th...|1968.0|  5.284635|
|Black Girl (La no...|1966.0|  5.000994|
|Old Lady and the ...|1997.0|  5.458916|
|Last Circus, The ...|2010.0|   5.00471|
|   Toys in the Attic|1963.0|  5.174917|
|Autobiography of ...|1974.0|  5.000994|
+--------------------+------+----------+



In [10]:
# test with user 99 
user_id = 99

user_movies = (
    training.select('Movie_Id').distinct()
    .withColumn('User_Id', lit(user_id))
)

predictions = model.transform(user_movies)

rated_movies = training.filter(col('User_Id') == user_id).select('Movie_Id')
predictions = predictions.join(rated_movies, 'Movie_Id', 'left_anti')

top_recommendations = predictions.orderBy(col('prediction').desc()).limit(15)

top_10 = top_recommendations.join(training.select('Movie_Id', 'Movie_Name', 'Year').distinct(), on='Movie_Id')

top_10.select('Movie_Name', 'Year', 'prediction').show()

+--------------------+------+----------+
|          Movie_Name|  Year|prediction|
+--------------------+------+----------+
|Year of the Hare,...|1977.0| 5.1986485|
|           Two Women|2000.0|  5.249591|
|              Hamoun|1990.0|  5.249591|
|            Bob Funk|2009.0|   5.47594|
|   Absolute Giganten|1999.0| 5.2050104|
|Peppermint Soda (...|1977.0|  5.249591|
|Class Trip, The (...|1998.0|  5.249591|
|Doggiewoggiez! Po...|2012.0| 5.2244368|
|Notebook, The (A ...|2013.0|  5.249591|
|   Myra Breckinridge|1970.0| 5.8697023|
|          Bad Ronald|1974.0|  5.752883|
|In the Year of th...|1968.0| 5.3278675|
|Black Girl (La no...|1966.0|  5.188285|
|Who Are you Polly...|1966.0|  5.217513|
|Autobiography of ...|1974.0|  5.188285|
+--------------------+------+----------+



In [11]:
from pyspark.ml.recommendation import ALSModel

# Load the model from the specified directory
loaded_model = ALSModel.load("../experiments/als_model")

# Example DataFrame for inference
new_data = spark.createDataFrame([
    (1, 10),  # (User_Id, Movie_ID)
    (1, 20),
    (2, 10)
], ["User_Id", "Movie_Id"])

# Make predictions using the loaded model
predictions = loaded_model.transform(new_data)
predictions.show()

+-------+--------+----------+
|User_Id|Movie_Id|prediction|
+-------+--------+----------+
|      1|      10|  4.148014|
|      1|      20| 3.7615376|
|      2|      10| 4.9369745|
+-------+--------+----------+



In [12]:
all_movies = df.select("Movie_Id", "Movie_Name", "Year").distinct()


In [13]:
df.show()

+-------+--------------------+------+--------------------+------+--------+
|User_Id|          Movie_Name|Rating|               Genre|  Year|Movie_Id|
+-------+--------------------+------+--------------------+------+--------+
|      1|             Jumanji|   3.5|Adventure|Childre...|1995.0|       1|
|      1|City of Lost Chil...|   3.5|Adventure|Drama|F...|1995.0|       2|
|      1|Twelve Monkeys (a...|   3.5|Mystery|Sci-Fi|Th...|1995.0|       3|
|      1|Seven (a.k.a. Se7en)|   3.5|    Mystery|Thriller|1995.0|       4|
|      1| Usual Suspects, The|   3.5|Crime|Mystery|Thr...|1995.0|       5|
|      1|Rumble in the Bro...|   3.5|Action|Adventure|...|1995.0|       6|
|      1|             Rob Roy|   4.0|Action|Drama|Roma...|1995.0|       7|
|      1|              Clerks|   4.0|              Comedy|1994.0|       8|
|      1|Interview with th...|   4.0|        Drama|Horror|1994.0|       9|
|      1|Star Wars: Episod...|   4.0|Action|Adventure|...|1977.0|      10|
|      1|Léon: The Profes

In [14]:
all_movies.show()

+--------+--------------------+------+
|Movie_Id|          Movie_Name|  Year|
+--------+--------------------+------+
|      60|                Jaws|1975.0|
|     480|   Conspiracy Theory|1997.0|
|     581|  Autumn in New York|2000.0|
|     741|    Inspector Gadget|1999.0|
|    1209|     Lethal Weapon 3|1992.0|
|    1463|            Ref, The|1994.0|
|    1661|Home for the Holi...|1995.0|
|    2202|            Fog, The|1980.0|
|    2417|      Bodyguard, The|1992.0|
|    2434|Postcards From th...|1990.0|
|    2576|Love and Death on...|1997.0|
|    2583|Things to Do in D...|1995.0|
|    2625|     American Hustle|2013.0|
|    2774|Skin I Live In, T...|2011.0|
|    3071|Andromeda Strain,...|1971.0|
|    3109|    My Friend Flicka|1943.0|
|    3185|      My Man Godfrey|1957.0|
|    3913|              Capote|2005.0|
|    4157|               Buddy|1997.0|
|    4270|             Country|1984.0|
+--------+--------------------+------+
only showing top 20 rows



## Fine-tune the model

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 50, 100]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

cvModel = crossval.fit(training)

bestModel = cvModel.bestModel
predictions = bestModel.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Best model Root-mean-square error = {rmse}")

Best model Root-mean-square error = 0.8178553423767568
