In [None]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql.functions import col, when, split, explode, array, concat_ws
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

### Load Data

In [54]:
#load data
spark = SparkSession.builder.appName("MovieRecommender").getOrCreate()

movies_data = spark.read.csv("/content/Dataset/movies.csv", header=True, inferSchema=True)
ratings_data = spark.read.csv("/content/Dataset/ratings.csv", header=True, inferSchema=True)

### Preprocess Movies dataset

In [55]:
#preprocess movie data
#split genres
genre_splitted = movies_data.withColumn(
    "genre_list",
    when(col("Genres").isNotNull(), split(col("Genres"), "\\|"))
    .otherwise(array())
)

#convert array to string for StringIndexer compatibility
genre_splitted = genre_splitted.withColumn(
    "genre_list",
    concat_ws(",", col("genre_list"))  #join array elements with a comma
)

#create genre indexer pipeline
genre_indexer = StringIndexer(
    inputCol="genre_list",
    outputCol="genre_indices"
)

#one-hot encoder for genre
genre_encoder = OneHotEncoder(
    inputCol="genre_indices",
    outputCol="genre_vectors"
)

#set pipeline
pipeline = Pipeline(stages=[genre_indexer, genre_encoder])

#fit and transform the pipeline
preprocessed_movies = pipeline.fit(genre_splitted).transform(genre_splitted)

### Preprocess Ratings dataset

In [56]:
preprocessed_ratings = ratings_data.dropna()

### Split Train, Test datasets

In [57]:
train_data, test_data = preprocessed_ratings.randomSplit([0.8, 0.2], seed=42)

### Train ALS model

In [58]:
als_model = ALS(
        userCol="UserId",
        itemCol="MovieId",
        ratingCol="Rating",
        nonnegative=True,
        coldStartStrategy="drop",
        rank=10,
        maxIter=10
    )

fitted_model = als_model.fit(train_data)

### Evaluate ALS model

In [62]:
def evaluate_als_model(model, test_data):
    predictions = model.transform(test_data)
    evaluator = RegressionEvaluator(
        metricName='rmse',
        labelCol='Rating',
        predictionCol='prediction'
    )
    rmse = evaluator.evaluate(predictions)
    return rmse

In [64]:
rmse = evaluate_als_model(fitted_model, test_data)

print(f"RMSE for trained ALS model: {rmse:.4f}")

RMSE for trained ALS model: 0.8736


### Get Recommendations

In [70]:
def recommend_movies(user_id, train_data, movies_df, model, num_recomms=5):
    #create user subset
    user_subset = train_data.select("UserId").filter(col("UserId")==user_id)

    #get recommendations
    user_recomms = model.recommendForUserSubset(
        user_subset,
        num_recomms
    )
    #extract movie ids
    recommended_movie_ids = user_recomms.select(explode(user_recomms.recommendations.MovieId).alias("MovieId"))

    #combine with movie details
    recommendations = recommended_movie_ids.join(
        movies_df,
        recommended_movie_ids.MovieId == movies_df.MovieId
        ).select(
            movies_df.Title,
            movies_df.Genres
        )

    return recommendations

### Print Recemmendations for Given User

In [72]:
recomms = recommend_movies(500, train_data, preprocessed_movies, fitted_model) #set userId here
recomms.show(truncate=False)

+--------------------------------+-------------+
|Title                           |Genres       |
+--------------------------------+-------------+
|Firelight (1997)                |Drama        |
|Fall (1997)                     |Romance      |
|Bandits (1997)                  |Drama        |
|Zachariah (1971)                |Western      |
|Leather Jacket Love Story (1997)|Drama|Romance|
+--------------------------------+-------------+

