In [5]:
!pip install pyspark



In [6]:
#import libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [8]:
#Spark session
spark = SparkSession.builder.appName("RecommendationSystem").getOrCreate()

# Load Data

In [9]:
movieSchema = StructType([
    StructField("movie_id", IntegerType(), True),
    StructField("movie_title", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("video_release_date", StringType(), True),
    StructField("imdb_url", StringType(), True),
    StructField("unknown", IntegerType(), True),
    StructField("action", IntegerType(), True),
    StructField("adventure", IntegerType(), True),
    StructField("animation", IntegerType(), True),
    StructField("children", IntegerType(), True),
    StructField("comedy", IntegerType(), True),
    StructField("crime", IntegerType(), True),
    StructField("documentary", IntegerType(), True),
    StructField("drama", IntegerType(), True),
    StructField("fantasy", IntegerType(), True),
    StructField("film_noir", IntegerType(), True),
    StructField("horror", IntegerType(), True),
    StructField("musical", IntegerType(), True),
    StructField("mystery", IntegerType(), True),
    StructField("romance", IntegerType(), True),
    StructField("sci_fi", IntegerType(), True),
    StructField("thriller", IntegerType(), True),
    StructField("war", IntegerType(), True),
    StructField("western", IntegerType(), True),
])


movie_df = spark.read.format("csv").schema(movieSchema).option("delimiter", "|").load("u.item")

In [10]:
movie_df.show(5)

+--------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|movie_id|      movie_title|release_date|video_release_date|            imdb_url|unknown|action|adventure|animation|children|comedy|crime|documentary|drama|fantasy|film_noir|horror|musical|mystery|romance|sci_fi|thriller|war|western|
+--------+-----------------+------------+------------------+--------------------+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+-------+-------+-------+------+--------+---+-------+
|       1| Toy Story (1995)| 01-Jan-1995|              NULL|http://us.imdb.co...|      0|     0|        0|        1|       1|     1|    0|          0|    0|      0|        0|     0|      0|      0|      0|     0|       0|  0|      0|
|       2| GoldenEye (1995)| 01-Jan-1995|              NULL|http

# Select Features

In [11]:
# Make genres from separate columns into a list

from pyspark.sql.functions import when, col, array

genre_name_list = movie_df.columns[5:]
genres_df = movie_df.select("movie_id", array([col(column) for column in genre_name_list]).alias("genres"))
genres_df.show(5)

+--------+--------------------+
|movie_id|              genres|
+--------+--------------------+
|       1|[0, 0, 0, 1, 1, 1...|
|       2|[0, 1, 1, 0, 0, 0...|
|       3|[0, 0, 0, 0, 0, 0...|
|       4|[0, 1, 0, 0, 0, 1...|
|       5|[0, 0, 0, 0, 0, 0...|
+--------+--------------------+
only showing top 5 rows



In [12]:
# Make genres from 0,1 format to its names
from pyspark.sql.functions import when, col, array

genres_df = movie_df.withColumn("genres", array([when(col(col_name) == 1, col_name).otherwise(None) for col_name in genre_name_list]))
genres_df = genres_df.select("movie_id", "movie_title", "genres")

In [13]:
genres_df.show(5)

+--------+-----------------+--------------------+
|movie_id|      movie_title|              genres|
+--------+-----------------+--------------------+
|       1| Toy Story (1995)|[NULL, NULL, NULL...|
|       2| GoldenEye (1995)|[NULL, action, ad...|
|       3|Four Rooms (1995)|[NULL, NULL, NULL...|
|       4|Get Shorty (1995)|[NULL, action, NU...|
|       5|   Copycat (1995)|[NULL, NULL, NULL...|
+--------+-----------------+--------------------+
only showing top 5 rows



In [14]:
# Compute similarity scores between movies based on their genres
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [15]:
# Convert genres to a feature vector using TF-IDF
hashingTF = HashingTF(inputCol="genres", outputCol="rawFeatures", numFeatures=19)
idf = IDF(inputCol="rawFeatures", outputCol="features")
assembler = VectorAssembler(inputCols=["features"], outputCol="feature")

In [16]:
genres_df = hashingTF.transform(genres_df)
genres_df = idf.fit(genres_df).transform(genres_df)
genres_df = assembler.transform(genres_df)

In [17]:
# Compute similarity scores between movies based on their feature vectors
from pyspark.ml.feature import Normalizer

# Normalize the feature vectors
normalizer = Normalizer(inputCol="feature", outputCol="norm")
genres_df = normalizer.transform(genres_df)

In [18]:
genres_df.show(5)

+--------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|      movie_title|              genres|         rawFeatures|            features|             feature|                norm|
+--------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       1| Toy Story (1995)|[NULL, NULL, NULL...|(19,[0,4,12],[2.0...|(19,[0,4,12],[2.1...|(19,[0,12],[2.106...|(19,[0,12],[0.498...|
|       2| GoldenEye (1995)|[NULL, action, ad...|(19,[4,8,11,17],[...|(19,[4,8,11,17],[...|(19,[8,11,17],[1....|(19,[8,11,17],[0....|
|       3|Four Rooms (1995)|[NULL, NULL, NULL...|(19,[4,11],[18.0,...|(19,[4,11],[0.0,1...|(19,[11],[1.89890...|     (19,[11],[1.0])|
|       4|Get Shorty (1995)|[NULL, action, NU...|(19,[0,4,9,17],[1...|(19,[0,4,9,17],[1...|(19,[0,9,17],[1.0...|(19,[0,9,17],[0.5...|
|       5|   Copycat (1995)|[NULL, NULL, NULL...|(19,[4,9,11,1

In [19]:
genres_df.select(genres_df.features).where(genres_df.movie_id==1).cache().collect()[0][0]

SparseVector(19, {0: 2.1066, 4: 0.0, 12: 3.6671})

In [30]:
rec_movie_id = 420 # Example target movie
target_movie = genres_df.filter(f"movie_id == {rec_movie_id}").first()

In [31]:
target_movie

Row(movie_id=420, movie_title='Alice in Wonderland (1951)', genres=[None, None, None, 'animation', 'children', None, None, None, None, None, None, None, 'musical', None, None, None, None, None, None], rawFeatures=SparseVector(19, {0: 1.0, 1: 1.0, 4: 16.0, 12: 1.0}), features=SparseVector(19, {0: 1.0533, 1: 3.3853, 4: 0.0, 12: 3.6671}), feature=SparseVector(19, {0: 1.0533, 1: 3.3853, 12: 3.6671}), norm=SparseVector(19, {0: 0.2065, 1: 0.6637, 12: 0.7189}))

In [32]:
similarity_scores = genres_df.rdd.map(lambda row: (row["movie_id"], row["movie_title"],float(row["norm"].dot(target_movie["norm"])))).toDF(["movie_id", "movie_title", "similarity"])

In [39]:
similarity_scores.orderBy(["similarity"], ascending=False).show(10)

+--------+--------------------+------------------+
|movie_id|         movie_title|        similarity|
+--------+--------------------+------------------+
|     418|   Cinderella (1950)|0.9999999999999999|
|     624|Three Caballeros,...|0.9999999999999999|
|     420|Alice in Wonderla...|0.9999999999999999|
|     473|James and the Gia...|0.9999999999999999|
|     501|        Dumbo (1941)|0.9999999999999999|
|     103|All Dogs Go to He...|0.9999999999999999|
|     538|    Anastasia (1997)|0.9999999999999999|
|      99|Snow White and th...|0.9999999999999999|
|     588|Beauty and the Be...|0.9999999999999999|
|     596|Hunchback of Notr...|0.9999999999999999|
+--------+--------------------+------------------+
only showing top 10 rows



In [41]:
recs = similarity_scores.orderBy(["similarity"], ascending=False).head(5)
recs

[Row(movie_id=418, movie_title='Cinderella (1950)', similarity=0.9999999999999999),
 Row(movie_id=501, movie_title='Dumbo (1941)', similarity=0.9999999999999999),
 Row(movie_id=420, movie_title='Alice in Wonderland (1951)', similarity=0.9999999999999999),
 Row(movie_id=99, movie_title='Snow White and the Seven Dwarfs (1937)', similarity=0.9999999999999999),
 Row(movie_id=432, movie_title='Fantasia (1940)', similarity=0.9999999999999999)]

In [43]:
print("Movie: ", target_movie.movie_title)
print("Recommendations: ")
for i in recs:
  print(i.movie_title)

Movie:  Alice in Wonderland (1951)
Recommendations: 
Cinderella (1950)
Dumbo (1941)
Alice in Wonderland (1951)
Snow White and the Seven Dwarfs (1937)
Fantasia (1940)
