In [5]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder,CrossValidator
import pandas as pd


# Columns of the ratings data that the recommender uses.
user_rating = ['userId', 'movieId', 'rating']

# Start a Spark session for this notebook (or reuse the one already running).
spark = SparkSession.builder.appName("Recommendation_Module").getOrCreate()

# Load the ratings from the JSON file and keep only the columns listed above.
movie_ratings_spark = spark.read.json('movies.json').select(user_rating)

In [6]:
# Preview the first 20 ratings (these are the defaults of DataFrame.show).
movie_ratings_spark.show(n=20, truncate=True)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   244|      1|     4|
|   298|      1|     5|
|   253|      1|     5|
|   305|      1|     5|
|     6|      1|     4|
|    62|      1|     2|
|   286|      1|     4|
|   200|      1|     5|
|   210|      1|     5|
|   303|      1|     5|
|   194|      1|     4|
|   291|      1|     5|
|   234|      1|     3|
|   299|      1|     3|
|   308|      1|     4|
|    95|      1|     5|
|    38|      1|     5|
|   102|      1|     3|
|    63|      1|     3|
|   160|      1|     4|
+------+-------+------+
only showing top 20 rows



In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Define the ALS model.
# userCol / itemCol must name columns that exist in movie_ratings_spark, which
# are "userId" and "movieId" (not "user_id" / "movie_id").
# A fixed seed keeps the factor initialisation repeatable between runs.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)

# Define the parameter grid: 3 ranks x 3 iteration counts x 3 regularisation
# strengths = 27 candidate settings.
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Define the evaluator: RMSE between the "rating" and "prediction" columns.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Define the cross-validator: every candidate is fitted on 5 folds.
# The seed fixes the fold assignment.
cross_validator = CrossValidator(estimator=als,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=5,
                                 seed=42)

# Split the data into training and testing sets (seeded so the same rows land
# in each split on every run).
(training, test) = movie_ratings_spark.randomSplit([0.8, 0.2], seed=42)

# Fit the cross-validator to the training data
cv_model = cross_validator.fit(training)

# Make predictions on the test data
predictions = cv_model.transform(test)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Get the best model from cross-validation
best_model = cv_model.bestModel

# Generate top movie recommendations for all users using the best model
userRecs = best_model.recommendForAllUsers(10)

# Show the top recommendations for the first user.
# The fields of the recommendations struct are named after itemCol, hence
# "recommendations.movieId".
userRecs.select("userId", "recommendations.movieId").show(1, truncate=False)

In [7]:
# Base ALS estimator. maxIter and regParam given here are only starting values:
# the parameter grid below sets both for every candidate that is fitted.
# The seed makes the factor initialisation repeatable.
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId",
          ratingCol="rating", coldStartStrategy="drop", seed=42)

############################### cross validation ############################
# Define the parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Define the cross-validator (seeded so the fold assignment is repeatable)
cross_validator = CrossValidator(estimator=als,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=5,
                                 seed=42)

# Create the train and test sets; the seed keeps the split identical across runs
# so RMSE values from different runs can be compared.
(training, test) = movie_ratings_spark.randomSplit([0.8, 0.2], seed=42)

# Fit every grid candidate with 5-fold cross-validation on the training split.
model = cross_validator.fit(training)

In [None]:
# Recompute the RMSE from the current `model`, `evaluator` and `test` rather
# than printing whatever `rmse` value another cell happened to leave in the
# kernel (which may be undefined or belong to a different model).
rmse = evaluator.evaluate(model.transform(test))
print(f"Root Mean Squared Error (RMSE): {rmse}")

In [None]:
# Score the held-out test split with the fitted model and preview the result.
predictions = model.transform(test)
predictions.show()

# Root Mean Squared Error between the true "rating" and the "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   392|    463|     3| 3.6991503|
|   897|    496|     5| 4.3330564|
|   251|    148|     2| 2.5000026|
|   251|    471|     3| 3.8907123|
|   458|    496|     3| 3.7506993|
|   883|    496|     2| 3.7914562|
|   588|    463|     4| 2.8509839|
|   588|    496|     3| 4.1895394|
|   796|    496|     5| 4.4519105|
|   101|    471|     3| 3.5822284|
|   115|    471|     2| 3.1278152|
|   385|    496|     2| 2.6508143|
|   577|    471|     3| 3.8977022|
|    44|    496|     4|  3.802884|
|   606|    833|     5|  3.122258|
|   236|    148|     4|  2.853957|
|   738|    496|     4|  4.125635|
|   663|    148|     4| 3.3692496|
|   222|    471|     3|  3.668528|
|   875|    496|     4| 3.7526584|
+------+-------+------+----------+
only showing top 20 rows

Root Mean Squared Error (RMSE): 1.0884023264591982


In [None]:
# Define the user id :
test_user_id = 1

