<a href="https://colab.research.google.com/github/boffett/paytm_test/blob/main/PaytmTest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Following the requirements to build the solution as scalable as possible, I chose to implement it on PySpark.**

In [2]:
!pip install pyspark



Import the required PySpark libraries.

In [3]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.mllib.recommendation import ALS

**First, read the data from the following files (movies.dat, ratings.dat) and reorganize/clean the data to be used in the model.**

In [34]:
# Set the path where the movies.dat and ratings.dat files are located
path = "/content/paytm_test/"

spark = SparkSession.builder.appName("PaytmTest").getOrCreate()

Load the movie dataset

In [6]:
# Load the data and convert it into the corresponding column field format.
movie_df = spark.read.text(path + "movies.dat").select(
    split("value", "::").getItem(0).cast("integer").alias("MovieID"),
    split("value", "::").getItem(1).alias("Title"),
    split("value", "::").getItem(2).alias("Genres")
)

# Drop records with null values after importing.
movie_df = movie_df.dropna()
#movie_df.cache()
movie_df.show(3)
movie_df.count()

+-------+--------------------+--------------------+
|MovieID|               Title|              Genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



10686

Load the rating dataset

In [7]:
# Load the data and convert it into the corresponding column field format.
rating_df = spark.read.text(path + "ratings.dat").select(
    split("value", "::").getItem(0).cast("integer").alias("UserID"),
    split("value", "::").getItem(1).cast("integer").alias("MovieID"),
    split("value", "::").getItem(2).cast("float").alias("Rating"),
    split("value", "::").getItem(3).alias("Timestamp")
)

# Drop records with null values after importing.
rating_df = rating_df.dropna()

# Filter out anomalous rating records.
rating_df = rating_df.filter((rating_df.Rating >= 0) & (rating_df.Rating <= 5))

# Convert Timestamp to date.
rating_df = rating_df.withColumn("Date", to_date(from_unixtime("Timestamp")))
rating_df = rating_df.drop("Timestamp")
#rating_df.cache()
rating_df.show(3)
rating_df.count()

+------+-------+------+----------+
|UserID|MovieID|Rating|      Date|
+------+-------+------+----------+
|     1|    122|   5.0|1996-08-02|
|     1|    185|   5.0|1996-08-02|
|     1|    231|   5.0|1996-08-02|
+------+-------+------+----------+
only showing top 3 rows



10000032

**(Q1) What are the titles of top 5 most popular movies i.e. have the most ranking in the whole dataset?**

In [8]:
# From the rating data, count the number of ratings for each movie,
# and sort them in descending order based on this count.
movie_rating_count = rating_df.groupBy("MovieID").agg(
    count("*").alias("#Ratings")).orderBy("#Ratings", ascending=False)
movie_rating_count = movie_rating_count.withColumn("Rank", monotonically_increasing_id())

# Add the title of each movie for easier display of results.
popular_movies = movie_rating_count.join(movie_df, "MovieID").select(
    "Rank", "#Ratings", "Title")
#popular_movies.cache()

In [9]:
# Display the top n most popular movies with titles
top_n_movie = 5
popular_movies.show(top_n_movie)

+----+--------+--------------------+
|Rank|#Ratings|               Title|
+----+--------+--------------------+
|   0|   34864| Pulp Fiction (1994)|
|   1|   34457| Forrest Gump (1994)|
|   2|   33668|Silence of the La...|
|   3|   32631|Jurassic Park (1993)|
|   4|   31126|Shawshank Redempt...|
+----+--------+--------------------+
only showing top 5 rows



**(Q2) What are the top 5 ranked movie genres on average in the whole dataset?**

In [10]:
# First, calculate the average rating for each movie.
movie_avg_ratings = rating_df.groupBy("MovieID").agg(avg("Rating").alias('Avg_Rating'))
movie_avg_ratings.show(5)

+-------+------------------+
|MovieID|        Avg_Rating|
+-------+------------------+
|   1580| 3.563920531231442|
|   5300|3.7041884816753927|
|    471| 3.659111243662392|
|   1591| 2.591865858009276|
|   3175|3.6245300142616363|
+-------+------------------+
only showing top 5 rows



In [11]:
# Associate the average rating of the movies with their genre information.
movie_genres_rating = movie_avg_ratings.join(movie_df,"MovieID").select(
    "MovieID", "Avg_Rating", "Genres")
movie_genres_rating.show(5)

+-------+------------------+--------------------+
|MovieID|        Avg_Rating|              Genres|
+-------+------------------+--------------------+
|   1580| 3.563920531231442|Action|Comedy|Sci-Fi|
|   5300|3.7041884816753927|Action|Adventure|...|
|    471| 3.659111243662392|Comedy|Drama|Fant...|
|   1591| 2.591865858009276|Action|Adventure|...|
|   3175|3.6245300142616363|Adventure|Comedy|...|
+-------+------------------+--------------------+
only showing top 5 rows



