In [0]:
from pyspark.sql.functions import expr, col
from pyspark.sql.types import IntegerType, FloatType

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Part 0: Data ETL and Data Exploration


In [0]:
# Example loading CSV files into Spark DataFrames
movies_path = "file:/xxx/movies.csv"
ratings_path = "file:/xxx/ratings.csv"
links_path = "file:/xxx/links.csv"
tags_path = "file:/xxx/tags.csv"

In [0]:
# Load CSV files into Spark DataFrames
movies_df = spark.read.load(movies_path, format='csv', header=True, inferSchema=True)
ratings_df = spark.read.load(ratings_path, format='csv', header=True, inferSchema=True)
links_df = spark.read.load(links_path, format='csv', header=True, inferSchema=True)
tags_df = spark.read.load(tags_path, format='csv', header=True, inferSchema=True)

In [0]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [0]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [0]:
links_df.show(5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [0]:
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



In [0]:
# Calculate minimum number of ratings per user
min_ratings_per_user = ratings_df.groupBy("userID").count().agg({"count": "min"}).collect()[0][0]

# Calculate minimum number of ratings per movie
min_ratings_per_movie = ratings_df.groupBy("movieId").count().agg({"count": "min"}).collect()[0][0]

# Print results
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(min_ratings_per_user))
print('Minimum number of ratings per movie is {}'.format(min_ratings_per_movie))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [0]:
# Calculate number of movies rated by only one user
movies_rated_by_one_user = ratings_df.groupBy("movieId").count().filter("count = 1").count()

# Calculate total number of distinct movies
total_movies = ratings_df.select('movieId').distinct().count()

# Print results
print('{} out of {} movies are rated by only one user'.format(movies_rated_by_one_user, total_movies))

3446 out of 9724 movies are rated by only one user


# Part 1: Spark SQL and OLAP 


In [0]:
# Register DataFrames as temporary tables
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")

In [0]:
# 1. Calculate the number of distinct users
users_amount = spark.sql("SELECT COUNT(DISTINCT userID) AS distinct_users FROM ratings")
users_amount.show()

+--------------+
|distinct_users|
+--------------+
|           610|
+--------------+



In [0]:
# 2. Calculate the number of distinct movies
movies_amount = spark.sql("SELECT count(distinct movieID) FROM movies")
movies_amount.show()

+-----------------------+
|count(DISTINCT movieID)|
+-----------------------+
|                   9742|
+-----------------------+



In [0]:
# 3. Calculate the Number of movies that are rated by users & Movies not Rated Before
rated_movies = spark.sql("SELECT count(distinct movieID) FROM ratings")
rated_movies.show()

+-----------------------+
|count(DISTINCT movieID)|
+-----------------------+
|                   9724|
+-----------------------+



In [0]:
# 4. Calculate the Number of movies that are not rated by users & Movies not Rated Before
Nrated_movies = spark.sql("""SELECT distinct title, genres  
                             FROM movies where movieID not in 
                             (SELECT distinct movieID FROM ratings)
                          """)
display(Nrated_movies)

title,genres
This Gun for Hire (1942),Crime|Film-Noir|Thriller
"Chosen, The (1981)",Drama
"Innocents, The (1961)",Drama|Horror|Thriller
Niagara (1953),Drama|Thriller
Scrooge (1970),Drama|Fantasy|Musical
"Browning Version, The (1951)",Drama
I Know Where I'm Going! (1945),Drama|Romance|War
For All Mankind (1989),Documentary
Twentieth Century (1934),Comedy
Call Northside 777 (1948),Crime|Drama|Film-Noir


In [0]:
# 5. Display all movie genres
genres_split = spark.sql("SELECT distinct explode(split(genres,'[|]')) as genres FROM movies")
genres_split.show()

+------------------+
|            genres|
+------------------+
|             Crime|
|           Romance|
|          Thriller|
|         Adventure|
|             Drama|
|               War|
|       Documentary|
|           Fantasy|
|           Mystery|
|           Musical|
|         Animation|
|         Film-Noir|
|(no genres listed)|
|              IMAX|
|            Horror|
|           Western|
|            Comedy|
|          Children|
|            Action|
|            Sci-Fi|
+------------------+



In [0]:
# 6. Calculate the number of movies for each genre
genre_count = spark.sql("""SELECT genre, sum(num) as number_of_movies
                    FROM (SELECT explode(split(genres,'[|]')) as genre, count(movieID) as num FROM movies GROUP BY 1)
                    GROUP BY 1 ORDER BY 1 ASC
                    """)
