In [23]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
SparkSession.builder.config(conf=SparkConf())

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel


In [22]:
spark = (SparkSession.builder
            .appName("ElasticsearchSparkIntegration")
            .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-20_2.12:7.17.14,")
            .getOrCreate())

spark

In [8]:
# Convert the final_data DataFrame to a Spark DataFrame
spark_final_data = spark.read.csv("final_data.csv", inferSchema=True,header=True)
spark_final_data.show()

+-------+--------+------+---------+-----+------------+--------------------+------+
|user_id|movie_id|rating|timestamp|title|release_date|            IMDb_URL| genre|
+-------+--------+------+---------+-----+------------+--------------------+------+
|    196|     242|     3|881250949|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|     63|     242|     3|875747190|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|    226|     242|     5|883888671|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|    154|     242|     3|879138235|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|    306|     242|     5|876503793|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|    296|     242|     4|884196057|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|     34|     242|     5|888601628|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|    271|     242|     4|885844495|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|    201|     242|     4|884110598|Kolya| 24-Jan-1997|http://us.imdb.co...|Comedy|
|   

In [10]:
# Split the data into training and testing sets
(training, test) = spark_final_data.randomSplit([0.8, 0.2])

In [17]:
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=20, regParam=0.05, userCol="user_id", itemCol="movie_id", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training)

In [18]:
# Evaluate the model by computing the Root Mean Squared Error (RMSE) on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean {rmse}")

Root Mean 0.9518014699749676


In [19]:
# Generate top movie recommendations for all users
userRecs = model.recommendForAllUsers(10)

# Show the top recommendations for the first user
userRecs.select("user_id", "recommendations.movie_id").show(truncate=False)

+-------+---------------------------------------------------------+
|user_id|movie_id                                                 |
+-------+---------------------------------------------------------+
|1      |[516, 906, 169, 647, 408, 1449, 114, 718, 1463, 1062]    |
|3      |[1368, 347, 1268, 74, 787, 201, 340, 320, 1001, 1169]    |
|6      |[1463, 1021, 1142, 868, 641, 1063, 474, 57, 1129, 525]   |
|12     |[1664, 1134, 83, 113, 459, 64, 318, 173, 1612, 1167]     |
|13     |[1242, 1473, 337, 1093, 199, 479, 169, 510, 945, 1449]   |
|16     |[1473, 1268, 1463, 1169, 174, 12, 50, 64, 127, 318]      |
|20     |[1157, 989, 990, 1120, 1483, 1003, 982, 681, 1394, 613]  |
|22     |[1589, 868, 1143, 1142, 837, 906, 50, 115, 173, 172]     |
|26     |[1463, 127, 483, 98, 50, 1473, 178, 313, 1142, 480]      |
|27     |[526, 205, 180, 22, 1282, 272, 318, 113, 589, 924]       |
|28     |[1589, 50, 113, 302, 114, 516, 127, 119, 1463, 172]      |
|31     |[352, 1203, 1367, 430, 896, 793, 1288, 

In [20]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Define the ALS model
als = ALS(userCol="user_id", itemCol="movie_id", ratingCol="rating", coldStartStrategy="drop")

# Define the parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Define the cross-validator
cross_validator = CrossValidator(estimator=als,
                                 estimatorParamMaps=param_grid,
                                 evaluator=evaluator,
                                 numFolds=5)

# Split the data into training and testing sets
(training, test) = spark_final_data.randomSplit([0.8, 0.2])

# Fit the cross-validator to the training data
cv_model = cross_validator.fit(training)

# Make predictions on the test data
predictions = cv_model.transform(test)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Get the best model from cross-validation
best_model = cv_model.bestModel

# Generate top movie recommendations for all users using the best model
userRecs = best_model.recommendForAllUsers(10)

# Show the top recommendations for the first user
userRecs.select("user_id", "recommendations.movie_id").show(truncate=False)


Root Mean Squared Error (RMSE): 0.9207993172976006
+-------+-----------------------------------------------------+
|user_id|movie_id                                             |
+-------+-----------------------------------------------------+
|1      |[1449, 119, 178, 113, 169, 285, 513, 114, 408, 12]   |
|3      |[320, 1368, 340, 1643, 646, 346, 127, 347, 1288, 187]|
|6      |[1449, 884, 427, 1463, 474, 493, 483, 603, 1203, 480]|
|12     |[318, 64, 1449, 272, 98, 22, 427, 963, 515, 603]     |
|13     |[814, 1063, 867, 1203, 868, 430, 909, 848, 484, 1449]|
|16     |[1463, 318, 64, 483, 174, 12, 657, 205, 127, 408]    |
|20     |[1278, 83, 313, 496, 190, 210, 300, 22, 699, 360]    |
|22     |[50, 12, 357, 172, 173, 408, 171, 174, 178, 114]     |
|26     |[127, 483, 1449, 64, 50, 100, 174, 191, 318, 199]    |
|27     |[1449, 190, 641, 223, 100, 483, 64, 1194, 169, 178]  |
|28     |[302, 172, 50, 114, 12, 174, 195, 183, 1449, 181]    |
|31     |[320, 1463, 484, 1449, 654, 641, 745, 867, 1

In [21]:
import shutil

# Specify the path where you want to save the model
model_path = "best_model"

# Delete the existing directory if it exists
shutil.rmtree(model_path, ignore_errors=True)

# Save the ALS model
model.save(model_path)

print(f"Model saved to: {model_path}")


Model saved to: best_model