In [13]:
# Split the multiple genres of each movie and associate the ratings with each genre label.
genre_ratings = movie_genres_rating.withColumn("Genre", explode(
    split("Genres", "\|"))).select("Genre", "Avg_Rating")
genre_ratings.show(5)

+---------+------------------+
|    Genre|        Avg_Rating|
+---------+------------------+
|   Action| 3.563920531231442|
|   Comedy| 3.563920531231442|
|   Sci-Fi| 3.563920531231442|
|   Action|3.7041884816753927|
|Adventure|3.7041884816753927|
+---------+------------------+
only showing top 5 rows



In [14]:
# Calculate the average rating for each genre,
# and sort them in descending order based on the average.
genre_avg_rating = genre_ratings.groupBy("Genre").agg(
    avg("Avg_Rating").alias('Avg_Rating'))
genre_avg_rating = genre_avg_rating.orderBy('Avg_Rating', ascending = False)
#genre_ratings.cache()
#genre_ratings.count()

In [15]:
# Display the top n ranked movie genres on average
top_n_genre = 5
genre_avg_rating.show(top_n_genre)

+------------------+------------------+
|             Genre|        Avg_Rating|
+------------------+------------------+
|         Film-Noir|3.7118715983793593|
|(no genres listed)| 3.642857142857143|
|       Documentary|3.4621763397993477|
|               War| 3.454612791239219|
|             Drama|3.3498928844848557|
+------------------+------------------+
only showing top 5 rows



**(Q3) How many movies have been ranked the most consecutive days?**

In [16]:
# First, extract the date of each movie rating from the rating data and remove duplicates.
movie_rating_date = rating_df.select(["MovieID","Date"]).drop_duplicates()
movie_rating_date.show(10)
#movie_rating_date.count()

+-------+----------+
|MovieID|      Date|
+-------+----------+
|    589|1996-08-02|
|   1212|2003-04-11|
|    208|2005-05-20|
|    288|2005-03-23|
|   1080|2005-05-12|
|   1396|2005-03-24|
|   2948|2005-05-12|
|   2997|2005-03-24|
|   3452|2005-03-24|
|   5810|2005-05-12|
+-------+----------+
only showing top 10 rows



In [17]:
# Sort the rating dates for each movie.
winspec = Window.partitionBy("MovieID").orderBy("Date")

# For each movie's rating dates, calculate the number of days difference from the previous date.
movie_rating_date = movie_rating_date.withColumn("date_diff",
    datediff("Date", lag("Date", 1).over(winspec)))

# Based on the date difference, if it is consecutive,
# mark it as the same time window; otherwise, mark it as a new time window.
movie_rating_date = movie_rating_date.withColumn("winID",
    sum(when(col("date_diff") != 1, 1).otherwise(0)).over(
        winspec.rowsBetween(Window.unboundedPreceding, 0)))
movie_rating_date.show(10)

+-------+----------+---------+-----+
|MovieID|      Date|date_diff|winID|
+-------+----------+---------+-----+
|      1|1996-01-29|     NULL|    0|
|      1|1996-02-01|        3|    1|
|      1|1996-02-02|        1|    1|
|      1|1996-02-05|        3|    2|
|      1|1996-02-12|        7|    3|
|      1|1996-02-22|       10|    4|
|      1|1996-02-23|        1|    4|
|      1|1996-02-26|        3|    5|
|      1|1996-03-04|        7|    6|
|      1|1996-03-05|        1|    6|
+-------+----------+---------+-----+
only showing top 10 rows



In [18]:
# Count the multiple time windows for each movie
# to determine the consecutive days for each time window.
movie_rated_days = movie_rating_date.groupBy("MovieID", "winID").count()

# Find the longest consecutive days for each movie.
movie_rated_days = movie_rated_days.groupBy("MovieID").agg(max("count").alias("Max_Consecutive_Days"))

# Sort by the longest consecutive days in descending order.
movie_rated_days = movie_rated_days.orderBy("Max_Consecutive_Days", ascending=False)
#movie_rated_days.cache()

In [19]:
# Display the top n most consecutive rated movies
top_n_movie = 5
movie_rated_days.show(top_n_movie)

+-------+--------------------+
|MovieID|Max_Consecutive_Days|
+-------+--------------------+
|   5952|                 622|
|   2858|                 485|
|   4993|                 483|
|    356|                 425|
|   6377|                 425|
+-------+--------------------+
only showing top 5 rows



**Second, split the data into test and training sets and create a recommender system.**

