In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.linalg import Vectors, VectorUDT

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import os
import sys
import json

In [2]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
# Initialize Spark
spark = SparkSession.builder.appName("CBR System").getOrCreate()

### Load Datasets

In [4]:
movies = pd.read_csv('dataset/tmdb_5000_movies.csv')
credits = pd.read_csv('dataset/tmdb_5000_credits.csv')

In [5]:
movies.head(2)

Unnamed: 0,budget,genres,homepage,id,keywords,original_language,original_title,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title,vote_average,vote_count
0,237000000,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...",http://www.avatarmovie.com/,19995,"[{""id"": 1463, ""name"": ""culture clash""}, {""id"":...",en,Avatar,"In the 22nd century, a paraplegic Marine is di...",150.437577,"[{""name"": ""Ingenious Film Partners"", ""id"": 289...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...",2009-12-10,2787965087,162.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}, {""iso...",Released,Enter the World of Pandora.,Avatar,7.2,11800
1,300000000,"[{""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""...",http://disney.go.com/disneypictures/pirates/,285,"[{""id"": 270, ""name"": ""ocean""}, {""id"": 726, ""na...",en,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, ha...",139.082615,"[{""name"": ""Walt Disney Pictures"", ""id"": 2}, {""...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...",2007-05-19,961000000,169.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}]",Released,"At the end of the world, the adventure begins.",Pirates of the Caribbean: At World's End,6.9,4500


In [6]:
credits.head(2)

Unnamed: 0,movie_id,title,cast,crew
0,19995,Avatar,"[{""cast_id"": 242, ""character"": ""Jake Sully"", ""...","[{""credit_id"": ""52fe48009251416c750aca23"", ""de..."
1,285,Pirates of the Caribbean: At World's End,"[{""cast_id"": 4, ""character"": ""Captain Jack Spa...","[{""credit_id"": ""52fe4232c3a36847f800b579"", ""de..."


In [7]:
movies.shape

(4803, 20)

In [8]:
credits.shape

(4803, 4)

### Data Pre-Processing

In [9]:
# merge into single dataset
movies = movies.merge(credits, on='title')

In [10]:
movies.dtypes

budget                    int64
genres                   object
homepage                 object
id                        int64
keywords                 object
original_language        object
original_title           object
overview                 object
popularity              float64
production_companies     object
production_countries     object
release_date             object
revenue                   int64
runtime                 float64
spoken_languages         object
status                   object
tagline                  object
title                    object
vote_average            float64
vote_count                int64
movie_id                  int64
cast                     object
crew                     object
dtype: object

In [11]:
movies.head(2)

Unnamed: 0,budget,genres,homepage,id,keywords,original_language,original_title,overview,popularity,production_companies,...,runtime,spoken_languages,status,tagline,title,vote_average,vote_count,movie_id,cast,crew
0,237000000,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...",http://www.avatarmovie.com/,19995,"[{""id"": 1463, ""name"": ""culture clash""}, {""id"":...",en,Avatar,"In the 22nd century, a paraplegic Marine is di...",150.437577,"[{""name"": ""Ingenious Film Partners"", ""id"": 289...",...,162.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}, {""iso...",Released,Enter the World of Pandora.,Avatar,7.2,11800,19995,"[{""cast_id"": 242, ""character"": ""Jake Sully"", ""...","[{""credit_id"": ""52fe48009251416c750aca23"", ""de..."
1,300000000,"[{""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""...",http://disney.go.com/disneypictures/pirates/,285,"[{""id"": 270, ""name"": ""ocean""}, {""id"": 726, ""na...",en,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, ha...",139.082615,"[{""name"": ""Walt Disney Pictures"", ""id"": 2}, {""...",...,169.0,"[{""iso_639_1"": ""en"", ""name"": ""English""}]",Released,"At the end of the world, the adventure begins.",Pirates of the Caribbean: At World's End,6.9,4500,285,"[{""cast_id"": 4, ""character"": ""Captain Jack Spa...","[{""credit_id"": ""52fe4232c3a36847f800b579"", ""de..."


In [12]:
movies.shape

(4809, 23)

In [13]:
# Keeping important columns for recommendation
movies = movies[['movie_id','title','overview','genres','keywords','cast','crew']]

In [14]:
movies.head(2)

Unnamed: 0,movie_id,title,overview,genres,keywords,cast,crew
0,19995,Avatar,"In the 22nd century, a paraplegic Marine is di...","[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...","[{""id"": 1463, ""name"": ""culture clash""}, {""id"":...","[{""cast_id"": 242, ""character"": ""Jake Sully"", ""...","[{""credit_id"": ""52fe48009251416c750aca23"", ""de..."
1,285,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, ha...","[{""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""...","[{""id"": 270, ""name"": ""ocean""}, {""id"": 726, ""na...","[{""cast_id"": 4, ""character"": ""Captain Jack Spa...","[{""credit_id"": ""52fe4232c3a36847f800b579"", ""de..."


In [15]:
# Check null
movies.isnull().sum()

movie_id    0
title       0
overview    3
genres      0
keywords    0
cast        0
crew        0
dtype: int64

In [16]:
movies.dropna(inplace=True)

In [17]:
movies.isnull().sum()

movie_id    0
title       0
overview    0
genres      0
keywords    0
cast        0
crew        0
dtype: int64

In [18]:
movies.shape

(4806, 7)

In [19]:
# check duplicate
movies.duplicated().sum()

0

In [20]:
movies.iloc[0]

movie_id                                                19995
title                                                  Avatar
overview    In the 22nd century, a paraplegic Marine is di...
genres      [{"id": 28, "name": "Action"}, {"id": 12, "nam...
keywords    [{"id": 1463, "name": "culture clash"}, {"id":...
cast        [{"cast_id": 242, "character": "Jake Sully", "...
crew        [{"credit_id": "52fe48009251416c750aca23", "de...
Name: 0, dtype: object

In [21]:
movies['genres']

0       [{"id": 28, "name": "Action"}, {"id": 12, "nam...
1       [{"id": 12, "name": "Adventure"}, {"id": 14, "...
2       [{"id": 28, "name": "Action"}, {"id": 12, "nam...
3       [{"id": 28, "name": "Action"}, {"id": 80, "nam...
4       [{"id": 28, "name": "Action"}, {"id": 12, "nam...
                              ...                        
4804    [{"id": 28, "name": "Action"}, {"id": 80, "nam...
4805    [{"id": 35, "name": "Comedy"}, {"id": 10749, "...
4806    [{"id": 35, "name": "Comedy"}, {"id": 18, "nam...
4807                                                   []
4808                  [{"id": 99, "name": "Documentary"}]
Name: genres, Length: 4806, dtype: object

In [22]:
movies['genres'][0]

'[{"id": 28, "name": "Action"}, {"id": 12, "name": "Adventure"}, {"id": 14, "name": "Fantasy"}, {"id": 878, "name": "Science Fiction"}]'

- #### parse JSON

In [23]:
def extract_names(genre_string):
    # Parse the JSON string to a list of dictionaries
    genre_list = json.loads(genre_string)
    
    # Extract the names from the list of dictionaries
    return [item['name'] for item in genre_list]

In [24]:
# handle genre
movies['genres'] = movies['genres'].apply(extract_names)

In [25]:
movies.head(2)

Unnamed: 0,movie_id,title,overview,genres,keywords,cast,crew
0,19995,Avatar,"In the 22nd century, a paraplegic Marine is di...","[Action, Adventure, Fantasy, Science Fiction]","[{""id"": 1463, ""name"": ""culture clash""}, {""id"":...","[{""cast_id"": 242, ""character"": ""Jake Sully"", ""...","[{""credit_id"": ""52fe48009251416c750aca23"", ""de..."
1,285,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, ha...","[Adventure, Fantasy, Action]","[{""id"": 270, ""name"": ""ocean""}, {""id"": 726, ""na...","[{""cast_id"": 4, ""character"": ""Captain Jack Spa...","[{""credit_id"": ""52fe4232c3a36847f800b579"", ""de..."


In [26]:
# handle keywords
movies['keywords'][0]

'[{"id": 1463, "name": "culture clash"}, {"id": 2964, "name": "future"}, {"id": 3386, "name": "space war"}, {"id": 3388, "name": "space colony"}, {"id": 3679, "name": "society"}, {"id": 3801, "name": "space travel"}, {"id": 9685, "name": "futuristic"}, {"id": 9840, "name": "romance"}, {"id": 9882, "name": "space"}, {"id": 9951, "name": "alien"}, {"id": 10148, "name": "tribe"}, {"id": 10158, "name": "alien planet"}, {"id": 10987, "name": "cgi"}, {"id": 11399, "name": "marine"}, {"id": 13065, "name": "soldier"}, {"id": 14643, "name": "battle"}, {"id": 14720, "name": "love affair"}, {"id": 165431, "name": "anti war"}, {"id": 193554, "name": "power relations"}, {"id": 206690, "name": "mind and soul"}, {"id": 209714, "name": "3d"}]'

In [27]:
movies['keywords'] = movies['keywords'].apply(extract_names)
movies.head(2)

Unnamed: 0,movie_id,title,overview,genres,keywords,cast,crew
0,19995,Avatar,"In the 22nd century, a paraplegic Marine is di...","[Action, Adventure, Fantasy, Science Fiction]","[culture clash, future, space war, space colon...","[{""cast_id"": 242, ""character"": ""Jake Sully"", ""...","[{""credit_id"": ""52fe48009251416c750aca23"", ""de..."
1,285,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, ha...","[Adventure, Fantasy, Action]","[ocean, drug abuse, exotic island, east india ...","[{""cast_id"": 4, ""character"": ""Captain Jack Spa...","[{""credit_id"": ""52fe4232c3a36847f800b579"", ""de..."


In [28]:
movies['cast'] = movies['cast'].apply(extract_names)
movies.head(2)

Unnamed: 0,movie_id,title,overview,genres,keywords,cast,crew
0,19995,Avatar,"In the 22nd century, a paraplegic Marine is di...","[Action, Adventure, Fantasy, Science Fiction]","[culture clash, future, space war, space colon...","[Sam Worthington, Zoe Saldana, Sigourney Weave...","[{""credit_id"": ""52fe48009251416c750aca23"", ""de..."
1,285,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, ha...","[Adventure, Fantasy, Action]","[ocean, drug abuse, exotic island, east india ...","[Johnny Depp, Orlando Bloom, Keira Knightley, ...","[{""credit_id"": ""52fe4232c3a36847f800b579"", ""de..."


In [29]:
def extract_director(crew_string):
    # Parse the JSON string to a list of dictionaries
    crew_list = json.loads(crew_string)
    
    # Extract the names from the list of dictionaries
    return [item['name'] for item in crew_list if item['job']=='Director']

In [30]:
movies['crew'] = movies['crew'].apply(extract_director)

In [31]:
movies.head(2)

Unnamed: 0,movie_id,title,overview,genres,keywords,cast,crew
0,19995,Avatar,"In the 22nd century, a paraplegic Marine is di...","[Action, Adventure, Fantasy, Science Fiction]","[culture clash, future, space war, space colon...","[Sam Worthington, Zoe Saldana, Sigourney Weave...",[James Cameron]
1,285,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, ha...","[Adventure, Fantasy, Action]","[ocean, drug abuse, exotic island, east india ...","[Johnny Depp, Orlando Bloom, Keira Knightley, ...",[Gore Verbinski]


In [32]:
movies.dtypes

movie_id     int64
title       object
overview    object
genres      object
keywords    object
cast        object
crew        object
dtype: object

- #### Convert pandas DataFrame to Spark DataFrame

In [33]:
# Convert pandas DataFrame to Spark DataFrame
movie_df = spark.createDataFrame(movies)

In [34]:
movie_df.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- cast: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- crew: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [35]:
movie_df.show(2)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+
|movie_id|               title|            overview|              genres|            keywords|                cast|            crew|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+
|   19995|              Avatar|In the 22nd centu...|[Action, Adventur...|[culture clash, f...|[Sam Worthington,...| [James Cameron]|
|     285|Pirates of the Ca...|Captain Barbossa,...|[Adventure, Fanta...|[ocean, drug abus...|[Johnny Depp, Orl...|[Gore Verbinski]|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------+
only showing top 2 rows



In [36]:
movie_df.count()

4806

## Build Recommendation System

### Plot description based Recommender
  - We will compute pairwise similarity scores for all movies based on their plot descriptions and recommend movies based on that similarity score. The plot description is given in the **overview feature** of our dataset
  - This approach is indeed based on computing the cosine similarity between the vector representations of movie overviews.

In [37]:
def create_pb_recommender(movie_df):
    # Tokenize overview
    tokenizer = Tokenizer(inputCol="overview", outputCol="words")
    
    # Remove stop words
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    
    # Convert words to term frequency vectors
    hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=20000)
    
    # Calculate IDF
    idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
    
    # Create and fit the pipeline
    pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf])
    model = pipeline.fit(movie_df)
    tfidf_df = model.transform(movie_df)
    
    return tfidf_df

In [38]:
# cosine similarity
def cosine_similarity_vectors(v1, v2):
    v1_array = v1.toArray()
    v2_array = v2.toArray()
    dot_product = float(v1_array.dot(v2_array))
    norm1 = float(np.sqrt(v1_array.dot(v1_array)))
    norm2 = float(np.sqrt(v2_array.dot(v2_array)))
    
    return float(dot_product / (norm1 * norm2)) if norm1 * norm2 != 0 else 0.0

In [39]:
def get_pb_recommendations(movie_title, feature_df, feature_col, top_n=10):
    # Get the feature vector for the input movie
    movie_features = feature_df.filter(col("title") == movie_title) \
                              .select(feature_col) \
                              .first()
    
    if not movie_features:
        return None
    
    # Create broadcast variable for movie features
    movie_vector_broadcast = spark.sparkContext.broadcast(movie_features[0])
    
    # Create UDF with broadcasted vector
    @udf(returnType=FloatType())
    def cosine_sim_with_broadcast(v1):
        return cosine_similarity_vectors(v1, movie_vector_broadcast.value)
    
    # Calculate similarities
    similarities = feature_df.withColumn(
        "similarity",
        cosine_sim_with_broadcast(col(feature_col))
    )
    
    # Get top N similar movies
    result = similarities.filter(col("title") != movie_title) \
                        .orderBy(col("similarity").desc()) \
                        .select("title", "similarity") \
                        .limit(top_n)
    
    # Clean up broadcast variable
    movie_vector_broadcast.unpersist()
    
    return result

In [40]:
# create recommender
tfidf_df = create_pb_recommender(movie_df)

In [41]:
# Plot-based recommendations
plot_recommendations = get_pb_recommendations("The Dark Knight Rises", tfidf_df, "tfidf_features")

In [42]:
plot_recommendations.show()

+---------------+-----------+
|          title| similarity|
+---------------+-----------+
|The Dark Knight| 0.22959843|
| Batman Returns| 0.19577514|
| Batman Forever| 0.19064705|
|         Batman| 0.16277784|
|         Batman| 0.16277784|
|        RoboCop|0.107336655|
| Batman & Robin| 0.10499269|
|      Slow Burn| 0.10296699|
|  Just Visiting| 0.10018901|
|  Despicable Me| 0.09012854|
+---------------+-----------+



### Metadata-based Recommendations
- A recommender based on the following metadata: **the 3 top actors**, **the director**, **related genres** and **the movie plot keywords**.

- #### Pre-process data for Metadata-based Recommendations
    - Since there are many genres, actor/actress and keyword in each row, we will choose top 3 most related related items for each column.
    - This will improve recommendation quality, reduces noise and enhances computation efficiency

In [43]:
# movie_df.show(3, truncate=False)

In [44]:
'''
convert the names and keyword instances into lowercase and strip all the spaces between them. 
This is done so that our vectorizer doesn't count the Johnny of "Johnny Depp" and "Johnny Galecki" as the same.
'''

# Define UDF for cleaning array/list data
@udf(returnType=ArrayType(StringType()))
def clean_array_data(x):
    if x is None:
        return []
    return [str(i).lower().replace(" ", "") for i in x]

# Define UDF for cleaning string data
@udf(returnType=StringType())
def clean_string_data(x):
    if x is None:
        return ''
    return str(x).lower().replace(" ", "")

# Clean the features
features = ['cast', 'keywords', 'crew', 'genres']

# Apply cleaning to each feature
for feature in features:
    # Check if the column is an array type
    if isinstance(movie_df.schema[feature].dataType, ArrayType):
        movie_df = movie_df.withColumn(feature, clean_array_data(col(feature)))
    else:
        movie_df = movie_df.withColumn(feature, clean_string_data(col(feature)))

In [45]:
# select the top 3
features = ['cast', 'keywords', 'genres']

for feature in features:
    # Using slice function is more efficient than UDF
    movie_df = movie_df.withColumn(
        feature,
        # When array size > 3, take first 3 elements, otherwise keep all
        when(size(col(feature)) > 3,
             slice(col(feature), 1, 3))
        .otherwise(col(feature))
    )

In [46]:
movie_df.show(2, truncate=False)

+--------+----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+--------------------------------+---------------------------------------------+---------------+
|movie_id|title                                   |overview                                                                                                                                                                        |genres                      |keywords                        |cast                                         |crew           |
+--------+----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+--------------------------------+----

- Create tags that that contains all the metadata that we want to feed to our vectorizer (namely actors, director and keywords).

In [47]:
movie_df = movie_df.withColumn(
    "tags",
    lower(
        concat_ws(
            " ",
            array_join(col("keywords"), " "),
            array_join(col("cast"), " "),
            array_join(col("crew"), " "),
            array_join(col("genres"), " ")
        )
    )
)

In [48]:
movie_df.show(n=1, truncate=False)

+--------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+--------------------------------+---------------------------------------------+--------------+------------------------------------------------------------------------------------------------------------+
|movie_id|title |overview                                                                                                                                                                       |genres                      |keywords                        |cast                                         |crew          |tags                                                                                                        |
+--------+------+-----------------------------------------------------------------------------------------------------------------------------------

- #### Perform Metadata-based Recommendations

In [49]:
@udf(returnType=FloatType())
def cosine_similarity_vectors(v1, v2):
    v1_array = v1.toArray()
    v2_array = v2.toArray()
    dot_product = float(v1_array.dot(v2_array))
    norm1 = float(np.sqrt(v1_array.dot(v1_array)))
    norm2 = float(np.sqrt(v2_array.dot(v2_array)))
    
    return float(dot_product / (norm1 * norm2)) if norm1 * norm2 != 0 else 0.

In [50]:
def create_metadata_based_recommender(movie_df):
    # Create pipeline for count vectorization
    tokenizer = Tokenizer(inputCol="tags", outputCol="words")
    countVectorizer = CountVectorizer(inputCol="words", 
                                    outputCol="features",
                                    vocabSize=20000,
                                    minDF=1.0)
    
    # Create and fit pipeline
    pipeline = Pipeline(stages=[tokenizer, countVectorizer])
    model = pipeline.fit(movie_df)
    features_df = model.transform(movie_df)
    
    return features_df, model

In [51]:
def get_recommendations(title, feature_df, n=10):
    # Get the feature vector for the input movie
    input_vector = feature_df.filter(col("title") == title).first()
    
    if not input_vector:
        return None
    
    # Create a cross join with the input vector's features
    recommendations = feature_df.crossJoin(
        broadcast(feature_df.filter(col("title") == title)
        .select("features")
        .withColumnRenamed("features", "input_features"))
    )
    
    # Calculate similarities
    recommendations = recommendations.withColumn(
        "similarity",
        cosine_similarity_vectors(col("features"), col("input_features"))
    )
    
    # Get top N recommendations
    return recommendations.filter(col("title") != title) \
                         .orderBy(col("similarity").desc()) \
                         .select("title", "similarity") \
                         .limit(n)

In [52]:
feature_df, model = create_metadata_based_recommender(movie_df)

In [53]:
meta_recommendations = get_recommendations("The Dark Knight Rises", feature_df)

In [54]:
meta_recommendations.show()

+-----------------+----------+
|            title|similarity|
+-----------------+----------+
|  The Dark Knight|       0.7|
|    Batman Begins|       0.7|
|Romeo Is Bleeding|       0.4|
|     The Prestige|       0.4|
|   Black November| 0.3354102|
|           Faster| 0.3354102|
|    Kiss of Death| 0.3354102|
|           Takers| 0.3354102|
|      Harry Brown|0.31622776|
|     Street Kings|0.31622776|
+-----------------+----------+



In [55]:
spark.stop()