In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, regexp_replace
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, StringType

# Stop any Spark session left over from an earlier run; building a new session
# while an old one is still alive might fail. On a fresh kernel `spark` does
# not exist yet, so whatever error that raises is deliberately ignored.
try:
    spark.stop()
except Exception:
    pass

# Build the session with memory settings sized for this machine/cluster.
# spark.local.dir points Spark's scratch directory at the E: drive because the
# C: drive does not have enough free space, which may otherwise surface as a
# Py4JJavaError. spark.jars adds the MySQL Connector/J jar to the session.
spark = (
    SparkSession.builder
    .appName("MovieRecommender_Movielens")
    .config("spark.executor.memory", "32g")
    .config("spark.driver.memory", "16g")
    .config("spark.local.dir", r"E:\Apache Spark\spark-temp")
    .config("spark.jars", r"D:\MySQL\MySQL ConnectorJ\mysql-connector-j-8.4.0\mysql-connector-j-8.4.0\mysql-connector-j-8.4.0.jar")
    .getOrCreate()
)

In [4]:
# Echo the session object so the notebook renders its summary.
spark

### Analyze Movie CSV and Write it into the Databases

In [120]:
# Load the MovieLens movie catalogue from CSV (comma-separated, header row, UTF-8).
# DROPMALFORMED makes the reader skip rows it cannot parse instead of failing.
# No schema is supplied, so Spark's CSV reader is expected to load every
# column as a string.
movies = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("header", "true")
    .option("encoding", "utf-8")
    .option("mode", "DROPMALFORMED")
    .load("../data/MovieLens 20M Dataset/movie.csv")
)

In [121]:
# Switch the key column to snake_case so it matches the renamed ratings columns.
movies = movies.withColumnRenamed(existing="movieId", new="movie_id")

In [122]:
# Preview the first rows of the movie table (long values are truncated in the printout).
movies.show()

+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|Adventure|Animati...|
|       2|      Jumanji (1995)|Adventure|Childre...|
|       3|Grumpier Old Men ...|      Comedy|Romance|
|       4|Waiting to Exhale...|Comedy|Drama|Romance|
|       5|Father of the Bri...|              Comedy|
|       6|         Heat (1995)|Action|Crime|Thri...|
|       7|      Sabrina (1995)|      Comedy|Romance|
|       8| Tom and Huck (1995)|  Adventure|Children|
|       9| Sudden Death (1995)|              Action|
|      10|    GoldenEye (1995)|Action|Adventure|...|
|      11|American Presiden...|Comedy|Drama|Romance|
|      12|Dracula: Dead and...|       Comedy|Horror|
|      13|        Balto (1995)|Adventure|Animati...|
|      14|        Nixon (1995)|               Drama|
|      15|Cutthroat Island ...|Action|Adventure|...|
|      16|       Casino (1995)|         Crime|

### Analyze Ratings CSV

In [124]:
# Load the MovieLens ratings from CSV with the same reader options as the
# movie file: comma-separated, header row, UTF-8, unparseable rows dropped.
ratings = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("header", "true")
    .option("encoding", "utf-8")
    .option("mode", "DROPMALFORMED")
    .load("../data/MovieLens 20M Dataset/rating.csv")
)

In [125]:
# Original CSV column name -> snake_case name used throughout this notebook.
rename_dict = dict(userId="user_id", movieId="movie_id")

# Apply the renames one column at a time.
for old_name in rename_dict:
    new_name = rename_dict[old_name]
    ratings = ratings.withColumnRenamed(old_name, new_name)

In [126]:
# Preview the first rows of the renamed ratings table.
ratings.show()