# One row per user to score. createDataFrame needs a list of rows (a plain dict
# is not accepted), and the column must be named like the model's userCol,
# which is "userId".
data = [(test_user_id,)]
user_subset = spark.createDataFrame(data, ["userId"])

# `model` is a CrossValidatorModel when it comes from CrossValidator.fit; the
# recommend* methods live on the ALSModel stored in its bestModel attribute.
# If `model` is already an ALSModel it is used as-is.
als_model = getattr(model, "bestModel", model)

# Generate top movie recommendations for the specified user :
user_recommendations = als_model.recommendForUserSubset(user_subset, 10)

# Display the recommendations for the specified user :
user_recommendations.show(truncate=False)

In [75]:
# Save the ALS model :
model_path = "./best_model"

# Save the model :
# - When `model` is a CrossValidatorModel, persist its bestModel (an ALSModel)
#   so that the directory can be read back with ALSModel.load.
# - overwrite() lets this cell be re-run when ./best_model already exists.
getattr(model, "bestModel", model).write().overwrite().save(model_path)

### Test loading the model

In [78]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALSModel

# Create a Spark session (getOrCreate reuses the session that is already
# running in this kernel, if any).
spark = SparkSession.builder.appName("ModelUsage").getOrCreate()

# Specify the path where the ALS model is saved
model_path = "./best_model"

# Load the ALS model
loaded_model = ALSModel.load(model_path)

# Ratings already given by the user we want recommendations for.
user_id = 55
selected_user_df = movie_ratings_spark.filter(col('userId') == user_id)

selected_user_df.show()

# Exercise the model that was just loaded (not the in-memory `model`), so this
# cell really checks that the saved model can be read back and used.
user_recommendations = loaded_model.recommendForUserSubset(selected_user_df, 5)
user_recommendations.show(truncate=False)


+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|    55|      7|     3|
|    55|     22|     5|
|    55|     50|     4|
|    55|     56|     4|
|    55|     79|     5|
|    55|     89|     5|
|    55|    117|     3|
|    55|    118|     5|
|    55|    121|     3|
|    55|    144|     5|
|    55|    174|     4|
|    55|    181|     4|
|    55|    254|     2|
|    55|    257|     3|
|    55|    273|     5|
|    55|    405|     1|
|    55|    597|     2|
|    55|    678|     3|
|    55|    685|     1|
|    55|   1016|     1|
+------+-------+------+
only showing top 20 rows



## All Code :

In [21]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
import pandas as pd

# End-to-end version of the notebook: load ratings, train ALS, evaluate,
# recommend for one user, and save the model.

spark = SparkSession.builder \
    .appName("Recommendation_Module") \
    .getOrCreate()

# Define user rating columns :
user_rating = ['userId', 'movieId', 'rating']
# Read data from JSON file into a PySpark DataFrame :
movie_ratings_spark = spark.read.json('movies.json')

# Select relevant columns from the DataFrame :
movie_ratings_spark = movie_ratings_spark.select(user_rating)

# Create test and train set (seeded so the same rows land in each split on
# every run and the reported RMSE is reproducible) :
(training, test) = movie_ratings_spark.randomSplit([0.8, 0.2], seed=42)

# ALS estimator; the seed makes the factor initialisation repeatable.
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId",
          ratingCol="rating", coldStartStrategy="drop", seed=42)

model = als.fit(training)

# Evaluate the model by computing the Root Mean Squared Error (RMSE) on the test data :
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Top 5 recommendations for one user. The DataFrame passed in only needs to
# contain the "userId" column of the users to score.
user_id = 55
selected_user_df = movie_ratings_spark.filter(col('userId') == user_id)

user_recommendations = model.recommendForUserSubset(selected_user_df, 5)
user_recommendations.show(truncate=False)

model_path = "./best_model"

# Save the model; overwrite() lets the cell be re-run when the directory
# already exists instead of failing.
model.write().overwrite().save(model_path)

Root Mean Squared Error (RMSE): 1.08701507564806




+------+---------------------------------------------------------------------------------------------+
|userId|recommendations                                                                              |
+------+---------------------------------------------------------------------------------------------+
|55    |[{1311, 15.909463}, {884, 14.893386}, {1245, 14.099055}, {1084, 13.582125}, {865, 12.662319}]|
+------+---------------------------------------------------------------------------------------------+



In [45]:
from pyspark.ml.recommendation import ALSModel

import os

# Location of the saved ALS model. Defaults to the relative directory the
# notebook saves to ("./best_model") instead of a user-specific absolute path;
# set the ALS_MODEL_PATH environment variable to load from somewhere else.
# NOTE(review): assumes the notebook runs from the directory that contains
# best_model — confirm.
model_path = os.environ.get("ALS_MODEL_PATH", "./best_model")

# Load the ALS model :
loaded_model = ALSModel.load(model_path)