In [3]:
!pip install pyspark




In [None]:
!pip uninstall pyspark -y
!pip install pyspark


In [9]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# End-to-end ALS collaborative-filtering pipeline on MovieLens 100k:
# load ratings/movies -> name columns -> split -> fit ALS -> predict -> attach titles.

# Fixed seed shared by the train/test split and the ALS factor initialisation so
# that a Restart & Run All produces the same split, model and RMSE.
SEED = 42

# Step 1: Initialize Spark session
spark = (
    SparkSession.builder
    .appName("Collaborative Filtering Movie Recommendation")
    .getOrCreate()
)

# Step 2: Load the MovieLens dataset with the correct file paths
ratings_path = "/content/ml-100k/ml-100k/u.data"
movies_path = "/content/ml-100k/ml-100k/u.item"

# Load ratings and movies data directly into Spark DataFrames.
# u.data is tab-separated; u.item is pipe-separated.
ratings_df = spark.read.csv(ratings_path, header=False, inferSchema=True, sep="\t")
# u.item is ISO-8859-1 (Latin-1) encoded; reading it with Spark's default UTF-8
# corrupts titles that contain accented characters.
movies_df = spark.read.csv(
    movies_path, header=False, inferSchema=True, sep="|", encoding="ISO-8859-1"
)

# Step 3: Assign column names to ratings dataframe.
# ALS requires integer user/item ids, so the cast is applied while renaming.
ratings_df = ratings_df.select(
    col("_c0").cast("int").alias("user"),
    col("_c1").cast("int").alias("movie"),
    col("_c2").alias("rating"),
    col("_c3").alias("timestamp"),
)

# Step 4: Assign column names to movies dataframe (only the first five fields
# of u.item are kept; the remaining columns are dropped).
movies_df = movies_df.select(
    col("_c0").alias("movie"),
    col("_c1").alias("title"),
    col("_c2").alias("release_date"),
    col("_c3").alias("video_release_date"),
    col("_c4").alias("imdb_url"),
)

# Step 5: Split the data into training and test sets (80% / 20%), seeded.
(training, test) = ratings_df.randomSplit([0.8, 0.2], seed=SEED)

# Step 6: Train the model.
# coldStartStrategy="drop" removes test rows whose user or movie was not seen in
# training, so the predictions contain no NaN values.
als = ALS(
    userCol="user",
    itemCol="movie",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=SEED,
)
model = als.fit(training)

# Step 7: Predict ratings for the held-out test data
predictions = model.transform(test)
predictions.show()

# Step 8: Show movie titles from predictions
predicted_ratings = (
    predictions.join(movies_df, on="movie", how="inner")
    .select("user", "title", "prediction")
)

predicted_ratings.show()


+----+-----+------+---------+----------+
|user|movie|rating|timestamp|prediction|
+----+-----+------+---------+----------+
| 148|    7|     5|877017054|  4.063942|
| 148|   50|     5|877016805| 4.7599473|
| 148|  127|     1|877399351| 3.8953156|
| 148|  132|     4|877020715|  3.937625|
| 148|  135|     5|877016514|  3.971504|
| 148|  169|     5|877020297| 4.7363753|
| 148|  495|     4|877016735|  3.372613|
| 148|  501|     4|877020297| 3.3081784|
| 148|  507|     5|877398587| 3.0592344|
| 148|  509|     5|877016605| 3.7819676|
| 148| 1012|     4|877400154| 3.9423168|
| 148| 1039|     2|877015784| 3.6771193|
| 463|    3|     2|877386083| 1.6301373|
| 463|   10|     1|890453075|  3.293445|
| 463|   13|     3|877385664| 2.7157714|
| 463|  100|     4|877385237| 3.9880238|
| 463|  116|     5|877385381| 3.9741912|
| 463|  137|     2|877385237| 3.6251612|
| 463|  235|     2|877385457| 1.7040255|
| 463|  253|     5|877387935| 2.3222983|
+----+-----+------+---------+----------+
only showing top 20 rows

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out predictions: RMSE between the true "rating" column and the
# model's "prediction" column.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 0.9187347842492681


In [11]:
# Top-10 movie recommendations for every user known to the fitted model;
# show() prints the first 20 users with the recommendation arrays truncated.
user_recommendations = model.recommendForAllUsers(numItems=10)
user_recommendations.show()


+----+--------------------+
|user|     recommendations|
+----+--------------------+
|   1|[{1463, 5.3074927...|
|   3|[{1368, 5.2011733...|
|   5|[{50, 4.368469}, ...|
|   6|[{1463, 5.2188005...|
|   9|[{1368, 5.5233593...|
|  12|[{1450, 5.3338876...|
|  13|[{1463, 5.540495}...|
|  15|[{1278, 4.238954}...|
|  16|[{1463, 5.897149}...|
|  17|[{1524, 4.5804954...|
|  19|[{1463, 4.9666615...|
|  20|[{1192, 4.704961}...|
|  22|[{1463, 5.19968},...|
|  26|[{1463, 3.9767897...|
|  27|[{1233, 4.4484315...|
|  28|[{1368, 4.722732}...|
|  31|[{1463, 5.7239}, ...|
|  34|[{1449, 5.7307496...|
|  35|[{1450, 4.1438174...|
|  37|[{1368, 5.5148706...|
+----+--------------------+
only showing top 20 rows



In [12]:
# Top-10 recommendations for one user: restrict the ratings to that user's rows
# and hand them to the model as the user subset.
user_id = 148
single_user = ratings_df.where(ratings_df["user"] == user_id)
recommendations = model.recommendForUserSubset(single_user, numItems=10)
recommendations.show()


+----+--------------------+
|user|     recommendations|
+----+--------------------+
| 148|[{718, 5.0798483}...|
+----+--------------------+