+-------+--------+------+-------------------+
|user_id|movie_id|rating|          timestamp|
+-------+--------+------+-------------------+
|      1|       2|   3.5|2005-04-02 23:53:47|
|      1|      29|   3.5|2005-04-02 23:31:16|
|      1|      32|   3.5|2005-04-02 23:33:39|
|      1|      47|   3.5|2005-04-02 23:32:07|
|      1|      50|   3.5|2005-04-02 23:29:40|
|      1|     112|   3.5|2004-09-10 03:09:00|
|      1|     151|     4|2004-09-10 03:08:54|
|      1|     223|     4|2005-04-02 23:46:13|
|      1|     253|     4|2005-04-02 23:35:40|
|      1|     260|     4|2005-04-02 23:33:46|
|      1|     293|     4|2005-04-02 23:31:43|
|      1|     296|     4|2005-04-02 23:32:47|
|      1|     318|     4|2005-04-02 23:33:18|
|      1|     337|   3.5|2004-09-10 03:08:29|
|      1|     367|   3.5|2005-04-02 23:53:00|
|      1|     541|     4|2005-04-02 23:30:03|
|      1|     589|   3.5|2005-04-02 23:45:57|
|      1|     593|   3.5|2005-04-02 23:31:01|
|      1|     653|     3|2004-09-1

In [127]:
# Keep a random ~50% of the ratings (without replacement, fixed seed).
# Caution: this rebinds `ratings`, so re-running the cell halves the data again.
ratings = ratings.sample(withReplacement=False, fraction=0.5, seed=42)

In [128]:
# Number of ratings left after the first 50% sample.
print(ratings.count())

9997614


In [129]:
# Two further 50% samples back to back: together they keep roughly a quarter
# of the rows that reach this cell. Re-running the cell shrinks `ratings` again.
ratings = (
    ratings
    .sample(fraction=0.5, seed=42)
    .sample(fraction=0.5, seed=42)
)

In [130]:
# Number of ratings left after the additional two 50% samples.
print(ratings.count())

2501155


In [131]:
from pyspark.sql.functions import col

# Tally how many ratings each user has in the sampled data; the generated
# "count" column is that per-user total. Most active users come first.
rating_count = (
    ratings
    .groupBy("user_id")
    .count()
    .orderBy(col("count").desc())
)

rating_count.show()

+-------+-----+
|user_id|count|
+-------+-----+
| 118205| 1198|
|   8405|  878|
| 125794|  709|
|  74142|  691|
| 121535|  669|
|  34576|  649|
|  82418|  649|
|  83090|  640|
| 131904|  632|
|  59477|  622|
|   8963|  599|
| 130767|  575|
|  15617|  571|
|  92011|  555|
|  79159|  547|
|  46470|  546|
| 120575|  534|
|  20132|  494|
|  71975|  490|
|  18138|  489|
+-------+-----+
only showing top 20 rows



In [132]:
# rating_count has one row per user_id, so this is the number of distinct users.
rating_count_nums_row = rating_count.count()
print(rating_count_nums_row)

136972


In [133]:
# Keep only users with more than 200 ratings in the sampled data.
rating_count = rating_count.where(col("count") > 200)

In [134]:
# Number of users that remain after the "> 200 ratings" filter.
rating_count_nums_row = rating_count.count()
print(rating_count_nums_row)

472


In [135]:
# Project the surviving users down to just their id column ...
user_ids = rating_count.select("user_id")

# ... and pull those ids to the driver as a plain Python list.
user_id_list = [record.user_id for record in user_ids.collect()]

In [136]:
# Display the collected user ids.
user_id_list