genre_count.show()

+------------------+----------------+
|             genre|number_of_movies|
+------------------+----------------+
|(no genres listed)|              34|
|            Action|            1828|
|         Adventure|            1263|
|         Animation|             611|
|          Children|             664|
|            Comedy|            3756|
|             Crime|            1199|
|       Documentary|             440|
|             Drama|            4361|
|           Fantasy|             779|
|         Film-Noir|              87|
|            Horror|             978|
|              IMAX|             158|
|           Musical|             334|
|           Mystery|             573|
|           Romance|            1596|
|            Sci-Fi|             980|
|          Thriller|            1894|
|               War|             382|
|           Western|             167|
+------------------+----------------+



# Part 2: Spark ALS based Approach for Model Training
We will use Spark ML (ALS) to predict the ratings of movies.


## 2.1 Rating data transformation 

In [0]:
movie_ratings=ratings_df.drop('timestamp')
movie_ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [0]:
# Convert columns to desired data types
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

# Show the DataFrame to verify the changes
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



## 2.2 ALS Model Selection and Fitting
With the ALS model, a grid search is used to find the optimal hyperparameters.

In [0]:
# Split data into training and test sets
(training, test) = movie_ratings.randomSplit([0.8, 0.2], seed=1234)

# Create ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
)

# Define parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [0.1, 0.2, 0.3]) \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.maxIter, [10, 20, 30]) \
    .build()

# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build CrossValidator
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)

# Fit ALS model to training data
cvModel = cv.fit(training)

# Best model parameters
bestModel = cvModel.bestModel
print("\nBest model parameters:")
print("  - Rank:", bestModel.rank)
print("  - Max Iterations:", bestModel._java_obj.parent().getMaxIter())
print("  - Regularization Parameter:", bestModel._java_obj.parent().getRegParam())


Best model parameters:
  - Rank: 15
  - Max Iterations: 20
  - Regularization Parameter: 0.2


In [0]:
print("Cross validation RMSE of the best model: {}".format(min(cvModel.avgMetrics)))

Cross validation RMSE of the best model: 0.902109840101935


## 2.3 Model Testing and Evaluation

In [0]:
# Generate predictions using the best model from Grid Search
predictions = bestModel.transform(test)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = {:.2f}".format(rmse))

# Generate predictions using the best model from Grid Search
predictions.sort('userID',ascending=True).show()

Root Mean Squared Error (RMSE) on test data = 0.88
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|      3|   4.0| 3.7179055|
|     1|     50|   5.0| 4.8459826|
|     1|    362|   5.0|  3.969352|
|     1|    441|   4.0| 4.4536858|
|     1|    457|   5.0| 4.5441957|
|     1|    480|   4.0| 4.3135433|
|     1|    500|   3.0| 3.8585742|
|     1|    608|   5.0| 4.6142097|
|     1|    923|   5.0| 4.7404428|
|     1|    943|   4.0|  4.272798|
|     1|   1025|   5.0|  4.293479|
|     1|   1032|   5.0|  4.051794|
|     1|   1042|   4.0| 3.7605937|
|     1|   1049|   5.0| 3.6442964|
|     1|   1090|   4.0| 4.3878946|
|     1|   1127|   4.0| 3.9402187|
|     1|   1206|   5.0|  4.605923|
|     1|   1208|   4.0|  4.806447|
|     1|   1219|   2.0|  4.762763|
|     1|   1265|   4.0| 4.5831523|
+------+-------+------+----------+
only showing top 20 rows



# Part 3: Recommendation Systems based on Fine-tuned ALS Model

##3.1 Apply the fine-tuned model to entire dataset

In [0]:
# Apply best model to entire dataset
alldata = bestModel.transform(movie_ratings)

# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Calculate RMSE on entire dataset
rmse_all = evaluator.evaluate(alldata)
print("RMSE = {:.2f}".format(rmse_all))

RMSE = 0.74


In [0]:
# Register alldata DataFrame as a temporary table
alldata.registerTempTable("alldata")

# SQL query to join movies and alldata
spark.sql(
    "Select movies.movieId,title,userId,rating,prediction From movies join alldata on movies.movieID = alldata.movieID"
).show()