In [20]:
# Split the rating data into training and testing datasets randomly in an 8:2 ratio.
(train_df, test_df) = rating_df.randomSplit([0.8, 0.2])
train_df.show(5)
#train_df.count()

+------+-------+------+----------+
|UserID|MovieID|Rating|      Date|
+------+-------+------+----------+
|     1|    185|   5.0|1996-08-02|
|     1|    231|   5.0|1996-08-02|
|     1|    292|   5.0|1996-08-02|
|     1|    316|   5.0|1996-08-02|
|     1|    329|   5.0|1996-08-02|
+------+-------+------+----------+
only showing top 5 rows



In [21]:
# Use the ALS algorithm in PySpark to generate a recommendation model.
from pyspark.ml.recommendation import ALS
als = ALS(
    userCol="UserID",
    itemCol="MovieID",
    ratingCol="Rating",
    maxIter=10,
    regParam=0.01,
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop"
)
model = als.fit(train_df)
model.save(path + "als.model")

**(Q4) What are the top 5 recommended movies made to one user, e.g. , UserID = 122 (any user can be selected)**

In [28]:
# Define a recommendation service function that recommends new movies
# not yet rated by the user, based on the user ID.

def recommend_movies(model, user_id, num_movies, movie_df, rating_df):
    users_df = spark.createDataFrame([(user_id,)], ["UserID"])

    # Generate a surplus number of recommended movies to filter out those that have already been rated.
    user_recs = model.recommendForUserSubset(users_df, num_movies * 3)  # 生成更多的推荐以便筛选

    # explode the recommendated movie list
    user_recs = user_recs.withColumn("rec_exp", explode("recommendations")).select('UserID', col("rec_exp.MovieID"), col("rec_exp.rating"))

    # Retrieve the user's rating history.
    user_rated_movies = rating_df.filter(col("UserID") == user_id).select("MovieID")

    # Filter out movies that the user has already rated.
    recommendations = user_recs.join(user_rated_movies, "MovieID", "left_anti")

    # Order by the predicted rating in descending order and limit to the desired number of movies
    recommendations = recommendations.join(movie_df, "MovieID").select(
        "UserID", "Title", "rating").orderBy(desc("rating")).limit(num_movies)

    return recommendations

In [29]:
# Recommend n movies for a given UserID.
user_id = 122
num_movies = 5
recommendations = recommend_movies(model, user_id, num_movies, movie_df, rating_df)
recommendations.show(truncate=False)

+------+----------------------------------------------+---------+
|UserID|Title                                         |rating   |
+------+----------------------------------------------+---------+
|122   |Mudge Boy, The (2003)                         |7.287633 |
|122   |War Is Over, The (La Guerre Est Finie) (1966) |7.134191 |
|122   |Sea Is Watching, The (Umi wa miteita) (2002)  |6.8065577|
|122   |December 7th (1943)                           |6.765809 |
|122   |Color of Pomegranates, The (Sayat Nova) (1968)|6.5478687|
+------+----------------------------------------------+---------+



**(Q5) What are the top 5 movies that are most frequently recommended by your model? (use training set)**

In [31]:
# Generate the top 5 recommended movies for each user based on the model.
user_recs = model.recommendForAllUsers(5)

# Explode the recommended movie list.
movie_recs = user_recs.withColumn("recommendations",
    explode("recommendations")).select(col("recommendations.MovieID"))
# Count the number of recommendations for each movie and sort them in descending order
movie_freq = movie_recs.groupBy("MovieID").count().orderBy(desc("count"))

In [32]:
# Display the top 5 most recommended movies
top_5_movies = movie_freq.limit(5)
top_5_movies_with_titles = top_5_movies.join(movie_df, "MovieID").select("MovieID", "Title", "count")
top_5_movies_with_titles.orderBy(desc("count")).show(truncate=False)

+-------+---------------------------------------------+-----+
|MovieID|Title                                        |count|
+-------+---------------------------------------------+-----+
|8203   |War Is Over, The (La Guerre Est Finie) (1966)|35091|
|26176  |Titicut Follies (1967)                       |24084|
|63194  |Caótica Ana (2007)                           |14748|
|32792  |Red Desert, The (Deserto rosso, Il) (1964)   |14406|
|26699  |Close-Up (Nema-ye Nazdik) (1990)             |12975|
+-------+---------------------------------------------+-----+



**(Q6) Calculate the RMSE of your model for your test set.**

In [33]:
from pyspark.ml.evaluation import RegressionEvaluator

# Predict ratings on the test set.
predict_test = model.transform(test_df)

# Calculate the RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predict_test)
print(f"RMSE on test data = {rmse}")

RMSE on test data = 0.81123898688074