['118205',
 '8405',
 '125794',
 '74142',
 '121535',
 '34576',
 '82418',
 '83090',
 '131904',
 '59477',
 '8963',
 '130767',
 '15617',
 '92011',
 '79159',
 '46470',
 '120575',
 '20132',
 '71975',
 '18138',
 '130459',
 '18611',
 '9544',
 '92269',
 '31122',
 '125978',
 '88820',
 '91193',
 '63147',
 '111549',
 '35128',
 '54465',
 '24688',
 '114406',
 '68026',
 '51703',
 '53346',
 '41267',
 '70201',
 '105580',
 '136268',
 '14705',
 '129583',
 '7201',
 '123606',
 '61168',
 '67346',
 '107326',
 '79531',
 '91867',
 '131347',
 '27469',
 '68063',
 '119048',
 '24219',
 '26867',
 '12131',
 '86529',
 '113668',
 '62812',
 '92956',
 '107640',
 '131894',
 '76630',
 '128309',
 '50297',
 '80920',
 '33736',
 '59414',
 '131961',
 '32344',
 '51558',
 '22901',
 '80092',
 '42929',
 '43194',
 '72008',
 '4222',
 '27053',
 '116317',
 '2261',
 '128258',
 '116189',
 '97853',
 '106441',
 '133811',
 '3907',
 '75299',
 '52009',
 '52636',
 '66763',
 '31404',
 '26193',
 '117144',
 '49554',
 '52260',
 '73611',
 '119531'

In [137]:
# Restrict the ratings to rows whose user_id appears in user_id_list, i.e.
# only ratings made by users with more than 200 ratings.
ratings = ratings.where(col("user_id").isin(user_id_list))

In [138]:
# Preview the ratings that belong to the selected users.
ratings.show()

+-------+--------+------+-------------------+
|user_id|movie_id|rating|          timestamp|
+-------+--------+------+-------------------+
|    156|       3|     2|2002-12-02 03:53:45|
|    156|       7|     4|2002-12-02 03:58:27|
|    156|      76|     4|2002-12-27 22:33:03|
|    156|      79|     3|2002-11-25 20:00:02|
|    156|      87|     2|2002-11-26 20:08:49|
|    156|      92|     3|2002-12-26 20:07:46|
|    156|     129|     4|2002-12-16 19:51:32|
|    156|     157|     2|2002-12-06 20:01:32|
|    156|     158|     3|2002-12-27 22:06:44|
|    156|     171|     3|2002-11-14 20:17:35|
|    156|     177|     3|2002-11-24 01:59:46|
|    156|     204|     3|2002-11-23 18:31:17|
|    156|     208|     3|2002-12-27 22:55:18|
|    156|     237|     3|2002-12-13 20:33:40|
|    156|     344|     3|2002-12-18 19:34:45|
|    156|     360|     3|2002-11-26 20:07:36|
|    156|     362|     4|2002-12-18 20:08:29|
|    156|     387|     3|2002-12-06 19:40:58|
|    156|     396|     3|2002-12-2

In [139]:
# Number of ratings left after restricting to the selected users.
print(ratings.count())

135483


### Integrate Movies and Ratings Data and Analyze them

In [140]:
# Attach title and genres to every rating; the inner join keeps only ratings
# whose movie_id also exists in the movies table.
ratings_with_movies = ratings.join(movies, "movie_id", "inner")

In [141]:
# Preview the joined ratings + movie metadata.
ratings_with_movies.show()

+--------+-------+------+-------------------+--------------------+--------------------+
|movie_id|user_id|rating|          timestamp|               title|              genres|
+--------+-------+------+-------------------+--------------------+--------------------+
|       3|    156|     2|2002-12-02 03:53:45|Grumpier Old Men ...|      Comedy|Romance|
|       7|    156|     4|2002-12-02 03:58:27|      Sabrina (1995)|      Comedy|Romance|
|      76|    156|     4|2002-12-27 22:33:03|    Screamers (1995)|Action|Sci-Fi|Thr...|
|      79|    156|     3|2002-11-25 20:00:02|   Juror, The (1996)|      Drama|Thriller|
|      87|    156|     2|2002-11-26 20:08:49|Dunston Checks In...|     Children|Comedy|
|      92|    156|     3|2002-12-26 20:07:46|  Mary Reilly (1996)|Drama|Horror|Thri...|
|     129|    156|     4|2002-12-16 19:51:32|Pie in the Sky (1...|      Comedy|Romance|
|     157|    156|     2|2002-12-06 20:01:32|Canadian Bacon (1...|          Comedy|War|
|     158|    156|     3|2002-12

In [142]:
# Important: a user may have rated the same movie more than once, so keep a
# single row per (movie_id, user_id) pair before counting and pivoting.
ratings_with_movies = ratings_with_movies.drop_duplicates(["movie_id", "user_id"])

In [143]:
from pyspark.sql.functions import count

# agg() applies aggregate functions to each group produced by groupBy(); here
# it counts the ratings per title and names the result "rating_count".
rating_count_every_title = (
    ratings_with_movies
    .groupBy("title")
    .agg(count("rating").alias("rating_count"))
)

In [144]:
# Preview the number of ratings per title.
rating_count_every_title.show()

+--------------------+------------+
|               title|rating_count|
+--------------------+------------+
|22 Jump Street (2...|           5|
|When We Were King...|          20|
|       Psycho (1960)|          70|
|   Annie Hall (1977)|          50|
|Men in Black (a.k...|          51|
|Odd Couple II, Th...|           8|
|In the Heat of th...|          37|
| Three Wishes (1995)|           7|
|Seven Beauties (P...|           8|
|    Elizabeth (1998)|          35|
|First Blood (Ramb...|          36|
|Heavenly Creature...|          26|
|Problem Child (1990)|          22|
|7th Voyage of Sin...|          18|
|Cry in the Dark, ...|           5|
|One False Move (1...|          17|
|       Quills (2000)|          31|
|Before Night Fall...|          12|
|O Brother, Where ...|          41|
|Don't Tell Mom th...|          22|
+--------------------+------------+
only showing top 20 rows



In [145]:
# Add each title's rating_count to every rating row of that title.
ratings_with_movies = ratings_with_movies.join(rating_count_every_title, "title", "inner")

In [146]:
# Keep only titles that received at least 50 ratings.
ratings_with_movies = ratings_with_movies.where(col("rating_count") >= 50)

In [147]:
# Preview the ratings of the sufficiently rated titles.
ratings_with_movies.show()

+--------------------+--------+-------+------+-------------------+--------------------+------------+
|               title|movie_id|user_id|rating|          timestamp|              genres|rating_count|
+--------------------+--------+-------+------+-------------------+--------------------+------------+
|       Psycho (1960)|    1219|  65304|     3|1999-01-19 14:33:51|        Crime|Horror|          70|
|   Annie Hall (1977)|    1230| 114579|     3|2009-12-04 01:02:21|      Comedy|Romance|          50|
|   Annie Hall (1977)|    1230| 130622|     3|2010-11-16 08:18:55|      Comedy|Romance|          50|
|   Annie Hall (1977)|    1230|  69683|     5|2002-01-27 05:21:52|      Comedy|Romance|          50|
|   Annie Hall (1977)|    1230|  84476|   4.5|2006-01-10 12:37:57|      Comedy|Romance|          50|
|   Annie Hall (1977)|    1230|  89307|   0.5|2006-06-22 12:21:24|      Comedy|Romance|          50|
|Men in Black (a.k...|    1580| 129540|     4|2001-05-10 23:11:56|Action|Comedy|Sci-Fi|    

In [148]:
# Count how many of the remaining ratings each user contributed, most active
# users first. show() only prints and returns None, so the DataFrame is bound
# first and displayed in a separate step; chaining .show() onto the assignment
# would leave user_id_count set to None.
user_id_count = (
    ratings_with_movies
    .groupBy("user_id")
    .count()
    .orderBy("count", ascending=False)
)
user_id_count.show()


+-------+-----+
|user_id|count|
+-------+-----+
|  56504|   41|
|  53346|   40|
|  24688|   39|
|  68358|   39|
|  76630|   39|
|   5843|   38|
|  59873|   38|
|  77884|   37|
| 113991|   37|
|  61168|   37|
| 118754|   36|
|  94259|   36|
|  91893|   36|
|   4507|   36|
| 123606|   36|
|  54465|   36|
|  68063|   35|
|  32984|   35|
|  89307|   35|
|  51507|   35|
+-------+-----+
only showing top 20 rows



### Convert the Final Analyzed Data into a Pivot Table

In [149]:
# Raise Spark's cap on the number of distinct pivot values; the pivot on
# user_id later in this notebook produces one column per distinct user.
spark.conf.set("spark.sql.pivotMaxValues", 20000)  # Adjust based on your data

In [150]:
from pyspark.sql.functions import avg

# Build the title x user rating table from the filtered ratings:
#   rows    -> one per title
#   columns -> one per user_id
#   values  -> average rating that user gave that title
# Missing (title, user) combinations are filled with 0, so a 0 in this table
# means "no rating" rather than a real score.
movie_pivot = (
    ratings_with_movies
    .groupBy("title")
    .pivot("user_id")
    .agg(avg("rating"))
    .fillna(0)
)


In [151]:
# Preview the pivot table (very wide: one column per user).
movie_pivot.show()

+--------------------+------+------+------+------+------+------+------+------+------+------+------+------+-----+-----+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+-----+------+------+------+------+-----+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+-----+------+------+------+------

In [153]:
from pyspark.sql.functions import monotonically_increasing_id

# Tag every row with an "index" value generated by monotonically_increasing_id().
# The generated ids are unique and increasing but are not guaranteed to be
# consecutive, so they should not be used as list/array positions.
movie_pivot = movie_pivot.withColumn("index", monotonically_increasing_id())

# Reorder the columns so "index" comes first, followed by all other columns.
movie_pivot_indexed = movie_pivot.select(
    "index",
    *(name for name in movie_pivot.columns if name != "index"),
)

In [162]:
# Preview the pivot table with the leading "index" column.
movie_pivot_indexed.show()

+-----+--------------------+------+------+------+------+------+------+------+------+------+------+------+------+-----+-----+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+-----+------+------+------+------+-----+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+-----+------+------+------+

In [163]:
# Materialise the whole pivot table on the driver as a Python list of Row
# objects, so individual rows can be looked up by list position.
movie_pivot_indexed_collected = movie_pivot_indexed.collect()

### Convert the movie_pivot into Sparse Matrix

### Using CoordinateMatrix, MatrixEntry
This approach keeps running into problems; it may be revisited later.

In [None]:
# NOTE: this entire cell is disabled (commented out). It is an unfinished attempt
# to build the sparse matrix with Spark's CoordinateMatrix / MatrixEntry and is
# kept for reference only.
# from pyspark.sql.functions import monotonically_increasing_id
# from pyspark.sql.types import StructType, StructField, LongType, IntegerType, DoubleType
# from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# # Add a row index column to the pivot table.
# movie_pivot_indexed = movie_pivot.withColumn("row_index", monotonically_increasing_id())

# movie_pivot_indexed.show()

# # Define the rating columns (all columns except 'title' and the new 'row_index')
# rating_columns = movie_pivot.columns[1:]

# # Convert each row into MatrixEntry objects.
# entries = movie_pivot_indexed.rdd.flatMap(lambda row: [
#     MatrixEntry(row.row_index, j, float(row[col]))
#     for j, col in enumerate(rating_columns)
#     if float(row[col]) != 0.0  # only include nonzero ratings
# ])

# movie_pivot_indexed.cache()

# # Explicitly convert each MatrixEntry to a tuple (i, j, value)
# entries_tuples = entries.map(lambda me: (me.i, me.j, me.value))

# # Define a schema matching MatrixEntry's fields.
# schema = StructType([
#     StructField("i", LongType(), False),
#     StructField("j", IntegerType(), False),
#     StructField("value", DoubleType(), False)
# ])

# # Create a DataFrame using the schema.
# entries_df = spark.createDataFrame(entries_tuples, schema=schema)

### Using the traditional way
Make sure your data has fewer than 2,500,000 rows, or the computer will crash.

In [155]:
# Open question: why use Spark and pyspark.sql.DataFrame at all instead of
# working in pandas directly? That would avoid the toPandas() conversion below.
import numpy as np
from scipy.sparse import csr_matrix

# Bring the whole pivot table to the driver as a pandas DataFrame.
movie_pivot_pandas = movie_pivot_indexed.toPandas()

# Drop the two non-feature columns ("index" and "title") and turn the
# remaining per-user rating columns into a float64 NumPy array.
numeric_matrix = movie_pivot_pandas.drop(columns=["index", "title"]).to_numpy(dtype=np.float64)

# Store the array in compressed sparse row (CSR) format.
movie_sparse = csr_matrix(numeric_matrix)

In [156]:
# Confirm that the result is a SciPy CSR matrix.
type(movie_sparse)

scipy.sparse._csr.csr_matrix

### Using the K-Nearest Neighbors Algorithm to Build the Model

In [157]:
# NearestNeighbors is scikit-learn's unsupervised neighbour-search estimator
# (it finds the closest rows; it does not cluster). algorithm="brute" selects
# brute-force search over all rows.
from sklearn.neighbors import NearestNeighbors

model = NearestNeighbors(algorithm="brute")

In [158]:
# Index the title x user matrix; each row (one title) becomes a searchable point.
model.fit(movie_sparse)

In [160]:
# Look up the pivot row of a single title as a spot check.
movie_pivot_indexed.filter(movie_pivot_indexed.title == "Annie Hall (1977)").show()

+-----+-----------------+------+------+------+------+------+------+------+------+------+------+------+------+-----+-----+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+-----+------+------+------+------+-----+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+-----+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+-----+------+------+------+---

In [164]:
# List position (within movie_pivot_indexed_collected) of the movie to query.
# NOTE(review): the name says "book" although this notebook works with movies;
# it is left unchanged because other cells reference it.
suggested_book_index = 107
movie_pivot_indexed_collected[suggested_book_index]["title"]

'Shrek (2001)'

In [165]:
# kneighbors() expects a 2D array of shape (n_queries, n_features), so the
# single 1D rating vector is turned into one row with many columns,
# e.g. [1, 2, 3, 4, 5] --> [[1, 2, 3, 4, 5]].

# Fetch the row at position suggested_book_index from the collected rows.
row = movie_pivot_indexed_collected[suggested_book_index]

# The first column is "index" and the second column is "title"; neither is a
# rating, so only the values from the third column onwards are used.
row_values = np.array(row[2:])

# Ask for the 6 nearest rows; the result includes the queried movie itself.
distance, suggestion = model.kneighbors(row_values[np.newaxis, :], n_neighbors=6)


In [166]:
# Distances from the query movie to each returned neighbour.
distance

array([[ 0.        , 33.17755265, 33.35416016, 34.46012188, 34.86402157,
        35.13545218]])

In [167]:
# Row positions (in the fitted matrix) of the returned neighbours.
suggestion

array([[107, 175,  36, 200,  33, 109]])

In [168]:
# Translate each neighbour position back into a movie title.
# The positions returned by kneighbors() refer to rows of the matrix the model
# was fitted on, which was built from movie_pivot_pandas. Titles are therefore
# read from that same pandas frame rather than from the separately collected
# Spark rows, whose row order is not guaranteed to match across Spark actions.
for index in map(int, suggestion[0]):  # convert NumPy integers to plain int
    print(movie_pivot_pandas["title"].iloc[index])

Shrek (2001)
Swordfish (2001)
Batman & Robin (1997)
Dogma (1999)
Pearl Harbor (2001)
Star Wars: Episode I - The Phantom Menace (1999)