+-------+--------------------+------+------+----------+
|movieId|               title|userId|rating|prediction|
+-------+--------------------+------+------+----------+
|      1|    Toy Story (1995)|     1|   4.0| 4.4776278|
|      3|Grumpier Old Men ...|     1|   4.0| 3.7179055|
|      6|         Heat (1995)|     1|   4.0|  4.398159|
|     47|Seven (a.k.a. Se7...|     1|   5.0|  4.579977|
|     50|Usual Suspects, T...|     1|   5.0| 4.8459826|
|     70|From Dusk Till Da...|     1|   3.0| 3.8381057|
|    101|Bottle Rocket (1996)|     1|   5.0| 4.5207243|
|    110|   Braveheart (1995)|     1|   4.0| 4.5791154|
|    151|      Rob Roy (1995)|     1|   5.0|  4.016674|
|    157|Canadian Bacon (1...|     1|   5.0| 3.8533337|
|    163|    Desperado (1995)|     1|   5.0| 4.0027437|
|    216|Billy Madison (1995)|     1|   5.0| 3.7846382|
|    223|       Clerks (1994)|     1|   3.0| 4.4134116|
|    231|Dumb & Dumber (Du...|     1|   5.0|  3.545307|
|    235|      Ed Wood (1994)|     1|   4.0|  4.

## Part 3.2: Recommend Movies to Users according to the userID

In [0]:
# Recommend 10 movies for each user
user_recs = bestModel.recommendForAllUsers(10)
user_recs.display()

In [0]:
# Register DataFrame as temporary table
user_recs.createOrReplaceTempView("als_recs_temp")

# SQL query to explode recommendations and select required columns
recommendation_each = spark.sql("""
    SELECT userID, t1.movieId AS MovieID, t1.rating AS rating
    FROM als_recs_temp
    LATERAL VIEW explode(recommendations) AS t1
""")

# Show the structured recommendations
recommendation_each.show()

+------+-------+---------+
|userID|MovieID|   rating|
+------+-------+---------+
|     1|   3379|6.0179787|
|     1|   5490|5.8362217|
|     1|  33649| 5.806962|
|     1| 141718|5.8035975|
|     1|  60943| 5.667082|
|     1|  59018| 5.667082|
|     1| 102217|5.6581016|
|     1|  92494|5.6581016|
|     1|  33779|5.6581016|
|     1| 131724| 5.539549|
|     2|   3379| 4.891857|
|     2| 131724|4.7216053|
|     2|  33649| 4.703524|
|     2| 141718|4.6549067|
|     2|  67618|4.6266236|
|     2|   5490|4.6231766|
|     2|  60943| 4.545721|
|     2|  59018| 4.545721|
|     2| 102217| 4.541153|
|     2|  92494| 4.541153|
+------+-------+---------+
only showing top 20 rows



In [0]:
# Register DataFrames as temporary tables

recommendation_each.registerTempTable("recommendation_each")
movies_df.registerTempTable("movies_df")

#### Example 1: For user with id 575, the top 10 movies that we will recommend using this model:

In [0]:
res_user575 = spark.sql("""
                        select userId,title
                        FROM recommendation_each t1
                        LEFT JOIN movies_df t2
                        ON t1.movieId = t2.movieId
                        WHERE t1.userId = 575
                        """)
res_user575.show()

+------+--------------------+
|userId|               title|
+------+--------------------+
|   575| Frozen River (2008)|
|   575| Visitor, The (2007)|
|   575| On the Beach (1959)|
|   575|  The Big Bus (1976)|
|   575|  Saving Face (2004)|
|   575|The Jinx: The Lif...|
|   575|Strictly Sexual (...|
|   575|Calendar Girl (1993)|
|   575|De platte jungle ...|
|   575|Blue Planet II (2...|
+------+--------------------+



#### Example 2: For user with id 232, the top 10 movies that we will recommend using this model:

In [0]:
res_user232 = spark.sql("""
                        select userId,title
                        FROM recommendation_each t1
                        LEFT JOIN movies_df t2
                        ON t1.movieId = t2.movieId
                        WHERE t1.userId = 232
                        """)
res_user232.show()

+------+--------------------+
|userId|               title|
+------+--------------------+
|   232| On the Beach (1959)|
|   232|Strictly Sexual (...|
|   232|  Saving Face (2004)|
|   232| Frozen River (2008)|
|   232| Visitor, The (2007)|
|   232|The Jinx: The Lif...|
|   232|De platte jungle ...|
|   232|Blue Planet II (2...|
|   232|Nasu: Summer in A...|
|   232|    Watermark (2014)|
+------+--------------------+



## Part 3.3: Recommend the Similar Movies according to the MovieID
Also based on ALS model results above.

In [0]:
# Construct the selectExpr dynamically for each feature
select_expr = ["id as movieId"]
for i in range(bestModel.rank):
    select_expr.append(f"features[{i}] as feature{i}")

# Select using selectExpr and create DataFrame
movie_factors = bestModel.itemFactors.selectExpr(select_expr)

# Register itemFactors DataFrame as temporary view
movie_factors.createOrReplaceTempView('movie_factors')

# Display the contents of movie_factors using Databricks display function
display(movie_factors)

#### Method 1: Euclidean Distance based Similarity

e.g. Movie A with factor [1,2,3] and movie B with factor [2,4,6].
The distance between them is sqrt(1^2+2^2+3^2).

In [0]:
def dist_similar(movieid, topn=10):
    '''
    Finds movies similar to a given movieid based on feature vectors in movie_factors.

    Parameters:
    - movieid: int, id of the movie to find similarities
    - topn: int, number of similar movies to find (default is 10)

    Returns:
    - out: pandas DataFrame, contains details of top 'topn' similar movies
    - ssd: pandas DataFrame, contains movie IDs and their sum of squared differences (SSD) from the input movie
    '''
    # Fetch feature vector of the specified movieid from movie_factors
    movie_info = movie_factors.filter(col('movieid') == movieid).toPandas()

    # Check if movieid exists in the data
    if movie_info.empty:
        print('No movie with id ' + str(movieid) + ' is found in the data.')
        return None, None

    # Construct a list to build SQL-like query for calculating squared differences (sd)
    temp = ['select movieid,']
    for i in range(bestModel.rank):
        val = movie_info.iloc[0, i+1]
        if val > 0:
            comd = 'feature' + str(i) + '-' + str(val)
        else:
            comd = 'feature' + str(i) + '+' + str(-val)

        if i < bestModel.rank - 1:
            temp.append('(' + comd + ')*(' + comd + ') as sd' + str(i) + ',')
        else:
            temp.append('(' + comd + ')*(' + comd + ') as sd' + str(i))

    temp.append('from movie_factors where movieId!=' + str(movieid))

    # Execute the constructed query and convert result to pandas DataFrame
    ssd = spark.sql(' '.join(temp)).toPandas()

    # Calculate sum of squared differences (ssd) and sort by ascending ssd values
    ssd['ssd'] = ssd.apply(lambda x: x['sd0']**2 + x['sd1']**2 + x['sd2']**2 + x['sd3']**2 +
                                    x['sd4']**2 + x['sd5']**2 + x['sd6']**2 + x['sd7']**2 +
                                    x['sd8']**2 + x['sd9']**2, axis=1)
    ssd = ssd.sort_values(by=['ssd'], ascending=[True]).head(topn)

    # Initialize DataFrame to store details of similar movies
    out = None

    # Fetch details of top 'topn' similar movies from movies_df based on movieId
    for i in ssd['movieid']:
        if not out:
            out = movies_df.where(movies_df.movieId == str(i))
        else:
            out = out.union(movies_df.where(movies_df.movieId == str(i)))

    # Convert to pandas DataFrame, reset index for readability
    out = out.toPandas()
    out.index = range(1, topn + 1)

    return out, ssd

####  Example 1: For the movie with id 390, the top 10 movies that we will recommend using this model:

In [0]:
movies_df.filter(col('movieId') == 390).show()

+-------+--------------------+------------------+
|movieId|               title|            genres|
+-------+--------------------+------------------+
|    390|Faster Pussycat! ...|Action|Crime|Drama|
+-------+--------------------+------------------+



In [0]:
res,ssd1=dist_similar(390)
res

Unnamed: 0,movieId,title,genres
1,105835,"Double, The (2013)",Comedy|Drama|Thriller
2,6935,"Revolution Will Not Be Televised, The (a.k.a. ...",Documentary
3,58191,Taxi to the Dark Side (2007),Documentary
4,5239,Rude Boy (1980),Documentary|Drama
5,45382,Down in the Valley (2005),Drama|Romance
6,6064,"Harder They Fall, The (1956)",Drama|Film-Noir
7,7620,Monster in a Box (1992),Comedy|Drama
8,5986,Fat City (1972),Drama
9,184721,First Reformed (2017),Drama|Thriller
10,183199,Quest (2017),Documentary


####  Example 2: For the movie with id 471, the top 10 movies that we will recommend using this model:

In [0]:
movies_df.filter(col('movieId') == 471).show()

+-------+--------------------+------+
|movieId|               title|genres|
+-------+--------------------+------+
|    471|Hudsucker Proxy, ...|Comedy|
+-------+--------------------+------+



In [0]:
res,ssd2=dist_similar(471)
res

Unnamed: 0,movieId,title,genres
1,2654,"Wolf Man, The (1941)",Drama|Fantasy|Horror
2,991,Michael Collins (1996),Drama
3,3418,Thelma & Louise (1991),Adventure|Crime|Drama
4,242,Farinelli: il castrato (1994),Drama|Musical
5,745,Wallace & Gromit: A Close Shave (1995),Animation|Children|Comedy
6,1097,E.T. the Extra-Terrestrial (1982),Children|Drama|Sci-Fi
7,6331,Spellbound (2002),Documentary
8,8493,Memphis Belle (1990),Action|Drama|War
9,2782,Pit and the Pendulum (1961),Horror
10,2764,"Thomas Crown Affair, The (1968)",Crime|Drama|Romance|Thriller


#### Method 2: Cosine Distance based Similarity

e.g. Movie A with factor [1,2,3] and movie B with factor [2,4,6]. The distance between them is 0. 
Because cosine similarity only considers the directions of two vectors.

In [0]:
def cos_similar(movieid_, topn=10):
    '''
    Finds movies similar to a given movieid based on cosine similarity of feature vectors in movie_factors.

    Parameters:
    - movieid_: int, id of the movie to find similarities
    - topn: int, number of similar movies to find (default is 10)

    Returns:
    - out: pandas DataFrame, contains details of top 'topn' similar movies
    - inner: pandas DataFrame, contains movie IDs and their cosine similarity (inner product) from the input movie
    '''
    # Fetch feature vector of the specified movieid from movie_factors
    movie_info = spark.sql('select * from movie_factors where movieId=' + str(movieid_)).toPandas()

    # Check if movieid exists in the data
    if movie_info.shape[0] <= 0:
        print('No movie with id ' + str(movieid_) + ' is found in the data.')
        return None, None

    # Calculate the Euclidean norm of the movie's feature vector
    norm_m = sum(movie_info.iloc[0, 1:].values**2)**0.5

    # Construct SQL-like query to calculate inner product and norm for cosine similarity
    temp = ['select movieId,']
    norm_str = ['sqrt(']
    for i in range(bestModel.rank):
        comd = 'feature' + str(i) + '*' + str(movie_info.iloc[0, i+1])
        temp.append(comd + ' as inner' + str(i) + ',')
        if i < bestModel.rank - 1:
            norm_str.append('feature' + str(i) + '*feature' + str(i) + '+')
        else:
            norm_str.append('feature' + str(i) + '*feature' + str(i))
    norm_str.append(') as norm')
    temp.append(''.join(norm_str))
    temp.append(' from movie_factors where movieId!=' + str(movieid_))

    # Execute the constructed query to calculate inner product and norm
    inner = spark.sql(' '.join(temp))
    inner = inner.selectExpr('movieId', '((inner0+inner1+inner2+inner3+inner4+inner5+inner6+inner7+inner8+inner9)/norm)/' + str(norm_m) + ' as innerP') \
                 .orderBy('innerP', ascending=False).limit(topn).toPandas()

    # Fetch details of top 'topn' similar movies from movies_df based on movieId
    out = None
    for i in inner['movieId']:
        if not out:
            out = movies_df.where(movies_df.movieId == str(i))
        else:
            out = out.union(movies_df.where(movies_df.movieId == str(i)))

    # Convert to pandas DataFrame, reset index for readability
    out = out.toPandas()
    out.index = range(1, topn + 1)

    return out, inner

#### Example 1: For the movie with id 390, the top 10 movies that we will recommend using this model:

In [0]:
movies_df.filter(col('movieId') == 390).show()

+-------+--------------------+------------------+
|movieId|               title|            genres|
+-------+--------------------+------------------+
|    390|Faster Pussycat! ...|Action|Crime|Drama|
+-------+--------------------+------------------+



In [0]:
res,inner1=cos_similar(390)
res

Unnamed: 0,movieId,title,genres
1,1979,Friday the 13th Part VI: Jason Lives (1986),Horror
2,30,Shanghai Triad (Yao a yao yao dao waipo qiao) ...,Crime|Drama
3,2829,"Muse, The (1999)",Comedy
4,8482,"Picture of Dorian Gray, The (1945)",Drama|Fantasy|Horror
5,8521,Dr. Jekyll and Mr. Hyde (1931),Drama|Horror
6,7883,I Walked with a Zombie (1943),Drama|Horror
7,487,Lightning Jack (1994),Comedy|Western
8,37240,Why We Fight (2005),Documentary
9,8121,"Seducing Doctor Lewis (Grande séduction, La) (...",Comedy
10,3770,Dreamscape (1984),Horror|Sci-Fi|Thriller


####  Example 2: For the movie with id 471, the top 10 movies that we will recommend using this model:

In [0]:
movies_df.filter(col('movieId') == 471).show()

+-------+--------------------+------+
|movieId|               title|genres|
+-------+--------------------+------+
|    471|Hudsucker Proxy, ...|Comedy|
+-------+--------------------+------+



In [0]:
res,inner2=cos_similar(471)
res

Unnamed: 0,movieId,title,genres
1,4774,Big Trouble (2002),Comedy|Crime
2,6143,Trail of the Pink Panther (1982),Comedy|Crime
3,47644,Invincible (2006),Drama
4,3436,Dying Young (1991),Drama|Romance
5,1035,"Sound of Music, The (1965)",Musical|Romance
6,3784,"Kid, The (2000)",Comedy|Fantasy
7,1760,Spice World (1997),Comedy
8,78772,"Twilight Saga: Eclipse, The (2010)",Fantasy|Romance|Thriller|IMAX
9,55052,Atonement (2007),Drama|Romance|War
10,42728,Tristan & Isolde (2006),Drama|Romance


# Part 4: Overall Summary

##4.1 Motivation:

In this project, I explored the creation of a recommendation system using data sourced from GroupLens. The rationale behind this endeavor lies in the substantial profitability such systems offer not only to movie website companies but also across various ecommerce platforms. A well-designed recommendation system can significantly enhance customer attraction by providing superior service compared to competitors. This project aimed to gain practical insights into the construction and functionality of recommendation systems, underscoring their importance in modern business strategies.

Link of the data source: (https://grouplens.org/datasets/movielens/latest/)

##4.2 Steps:


#### 1. Data ETL and Exploration

- **Data Overview**: Analyzed the GroupLens dataset to extract key insights.
- **User and Movie Statistics**: Identified {total_users} unique users and {total_movies} distinct movies.
- **OLAP Analysis**: Categorized movies into genres and analyzed their distribution.

#### 2. Spark ALS-based Model Training

- **Data Preparation**: Removed irrelevant columns and converted genres into numeric formats.
- **Data Splitting**: Partitioned data into 80% training and 20% testing sets.
- **Model Training**: Employed ALS algorithm for collaborative filtering on training data.
- **Hyper-parameter Tuning**: Optimized model parameters via grid search and 3-fold cross-validation.
- **Model Evaluation**: Assessed model performance using RMSE on the testing data.

#### 3. Model Application and Performance Evaluation

- **User Recommendations**: Generated personalized movie suggestions for specific userIds.
- **Similar Movie Identification**: Identified top similar movies based on ALS model's item features.


## 4.3 Conclusions:


#### 1. Project Summary

The ALS model employed in this project achieved optimal performance with the following parameters: maxIter=20, regParam=0.2, rank=15. The Root Mean Squared Error (RMSE) on the testing data was measured at 0.88, while on the entire dataset it was 0.74, both deemed acceptable.

#### 2. Latent Information Mining

Beyond recommendation capabilities, the ALS model effectively mined latent information from movie-related data, represented as a 10-feature matrix through matrix factorization. This latent information provides deeper insights and facilitates tasks such as measuring movie similarities.

#### 3. Similarity Measurement Methods

Two methods were explored for finding similar movies in this project: Euclidean distance and cosine distance. While both approaches have their merits, the cosine distance method is particularly recommended for movie recommendation systems. This method focuses solely on the thematic similarity ('direction') of movies, which is crucial for audience decision-making.

This project not only optimized recommendation accuracy using ALS but also demonstrated the value of latent information mining and the strategic use of cosine distance for enhancing movie recommendations.

