In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import isnan

# Create (or reuse) the SparkSession for this job
spark = SparkSession.builder.appName("recommendation").getOrCreate()

# Location of the ratings CSV, kept in one place so it is easy to repoint
RATINGS_CSV_PATH = '/content/drive/MyDrive/PySpark_BigData_Movie_Recommendations/DataSet/ml-25m/ml-25m/ratings.csv'

# Load the data from CSV: the first row is the header, column types are inferred
data = spark.read.csv(RATINGS_CSV_PATH, inferSchema=True, header=True)

# Show rows whose 'rating' is missing. Empty CSV fields are loaded as null
# rather than NaN, so both are checked here -- the same rows that dropna()
# removes below.
data.filter(data['rating'].isNull() | isnan(data['rating'])).show()

# Drop rows with a null or NaN value in the 'rating' column
data = data.dropna(subset=['rating'])

# Split the data into training and test sets (weights 0.8 / 0.2); the fixed
# seed keeps the random draw repeatable for the same input
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" makes transform() leave out rows whose prediction
# would be NaN (users or movies the model has no factors for). The default
# strategy ("nan") keeps those rows, and a single NaN prediction is enough to
# turn an RMSE computed over them into NaN.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
)
model = als.fit(train_data)

# Score the held-out test data. No separate NaN check or dropna() is needed:
# with the "drop" strategy the result contains no NaN predictions.
predictions = model.transform(test_data)

# RegressionEvaluator is already imported at the top of the file, so it is not
# imported a second time here.

# Create the evaluator, naming the metric and both columns explicitly instead
# of relying on the constructor defaults
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

# Calculate the RMSE of the predictions against the true ratings
rmse = evaluator.evaluate(predictions)

# Print the RMSE rounded to two decimal places
print(f"RMSE: {rmse:.2f}")

# User whose held-out ratings are inspected
SINGLE_USER_ID = 12

# Show the (movieId, userId) pairs this user has in the test data
single_user = test_data.filter(test_data['userId'] == SINGLE_USER_ID).select(['movieId', 'userId'])
single_user.show()

# Predict this user's rating for each of those movies.
# NOTE: this scores only movies the user already rated in the test set; it does
# not search the catalogue for unseen movies (ALSModel.recommendForUserSubset
# is the API for that).
# Rows with a NaN prediction are removed before sorting: Spark orders NaN above
# every number, so they would otherwise sit at the top of a descending sort.
recommendations = model.transform(single_user).dropna(subset=['prediction'])

# Show the predicted ratings, highest first
recommendations.orderBy('prediction', ascending=False).show()
