# Content-Based Recommender System

A recommender system is an algorithm or model that takes in information about a user and suggests an item—new to them—that is likely to be of interest. There are several approaches to building such a system, and this notebook will focus on **content-based methods**. Let's begin by starting the PySpark session:


In [2]:
# Load libraries
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import helper_functions as hf
import pyspark.sql.functions as f
from pyspark.sql.types import FloatType, StringType, ArrayType

# Initialize Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CBRS").getOrCreate()
spark

In [3]:
# Movies Metadata (Load Dataset)
df = spark.read.parquet('data/cleaned/movies2/')
display(df.limit(3).toPandas())

Unnamed: 0,adult,genres,id,original_language,overview,popularity,production_companies,production_countries,release_date,runtime,...,status,tagline,title,video,vote_average,vote_count,n_genres,n_production_companies,n_production_countries,n_spoken_languages
0,False,"[Drama, Music]",277216,en,"In 1987, five young men, using brutally honest...",21.183077,"[New Line Cinema, Universal Pictures, Legendar...",[United States of America],2015-08-13,147.0,...,Released,The Story of N.W.A.,Straight Outta Compton,False,7.7,1381,2,6,1,1
1,False,[Drama],167284,en,This movie portrays three women living in toda...,0.153287,[],[],2004-01-01,113.0,...,Released,,Viva Algeria,False,7.2,3,1,0,0,0
2,False,"[Crime, Music, Romance]",298078,en,A merciless hit man rescues a prostitute from ...,0.782841,[],[Philippines],2014-10-24,73.0,...,Released,"When violence is the only life you know, will ...",Ruined Heart: Another Love Story Between A Cri...,False,6.7,7,3,0,1,2



## Introduction



Content-Based Recommender Systems use the metadata of the items and users to give recommendations. In this notebook, the system will recommend movies to watch, so some examples of movie metadata could be movie genres, the name of the director, the production company, popularity, etc. At the same time, users metadata could be use, which include movies previously watched, age and the country they are from.

The main advantage of this approach is that in Cold-Start Scenarios—i.e. when there is still not enough user interaction—the system is still capable of giving sensible recommendations. This is due to the fact that metadata is already known—at least for the movies. However, this method does not compute similarity between the users, so the ones with a higher correlation cannot be given a stronger impact in the decision.

For the approaches to build a system like this, the options vary depending on the metadata to use or the computation power available.



## Jaccard and Cosine Similarities
> A simple approach and valid when the metadata can be treated as tags, which is common.





These two are popular similarity metrics that are useful when comparing metadata of various items. For both of them, the first step is to make a matrix $\underline{\underline{M}}$ of pairs (movie, tags), where the elements $m_{ij}$ are booleans indicating if the movie *i* has the tag *j*. For example, let's say that there is a DataFrame with two columns, the first the name of the film and the second one of its genres:

<center>

| **Movie** | **Genre** |
|:---------:|:---------:|
|     A     |     x     |
|     A     |     y     |
|     A     |     z     |
|     B     |     x     |
|     B     |     g     |

</center>

To transform it into the $\underline{\underline{M}}$ matrix, the movies will have to turn into rows and the genres into columns, like such:

<center>

| **Movie** \\ **Genre** | **x** | **y** | **z** | **g** |
|------------------------|-------|-------|-------|-------|
| **A**                  | 1     | 1     | 1     | 0     |
| **B**                  | 1     | 0     | 0     | 1     |

</center>

Consequently, the elements turn into indicators that show whether the corresponding movie falls under the listed genres. Finally, this allows the movies to be represented as vector—of the space of genres:

$$\underline{A} = (1,1,1,0) \;\text{ and }\; \underline{B} = (1,0,0,1)$$

Now that the movies are expressed as vector, the similarity measure is quite straightforward, as show the following equations:

+ **Jaccard Similarity**:   $J(\underline{A}, \underline{B}) = \frac{\underline{A}\cap\underline{B}}{\underline{A}\cup\underline{B}}$

+ **Cosine Similarity**:    $Sim(\underline{B},\underline{B}) = \frac{\underline{A}\cdot\underline{B}}{|\underline{A}||\underline{B}|}$

Coming back to the previous example, the results would be as follows:
+ **Jaccard Similarity**:   $J(\underline{A}, \underline{B}) = \frac{(1,1,1,0)\cap(1,0,0,1)}{(1,1,1,0)\cup(1,0,0,1)} = \frac{1+0+0+1}{1+1+1+1} = \frac{1}{2}$

+ **Cosine Similarity**:    $Sim(\underline{B},\underline{B}) = \frac{(1,1,1,0)\cdot(1,0,0,1)}{|(1,1,1,0)||(1,0,0,1)|} = \frac{1+0+0+1}{|\sqrt{1^2+1^2+1^2}||\sqrt{1^2+1^2}|} = \sqrt{\frac{2}{3}}$


### Coding Jaccard and Cosine Similarities

Let's do a simple recommender system by movie genre:

In [4]:
# Explode genre list into different columns
genres = df.select('genres', 'title')
genres = df.select(df.title, f.explode(df.genres).alias('genre'))
display(genres.limit(3).toPandas())

Unnamed: 0,title,genre
0,Straight Outta Compton,Drama
1,Straight Outta Compton,Music
2,Viva Algeria,Drama


In [5]:
# Create a columns of 1 to indicate a movies has a specific genre
genres = genres.withColumn('value', f.lit(1))
display(genres.limit(3).toPandas())

Unnamed: 0,title,genre,value
0,Straight Outta Compton,Drama,1
1,Straight Outta Compton,Music,1
2,Viva Algeria,Drama,1


In [6]:
# Set movies as rows (groupby) and genres as columns (pivot)
# sum() makes sure it is one on the (title, genre) pairs indicated above
genres = genres.groupby('title').pivot('genre').sum('value')
display(genres.limit(3).toPandas())

Unnamed: 0,title,Action,Adventure,Animation,Comedy,Crime,Documentary,Drama,Family,Fantasy,...,History,Horror,Music,Mystery,Romance,Science Fiction,TV Movie,Thriller,War,Western
0,We Are Many,,,,,,1.0,,,,...,,,,,,,,,,
1,All The Days Before Tomorrow,,,,1.0,,,1.0,,,...,,,,,1.0,,,,,
2,Snowman's Land,,,,1.0,1.0,,,,,...,,,,,,,,1.0,,


In [7]:
# Finally, set the NULLs as 0, which indicate that the movie hasn't that genre
genres = genres.fillna(0)
display(genres.limit(3).toPandas())

Unnamed: 0,title,Action,Adventure,Animation,Comedy,Crime,Documentary,Drama,Family,Fantasy,...,History,Horror,Music,Mystery,Romance,Science Fiction,TV Movie,Thriller,War,Western
0,We Are Many,0,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,All The Days Before Tomorrow,0,0,0,1,0,0,1,0,0,...,0,0,0,0,1,0,0,0,0,0
2,Snowman's Land,0,0,0,1,1,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0


This DataFrame stores the movie-genre relationships, which are required to calculate the similarities. In this scenario, it is a good idea to preemptively calculate similarities and cache them, so the recommendations can happen in real time. To do this, the first step is to turn the movies into vectors, which is an easy task considering the rows of the movie-genre DataFrame is a vector representation of the movies in the genre space. However, to save up memory, it is wise to use dense vectors, because most of the values are zeros:

In [8]:
from pyspark.ml.feature import VectorAssembler
# Train vector assembler on the dataframe
vector_assembler = VectorAssembler(inputCols=genres.columns[1:],
                                   outputCol='features')

# Create the column features with the Dense Vector representation
movie_features = vector_assembler.transform(genres).select('title', 'features')
movie_features.show(3, truncate=False)


+----------------------------+---------------------------+
|title                       |features                   |
+----------------------------+---------------------------+
|We Are Many                 |(20,[5],[1.0])             |
|All The Days Before Tomorrow|(20,[3,6,14],[1.0,1.0,1.0])|
|Snowman's Land              |(20,[3,4,17],[1.0,1.0,1.0])|
+----------------------------+---------------------------+
only showing top 3 rows



Although in pandas, dense vectors show like usual vectors:

In [9]:
display(movie_features.limit(3).toPandas())

Unnamed: 0,title,features
0,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ..."
1,All The Days Before Tomorrow,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."
2,Snowman's Land,"(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."


Now, to calculate the similarities for each pair of movies, let's join the resulting DataFrame with itself. This will result into a DataFrame whose rows will contain two movies—thus, two dense vectors. However, there are two things that must be guaranteed:

+ For every row, the two movies cannot be the same. This would lead to the calculation of the similarity between them, which will not do any good to the recommender system.

+ The pair (A,B) yields the same similarity than the pair (B,A), so there is no need to calculate them both.

In [10]:
# Cross Join: Cartesian Product of the tables
df_cross = movie_features.alias('left').crossJoin(movie_features.alias('right'))
display(df_cross.limit(3).toPandas())

Unnamed: 0,title,features,title.1,features.1
0,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ..."
1,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",All The Days Before Tomorrow,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."
2,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",Snowman's Land,"(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."


To filter out the rows with the same movie twice and the repeated pairs, Python allows the use of '<' to compare text. For example:

+ 'The Hunger Games' < 'Braveheart' = False.

+ 'Braveheart' < 'The Hunger Games' = True.

Because 'B' comes before 'T', due to the alphabetical order.

In [11]:
df_cross = df_cross.filter(f.col('left.title') < f.col('right.title'))
display(df_cross.limit(3).toPandas())

Unnamed: 0,title,features,title.1,features.1
0,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",What No One Knows,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."
1,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",What Have They Done to Your Daughters?,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",We are the tide,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."


Finally, the similarities will be stored in a new column and calculated via User Defined Functions (udf):

In [12]:
# --- Jaccard Similarity ---
def jaccard_similarity(v1, v2):
    # Formula:
    #   J = Intersection(A,B) / Union(A,B)
    A_and_B = sum(1 for (a,b) in zip(v1,v2) if a==1 and a==b) # Intersection
    A_or_B = sum(1 for (a,b) in zip(v1,v2) if a==1 or b==1) # Union
    return float(A_and_B) / A_or_B if A_or_B != 0 else 0.0 # Similarity

# --- Cosine Similarity ---
def cosine_similarity(v1,v2):
    # Formula:
    #   Sim = A·B / |A||B|

    # Numerator: scalar product
    num = sum(c1*c2 for (c1,c2) in zip(v1,v2))
    
    # Denominator: modules
    mod_a = np.sqrt(sum(c1**2 for c1 in v1))
    mod_b = np.sqrt(sum(c2**2 for c2 in v2))
    den = mod_a * mod_b

    # Similarity
    return float(num) / float(den) if den != 0.0 else 0.0

# --- User Defined Functions ---
jaccard_udf = f.udf(lambda v1,v2: jaccard_similarity(v1,v2), FloatType())
cosine_udf = f.udf(lambda v1,v2: cosine_similarity(v1,v2), FloatType())

# Apply udf to the pairs of movies in every row
df_similarities = df_cross.\
    withColumn('jaccard', jaccard_udf(f.col('left.features'), f.col('right.features'))).\
    withColumn('cosine', cosine_udf(f.col('left.features'), f.col('right.features'))).\
    select('left.title', 'right.title', 'jaccard', 'cosine')

display(df_similarities.limit(10).toPandas())

Unnamed: 0,title,title.1,jaccard,cosine
0,We Are Many,What No One Knows,0.0,0.0
1,We Are Many,What Have They Done to Your Daughters?,0.0,0.0
2,We Are Many,We are the tide,0.0,0.0
3,We Are Many,Window Water Baby Moving,1.0,1.0
4,We Are Many,Wood Job!,0.0,0.0
5,We Are Many,Zodiac: The Race Begins...,0.0,0.0
6,We Are Many,Убить дракона,0.0,0.0
7,We Are Many,Women of the Night,0.0,0.0
8,We Are Many,Wise Guys,0.0,0.0
9,We Are Many,You Wont Miss Me,0.0,0.0


## TF-IDF
>An appropriate approach when the metadata is text-based, which could enclose additional information not represented in tags. 


Let's say there is term *t* in a document *d*, which is part of a collection of documents—or corpus—called *D*. Then the *Term Frequency* or $TF(t,d)$ is defined as the number of times a term *t* appears in a document *d*. This can be used on its own as a way to vectorize documents, but it is very easy to **over-emphasize terms** that appear often yet **carry very little information**, such as the terms 'a', 'the' and 'of'. To counter this issue, every term should be weighted by its 'importance' or its ability to uniquely identify a specific document. 

To obtain a measure of the importance of every term, firstly it is necessary to define the *Document Frequency* or $DF(t,D)$, which is the number of documents in the corpus *D* that contain the term *t*. According to this, the higher the value of $DF(t,D)$, the lower the importance of *t*, because it doesn't **point to a specific document**. Consequently, a low value in $DF(t,D)$ would indicate that the term *t* carries special information about a particular document, thus the interest is in minimizing this value or alternatively in **maximizing the inverse** of $DF(t,D)$, which can be defined as:

$$IDF(t,D) = \log\left( \frac{|D| + 1}{DF(t,D) + 1} \right),$$

where: 

+ $|D|$ is the total number of documents in the corpus *D*.

+ The $+1$ in the denominator is a smoothing factor that avoids dividing by zero, which happens when the term *t* does not appear in any document *D*.

+ The $+1$ in the numerator is necessary to avoid computing $\log(0)$, which can happen if there is no document in the corpus *D*.

Note that this equation succeeds in representing the importance of a term *t*, because when a term *t* appears in all documents of *D*, then $|D| = DF(t,D)$, thus $IDF(t,D) = \log(1) = 0$, whereas the lower $DF(t,D)$ is, the greater the value of $IDF(t,D)$.

Finally, the *TF-IDF* measure is calculated following:

$$TFIDF(t,d,D) = TF(t,d)\, IDF(t,D).$$


#### Code application



In code, the process is similar, if done in a simplistic manner:

1. **Tokenization of text:** split sentences into individual words and stored in lists.

2. **Compute TF:** using *CountVectorizer* or *HashingTF*—more about them later—, calculate the term frequency for every word and store the resulting list in a new column.

3. **Compute IDF:** take the column where the TF is stored—i.e. previous output—and apply IDF to *scale the values* according to term importance.

4. **Computing Similarity:** with the vectorize version of the the documents via TF-IDF, similarity can be calculated like in the previous section.

However, it is possible to perform additional actions to **improve the quality of the documents**, leading to better results in a more efficient way:

1. **Text preprocessing:** cleaning the documents. This includes converting to lowercase, removing punctuation and special characters—so that for example, 'word' and 'word.' are treated equally—, and removing numeric values.

2. **Tokenization.**

3. **Removing stopwords:** common words, such as 'a', 'the' and 'of', will receive a low IDF score in most cases, so removing them just accelerates the process.

4. **Optional, but costly options:**

    + **Stemming:** reduces words to their root. E.g. 'running' turns into 'run'.

    + **Lemmatization:** finds the dictionary form of words. E.g. 'better' turns into 'good'.

5. **Computing TF.**

6. **Computing IDF.**

7. **Computing Similarity.**


#### About CountVectorizer and HashingTF



Both *CountVetorizer* and *HashingTF* can be used to generate the $TF(t,d)$ values, thus vectorizing the document based on its terms. However, they are fundamentally different approaches, starting with their definitions:

+ **CountVectorizer:** It is a feature extraction method that converts text documents into a sparse matrix of term counts. To do this, firstly it *builds a vocabulary* of all unique terms in the corpus *D* and then counts occurrences of each term in every document. This approach is useful when it is necessary to *keep track of words* and their frequencies.

+ **HashingTF:** It is a transformation that *maps terms* into a fixed-length feature vector *using a hashing function*. Instead of building a vocabulary, it applies a hash function to each term and assigns it to a predefined number of features, also called 'buckets'. This allows for *efficient computation* in exchange of interpretability, since different terms might hash to the same index (*hash collision*).

Moreover, let's discuss some specific differences between the two:

<table border="1">
    <tr>
        <th>Feature</th>
        <th>CountVectorizer</th>
        <th>HashingTF</th>
    </tr>
    <tr>
        <td><b>Interpretability</b></td>
        <td>Keeps the actual words in the vocabulary, making it easy to understand which words contribute to a document.</td>
        <td>The words are hashed, so we lose the ability to interpret which word corresponds to which feature.</td>
    </tr>
    <tr>
        <td><b>Memory Usage</b></td>
        <td>Requires storing the vocabulary, which can be large for big datasets.</td>
        <td>Uses a fixed-length vector, reducing memory requirements since it does not store the vocabulary.</td>
    </tr>
    <tr>
        <td><b>Computational Overhead</b></td>
        <td>Requires an extra step to scan the dataset and build the vocabulary before transforming text into vectors.</td>
        <td>Directly transforms text using a hashing function, i.e. it can transform the documents in one pass.</td>
    </tr>
    <tr>
        <td><b>Handling of New Words</b></td>
        <td>If a new word appears that was not in the training data, it will be ignored (unless vocabulary expansion is allowed).</td>
        <td>Can handle new words dynamically without needing retraining.</td>
    </tr>
    <tr>
        <td><b>Risk of Collisions</b></td>
        <td>No risk; each word has a unique index.</td>
        <td>Has a small chance of hash collisions where different words map to the same index, potentially reducing accuracy.</td>
    </tr>
    <tr>
        <td><b>Scalability</b></td>
        <td>May struggle with very large datasets due to vocabulary size constraints.</td>
        <td>More scalable since it does not require storing or updating a vocabulary.</td>
    </tr>
</table>


In the end, it is possible to summarize this information focusing on their use cases:

+ **CountVectorizer:** recommended if *interpretability* is important and there is enough memory to store the vocabulary.

+ **HashingTF:** more optimized for larger datasets, especially when *memory efficiency* is required, and when *new words* are expected frequently.

### Coding TF-IDF


From the dataset, only the *movie_id*, *title* and *overview* will be chosen. The terms in the overviews will yield the TF-IDF vectors.

In [13]:
overview = df.select('id', 'title', 'overview').orderBy('id').na.drop(subset=["overview"])
display(overview.limit(3).toPandas())

Unnamed: 0,id,title,overview
0,2,Ariel,Taisto Kasurinen is a Finnish coal miner whose...
1,3,Shadows in Paradise,"An episode in the life of Nikander, a garbage ..."
2,5,Four Rooms,It's Ted the Bellhop's first night on the job....


Now, let's focus on transforming the data to make the process more efficient and vectorize the documents. As it was established before, this should include: preprocessing, tokenization, removing stopwords and computing both TF and IDF. *PySpark* can manage most of these, but for the preprocessing is required a specific behavior, thus the best approach would be to implement a custom stage for the pipeline.

To create a custom stage for a pipeline one option is to create an user defined function (udf), but the preprocessing stage is slightly complex, as it must convert the text to lowercase and remove any punctuation and special characters. For this reason, a more elegant solution is to use **custom Transformers**.

In [14]:
from pyspark.ml import Transformer
from pyspark.sql import DataFrame
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
import re

class TextPreprocessing(Transformer, HasInputCol, HasOutputCol):
    '''
    A custom Transformer which applies preprocessing to the text: convert to lowercase and remove punctuation and special characters
    '''

    def __init__(self, input_column, output_column='clean_text'):
        super().__init__()
        
        # Define inputCol and inputOut
        self.inputCol = input_column
        self.outputCol = output_column
    
    def _transform(self, df: DataFrame) -> DataFrame:
        # Define text transformation
        def lowercase_and_punctuation(text):
            lower = text.lower() # lowercase
            return re.sub(pattern=r'[^\w\s]', repl='', string=lower) # remove all but words, digits and white spaces

        udf_preprocess = f.udf(lambda text: lowercase_and_punctuation(text),
                               StringType())

        # Apply transformation
        return df.withColumn(
            self.outputCol, udf_preprocess(f.col('overview'))
        )

For the rest of them, the classes included in PySpark are enough to build a proper Pipeline:

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

# Text preprocessing: convert to lowercase and remove punctuation and special characters
preprocessor = TextPreprocessing(input_column='overview', output_column='preprocessed')

# Tokenization
tokenizer = Tokenizer(inputCol="preprocessed", outputCol="words")

# Remove Stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# TF
count_vectorizer = CountVectorizer(
    inputCol="words", outputCol="rawFeatures", minDF=2, vocabSize=500
)

hashing_tf = HashingTF(
    inputCol='words', outputCol='rawFeatures', numFeatures=500
)

# IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Build a Pipeline with CountVectorizer and another with HashingTF 
pipeline_CV = Pipeline(stages=[preprocessor, tokenizer, remover, count_vectorizer, idf])
pipeline_HTF = Pipeline(stages=[preprocessor, tokenizer, remover, hashing_tf, idf])

Note that for both CountVectorizer and HashingTF—in the TF stage definition—some limits were imposed:

+ Firstly and more evident, there is a maximum number of features, which is actually much lower than the default size. This is a consequence of having a large number of movies, due to the elevate time it takes to evaluate the similarities of just one movie and the rest. To solve this issue, one option would be to make a previous selection of movies with a lighter method of based on cached information, to then apply the TF-IDF method to a smaller sample.

+ Secondly, with CountVectorizer exists the possibility of imposing conditions on the terms to be considered. For this matter, one of the most important one that can be done is requiring a term to appear in more than 1 (or more) documents, because if a term only appears in one document, then the similarity calculation of this element will always be zero—except for when a document is compared with itself. Furthermore, having terms that appear only in a few documents could lead to overfitting.

Now, let's continue by building the models to transform the text into vectors and applying them to the data:

In [16]:
# Get features by applying the pipeline, a Hashing technique and the IDF portion
model_CV = pipeline_CV.fit(overview)
results_CV = model_CV.transform(overview).select('id', 'title', 'overview', 'features')

model_HTF = pipeline_HTF.fit(overview)
results_HTF = model_HTF.transform(overview).select('id', 'title', 'overview', 'features')

Compute similarity of a chosen movie with the rest instead of calculating it for every possible combination, as TF-IDF is a costly method:

In [17]:
# Create similarity functions
example_movie_id = 22

udf_similiarity_CV = hf.create_similarity_function(
    spark=spark, movie_id=example_movie_id, # <-- Specify movie id to compare
    method='cosine', results=results_CV 
)

udf_similiarity_HTF = hf.create_similarity_function(
    spark=spark, movie_id=example_movie_id, # <-- Specify movie id to compare
    method='cosine', results=results_HTF 
)

# Compute Similarities
similarities_CV = results_CV.withColumn(
    'similarity', udf_similiarity_CV('features')
).\
    select('id', 'title', 'similarity').\
    orderBy('similarity', ascending=False).\
    filter(f.col('id') != example_movie_id)

similarities_HTF = results_HTF.withColumn(
    'similarity', udf_similiarity_HTF('features')
).\
    select('id', 'title', 'similarity').\
    orderBy('similarity', ascending=False).\
    filter(f.col('id') != example_movie_id)


Chosen movie: Pirates of the Caribbean: The Curse of the Black Pearl.
Chosen movie: Pirates of the Caribbean: The Curse of the Black Pearl.


In [18]:
import time

# Show similarity values
start = time.time()
print('--- CountVectorizer Method ---')
display(similarities_CV.limit(3).toPandas())
end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.\n\n')

start = time.time()
print('--- HashingTF Method ---')
display(similarities_HTF.limit(3).toPandas())
end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.')


--- CountVectorizer Method ---


Unnamed: 0,id,title,similarity
0,966,The Magnificent Seven,0.432751
1,149218,Come Dance with Me,0.42997
2,206183,Bad Karma,0.421883


Time elapsed: 85.0 seconds.


--- HashingTF Method ---


Unnamed: 0,id,title,similarity
0,353069,Mother's Day,0.384116
1,157129,Table No. 21,0.361648
2,37602,Oysters at Nam Kee's,0.350613


Time elapsed: 100.0 seconds.


The results were similar in time and in value spread, although for *HashingTF* the first recommendation is slightly clearer.

Finally, there is an alternative method for similarity calculation which involves LSH, an approximated method that only considers the closest neighbours. The function to use is the following one:

```
BucketedRandomProjectionLSH(
    inputCol, outputCol, bucketLength, numHashTables
)
```
+ *inputCol* and *outputCol* are the same as other PySpark functions (i.e. are used to indicate which column to take the data from and the column to dump the output).

+ *bucketLength*: LSH creates group together hashes into the same bucket. The width of said buckets depends on this parameter, so the smaller it is, the more precision, due to the separation of hashes into different buckets. In other words, the greater this parameter, the greater the dimensionality reduction.

+ *numHashTables*: In LSH, instead of relying on a single hash function, multiple ones are used to increase the chance that similar points are hashed in the same bucket. However, this also tends to introduce false positives.

To choose these parameters, there are several guidelines:

+ If the features are normalized *bucketLength* should be around 1 to 10, but if there is high variability, opt for 1% to 10% of the standard deviation of the data—although this requires calculating pairwise distances.

+ For most applications, starting with a *numHashTables* from 10 to 25 is reasonable, but for high dimensionality data the numbers could go up around 30 to 50 hash tables.

+ Finally, if the results seem irrelevant (too many false positive), either increase *bucketLength* or reduce *numHashTables*. Ans if the results are missing important matches (too many false negatives), decrease *bucketLength* or increase *nuHashTables*.

In [19]:
# Hash words with HashingTF (with expanded vocabulary)
from pyspark.ml.feature import BucketedRandomProjectionLSH

hashing_tf = HashingTF(
    inputCol='words', outputCol='rawFeatures', numFeatures=1500 # <--- more words!
)

pipeline_HTF = Pipeline(stages=[preprocessor, tokenizer, remover,
                                hashing_tf, idf])

results_HTF = pipeline_HTF.fit(overview).transform(overview).\
    select('id', 'title', 'overview', 'features')

# Apply LSH (Bucketed Random Projection LSH)
lsh = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", 
                                  bucketLength=1, numHashTables=25)
lsh_model = lsh.fit(results_HTF)

# Find similar items for the example movie
similar_items = lsh_model.approxNearestNeighbors(
    results_HTF.filter(f.col('id') != example_movie_id),
    results_HTF.filter(f.col('id') == example_movie_id).select('features').collect()[0][0],
    numNearestNeighbors=5
)

# Show results
def similarity_from_distance(distance):
    return float(round(1 / (distance+1), 3))
udf_similarity_from_distance = f.udf(lambda d: similarity_from_distance(d),
                                     FloatType())

start = time.time()
print('--- LSH Method ---')

display(similar_items.select("title", "distCol").\
        withColumn('similarity', udf_similarity_from_distance(f.col('distCol'))).\
        select('title', 'similarity').\
            limit(5).toPandas())


end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.')

--- LSH Method ---


Unnamed: 0,title,similarity
0,Happy Ever Afters,0.043
1,Mother's Day,0.042
2,Enola Gay and the Atomic Bombing of Japan,0.042
3,Tour de Pharmacy,0.041
4,Iceland,0.041


Time elapsed: 24.0 seconds.


## Word2Vec and LSH
>A model with a Neural Network architecture capable of grouping in the embedding vector space synonyms and words with similar semantic function.


Word2Vec computes distributed vector representation of the words of the corpus. To do that, the context of the word is taking into account—i.e. the surrounding words—, so that terms with similar meaning (synonyms) or with similar semantic function are assigned vectors close to each other. This improves the model generalization and makes it more robust.

At a technical level, Word2Vec is a two-layer neural network that processes batches of text and returns a vector for each unique word. This vector will belong to a vector space usually of several hundred dimensions.

The Word2Vec model can use either of two architectures: Continuous Bag of Words (CBOW) or Continuous Skip-Gram. Both models aim to reduce dimensionality and produce dense vector representations of words (embeddings) from a large corpus.



### CBOW: Continuous Bag of Words



According to *Wikipedia*, the CBOW model can be viewed as a "fill the blank" task. The reason for this is that the model is trained by predicting what word was removed from a batch of text based on the surrounding words, under the assumption that the order does not influence the prediction (bag of words). 

For example, if in the sentence "the cat on the mat" the target word is "cat", then the model will give a prediction for the set of words ["the", "on", "the", "mat"], which turns into ["the", "on", "mat"] due to the bag of words assumption. Based on the similarity of the prediction compared to "cat" the weights of the model will change, and the mentioned prediction is obtained assuming that the missing word must be similar to the average embedding of the surrounding words.



### Continuous Skip-Gram



According to *Wikipedia*, the Skip-Gram model can be viewed as a "find the neighbors" task, so in a way is the opposite problem of CBOW. The reason for this is that the model is trained by predicting the context surrounding the target word. This makes the Slip-Gram model versatile with rare words, as it improves its ability to capture semantic relationships and flexibility to linguistic context, but it requires higher computation effort to train than CBOW.

Let's focus on the mathematical view of the problem:
+ Sequence of training words: $w_1,\dots,w_T$.

+ Objective: maximize the average log-likelihood: $$\frac{1}{T}\sum_{t=1}^{T}\sum_{j=-k}^{+k} \log p(w_{t+j}|w_t),$$ where $k$ is the size of the training window. For example: $$w_1,\dots,w_{t-2},[w_{t-1},w_{t},w_{t+1}],w_{t+2},\dots,w_T,$$ where the brackets mark the window of $k=1$ around the target word, $w_t$.

+ Definitions:
    + $u_w$: vector representation of $w$ as a word.

    + $v_w$: vector representation of $w$ as context for another word.

    + From softmax: $p(w_i|w_j) = \frac{exp\left( u_{w_{i}}^T\cdot v_{w_j} \right)}{\sum_{l=1}^{V} exp\left( u_l^T\cdot v_{w_j} \right)},$ where $V$ is the vocabulary size.

From these definitions, the objective is to maximize:
$$\frac{1}{T}\sum_{t=1}^{T}\sum_{j=-k}^{+k} \left( u_{w_{i}}^T\cdot v_{w_j} - \log\left[ \sum_{l=1}^{V} exp\left( u_l^T\cdot v_{w_j} \right) \right] \right).$$

However, the last portion of the equation—the logarithm of the sum of all vocabulary—is expensive, because the size of the vocabulary, $V$, can be in the order of millions. For this reason, in many cases—one of them the PySpark function—, instead hierarchical softmax is used, which reduces the cost to $O(\log(v))$.


### Code approach

The implementation of *Word2Vec* in *PySpark*  only includes **Skip-Gram**, not CBOW. This means that the model's architecture is designed to learn word vector representations that are good at predicting its context in the same sentence.

Let's start understanding the parameters of the *Word2Vec* function call:

    Word2Vec(inputCol, outputCol, seed, numPartitions,  vectorSize, ...)

+ The parameters *inputCol* and *outputCol* serve to identify the column to feed the model (input), and the column to dump the results (output).

+ *seed*: random value used to set the initial weights of the neural network.

+ *numPartitions*: defaults to 1, meaning that training the *Word2Vec* model would happen at a single executor. Increasing it will introduce parallelization, making the training faster, but there are two things to have into account:
    
    1. **Risk of overheat**: Splitting, managing and merging partitions takes "extra effort" (overheat), and it increases the more partitions are assigned.

    2. **Decrease in model accuracy**: due to data partition, each of the executors will train a *Word2Vec* model on a portion of the data before merging them together. To counter this to a certain extent, the optional input parameter *maxIter* is very useful, because in each iteration the data is shuffled, allowing the executor to access he rest of the data. The number can range between $\text{\textit{numPartition}}/5$ for large datasets and $\text{\textit{numPartition}} - 1$ for smaller ones.

+ **vectorSize**: the length of the embedding assigned to every term in the vocabulary. Its value affects the computational cost directly, but the larger it is, the more meaning that can be captured. However, overfitting is also an issue, thus for smaller datasets a value between 50 and 100 is a great starting point, while for larger data sets the values range between 200 and 300.

### Coding Word2Vec

The text input will come from the movies overview, like for the TF-IDF model:

In [20]:
overview = df.select('id', 'title', 'overview').orderBy('id').na.drop(subset=["overview"])
display(overview.limit(3).toPandas())

Unnamed: 0,id,title,overview
0,2,Ariel,Taisto Kasurinen is a Finnish coal miner whose...
1,3,Shadows in Paradise,"An episode in the life of Nikander, a garbage ..."
2,5,Four Rooms,It's Ted the Bellhop's first night on the job....


#### Text Cleaning

In [21]:
from pyspark.sql import DataFrame
from pyspark.sql.types import FloatType, StringType, ArrayType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer

# Original data
original_sample = overview.limit(3).toPandas()
display(original_sample)

Unnamed: 0,id,title,overview
0,2,Ariel,Taisto Kasurinen is a Finnish coal miner whose...
1,3,Shadows in Paradise,"An episode in the life of Nikander, a garbage ..."
2,5,Four Rooms,It's Ted the Bellhop's first night on the job....


In the TF-IDF section, the data preparation was handled using a Pipeline. However, in this case, the process is itemized to provide an alternative approach. Additionally, a new stage called **"Stemming"** is introduced, which removes suffixes of words to heuristically form their root. This process is often linguistically incorrect, but it is less costly than other exhaustive techniques like "Lemmatization". For example, stemming could produce the following results:

+ "involved" → "involv" (linguistically correct).

+ "running" → "runn" (linguistically incorrect, lemmatization would return "run").

+ "flies" → "fli" (linguistically incorrect, lemmatization would return "fly").

Let's start the text preprocessing by transforming the words into lowercase, so that the model doesn't treat differently, for example, "Houses" and "houses". Nevertheless, this is a choice made to reduce vocabulary size, and it could be reasonable to distinguish between them.

In [22]:
# Lowercase
def lowercase(df: DataFrame, inputCol: str, outputCol: str = 'lower'):
    return df.withColumn(outputCol, f.lower(f.col(inputCol)))

cleaned = lowercase(overview, 'overview')
display(cleaned.limit(3).select('overview', 'lower').toPandas())

Unnamed: 0,overview,lower
0,Taisto Kasurinen is a Finnish coal miner whose...,taisto kasurinen is a finnish coal miner whose...
1,"An episode in the life of Nikander, a garbage ...","an episode in the life of nikander, a garbage ..."
2,It's Ted the Bellhop's first night on the job....,it's ted the bellhop's first night on the job....


The next step is to remove special character. This is done with regular expressions, thus there is freedom to skip some specific symbols. In the TF-IDF section, the pattern to be removed was "[^\w\s]", which means all but alphanumeric symbols (\w) and white-spaces (\s). It is reasonable to use the same one for Word2Vec, but to avoid affecting words like "It's", let's use in this case the pattern "[^\w\s\\']" to skip apostrophes as well:

In [23]:
# Remove Special Characters
def filter_special_characters(df: DataFrame, inputCol: str, outputCol: str = 'no_special_characters'):
    return df.withColumn(
        outputCol,
        f.regexp_replace(
            string=f.col(inputCol), pattern=r"[^\w\s\']", replacement=''
        )
    )

cleaned = filter_special_characters(cleaned, 'lower')
display(cleaned.limit(3).select('overview', 'no_special_characters').toPandas())

Unnamed: 0,overview,no_special_characters
0,Taisto Kasurinen is a Finnish coal miner whose...,taisto kasurinen is a finnish coal miner whose...
1,"An episode in the life of Nikander, a garbage ...",an episode in the life of nikander a garbage m...
2,It's Ted the Bellhop's first night on the job....,it's ted the bellhop's first night on the joba...


Now, tokenization will split the sentences into lists of words:

In [24]:
# Tokenization
def tokenization(df: DataFrame, inputCol: str, outputCol: str = 'tokenized'):
    tokenizer = Tokenizer(inputCol=inputCol, outputCol=outputCol) # Initialize
    return tokenizer.transform(df) # Transform

cleaned = tokenization(cleaned, 'no_special_characters')
display(cleaned.limit(3).select('overview', 'tokenized').toPandas())

Unnamed: 0,overview,tokenized
0,Taisto Kasurinen is a Finnish coal miner whose...,"[taisto, kasurinen, is, a, finnish, coal, mine..."
1,"An episode in the life of Nikander, a garbage ...","[an, episode, in, the, life, of, nikander, a, ..."
2,It's Ted the Bellhop's first night on the job....,"[it's, ted, the, bellhop's, first, night, on, ..."


After tokenization, it is possible to remove stopwords from the vocabulary, because they appear in most of the documents, making them have low impact on understanding the meaning of the sentence. However, it is reasonable to keep them if the dataset is small—to lose as little information as possible—and for some specific cases, such as sentiment analysis.

In [25]:
# Remove StopWords
def remove_stopwords(df: DataFrame, inputCol: str, outputCol: str = 'no_stopwords'):
    remover = StopWordsRemover(inputCol=inputCol, outputCol=outputCol) # initialize
    return remover.transform(cleaned) # transform

cleaned = remove_stopwords(cleaned, 'tokenized')
display(cleaned.limit(3).select('overview', 'no_stopwords').toPandas())

Unnamed: 0,overview,no_stopwords
0,Taisto Kasurinen is a Finnish coal miner whose...,"[taisto, kasurinen, finnish, coal, miner, whos..."
1,"An episode in the life of Nikander, a garbage ...","[episode, life, nikander, garbage, man, involv..."
2,It's Ted the Bellhop's first night on the job....,"[ted, bellhop's, first, night, joband, hotel's..."


Finally, the new stage, Stemming. In this case, PySpark does not have an implementation for this kind of preprocessing, but the library *nltk* has a Stemmer that can be used with PySpark's udf to transform the data.

In [26]:
# Stemming
def stem(df: DataFrame, inputCol: str, outputCol: str = 'stemmed'):
  stemmer = SnowballStemmer(language='english') # initialize
  
  # The stemmer has to be applied to every word of the list and each row 
  # has a list of words
  udf_stemmer = f.udf(
    lambda term_list: [stemmer.stem(term) for term in term_list], 
    ArrayType(StringType()))

  # Apply to dataframe
  return df.withColumn(outputCol, udf_stemmer(inputCol))

cleaned = stem(cleaned, 'no_stopwords')
display(cleaned.limit(3).select('overview', 'stemmed').toPandas())

Unnamed: 0,overview,stemmed
0,Taisto Kasurinen is a Finnish coal miner whose...,"[taisto, kasurinen, finnish, coal, miner, whos..."
1,"An episode in the life of Nikander, a garbage ...","[episod, life, nikand, garbag, man, involv, de..."
2,It's Ted the Bellhop's first night on the job....,"[ted, bellhop, first, night, joband, hotel, un..."


#### Embedding with Word2Vec

Note: make sure that there is no column named like *outputCol* in the dataframe.

In [27]:
from pyspark.ml.feature import Word2Vec

def embedding_W2V(df: DataFrame, inputCol: str, outputCol: str = 'features'):
    # Initialize
    word2vec = Word2Vec(inputCol=inputCol, outputCol=outputCol,
                        seed=33, vectorSize=50,
                        numPartitions=11, minCount=10)

    # Learn vocabulary and learn the weights
    model = word2vec.fit(cleaned)

    # Embeddings
    return model.transform(df)

cleaned = embedding_W2V(cleaned, 'stemmed')
display(cleaned.limit(3).select('overview', 'features').toPandas())

Unnamed: 0,overview,features
0,Taisto Kasurinen is a Finnish coal miner whose...,"[-0.08147076639113948, 0.09922557100653649, 0...."
1,"An episode in the life of Nikander, a garbage ...","[-0.06687724217772484, 0.09172882363200188, 0...."
2,It's Ted the Bellhop's first night on the job....,"[-0.05516240233555436, 0.10162156131118537, -0..."


#### Similarity Computation

In [28]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
import time

# Apply LSH (Bucketed Random Projection LSH)
lsh = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", 
                                  bucketLength=1, numHashTables=25)
lsh_model = lsh.fit(cleaned)

# Find items similar to the example movie
example_movie_id = 22 # <-- Arbitrary id
similar_items = lsh_model.approxNearestNeighbors(
    cleaned.filter(f.col('id') != example_movie_id),
    cleaned.filter(f.col('id') == example_movie_id).select('features').collect()[0][0],
    numNearestNeighbors=5
)

# Start timer and print reference movie
start = time.time()
print('--- LSH Method ---\n' +
      f'Movies similar to {cleaned.filter(f.col("id") == example_movie_id).select("title").collect()[0][0]}.')

# Custom similarity from distance metric
udf_similarity_from_distance = f.udf(lambda d: float(round(1 / (d+1), 3)),
                                     FloatType())

# Queue similarity transformation
results = similar_items.select("title", "distCol").\
    withColumn('similarity', udf_similarity_from_distance(f.col('distCol'))).\
        select('title', 'similarity')

# Compute similarity
display(results.limit(3).select('title', 'similarity').toPandas())

# Stop timer and show time elapsed
end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.')


--- LSH Method ---
Movies similar to Pirates of the Caribbean: The Curse of the Black Pearl.


Unnamed: 0,title,similarity
0,The Adventures of Tarzan,0.828
1,Ten Tall Men,0.824
2,Alaska,0.823


Time elapsed: 29.0 seconds.


## Singular Value Decomposition (SVD)
>A dimensionality reduction technique to increase efficiency in similarity calculations. Works best for medium and small datasets.



Singular Value Decomposition or SVD is a technique that decomposes a matrix into three. For instance, let's say a matrix $\underline{\underline{A}}$ is storing information about movies like this:

<br />
<center>

|             | **Feature 1** | **Feature 2** | **Feature 3** |
|:-----------:|:-------------:|:-------------:|:-------------:|
| **Movie 1** |      0.3      |      0.1      |      0.0      |
| **Movie 2** |      0.5      |      0.0      |      0.8      |

Table 1: Example of a movie-feature matrix.

</center>
<br />

Where 'Features' refer to any way to identify a movie. Let's be more specific and work with movie genres, although the values will be arbitrary:

<br />
<center>

|             | **Space** | **Time** | **Travel** | **Alien** | **Love** |
|:-----------:|:---------:|:--------:|:----------:|:---------:|:--------:|
| **Movie 1** |     1     |     1    |      1     |     0     |     0    |
| **Movie 2** |     0     |     0    |      1     |     0     |     1    |

Table 2: A more specific example of a movie-feature matrix using genres as features.

</center>
<br />

Now that the matrix $\underline{\underline{A}}$ is defined, the basic idea of SVD is that instead of representing the movies with these 5 genres—which in turn is a five dimensional space—, they will be represented in fewer, yet more general 'Topics'. These 'Topics' could relate to the original genres as follows:

<br />
<center>

|                      | **Space** | **Time** | **Travel** | **Alien** | **Love** |
|:--------------------:|:---------:|:--------:|:----------:|:---------:|:--------:|
|  **Topic 1: Sci-Fi** |    0.7    |    0.6   |     0.8    |    0.5    |    0.0   |
| **Topic 2: Romance** |    0.0    |    0.1   |     0.5    |    0.3    |    0.9   |

Table 3: Representation of Features in the new space.

</center>
<br />

And consequently, allow this new representation:

<br />
<center>

|             | **Topic 1: Sci-Fi** | **Topic 2: Romance** |
|:-----------:|:-------------------:|:--------------------:|
| **Movie 1** |         0.9         |          0.1         |
| **Movie 2** |         0.2         |          0.8         |

Table 4: Representation of Movies in the new space.

</center>
<br />

In summary, with SVD it is possible to reduce the dimensionality of the problem at hand, making it less computationally intensive, and thus making the similarity calculation faster.



### SVD with a more mathematical approach

Let's say that our matrix $\underline{\underline{A}}$ (movies as rows and features as columns) is a $n\times m$ matrix, meaning that it is representing $n$ movies with $m$ features. To describe the movies in a different way—with more general features, as with the 'Topics' before—, $\underline{\underline{A}}$ will be factorized in its *latent factors*.

The maximum number of latent factors is determined by $\underline{\underline{A}}$ and can be extracted from its *rank*:

$$k \equiv \text{rank}(\underline{\underline{A}}_{n\times m}) \leq \min(n,m).$$

So the factorization of a matrix $\underline{\underline{A}}_{n\times m}$ and rank $k$ in its latent factors would be as follows:

$$\underline{\underline{A}}_{n\times m} = \underline{\underline{U}}_{n\times k}\; \underline{\underline{\Sigma}}_{k\times k}\; \underline{\underline{V}}_{k\times m}^T,$$

where:

+ $\underline{\underline{U}}_{n\times k}$ is the representation of the movies in the new space, i.e. Table 4.

+ $\underline{\underline{\Sigma}}_{k\times k}$ is a diagonal matrix that contains the *singular values* of the original matrix, which indicate the importance of each latent factor.

+ $\underline{\underline{V}}_{k\times m}^T$ represents the features in the new space, i.e. Table 3.

Finally, using the singular values from the matrix $\underline{\underline{\Sigma}}$ it is possible to determine which latent values are more important to the current data. This opens the possibility of truncation, meaning that there is no need to keep all $k$ latent factors, but only the most important ones to represent the data more efficiently, if only sacrificing accuracy.

### Coding SVD

In [29]:
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
import time

# Create Movie-Genres dataframe
genres = df.select('id', 'title',
                   f.explode(f.col('genres')).alias('genre'), # A row per genre
                   f.lit(1).alias('value')) # Mark as present

genres = genres.groupby('id', 'title').pivot('genre').sum('value').fillna(0)
display(genres.limit(3).toPandas())

Unnamed: 0,id,title,Action,Adventure,Animation,Comedy,Crime,Documentary,Drama,Family,...,History,Horror,Music,Mystery,Romance,Science Fiction,TV Movie,Thriller,War,Western
0,68720,Tammy and the Doctor,0,0,0,1,0,0,0,0,...,0,0,0,0,1,0,0,0,0,0
1,72074,Murderer,0,0,0,0,1,0,1,0,...,0,0,0,1,0,0,0,1,0,0
2,288154,Jersey Shore Massacre,0,0,0,1,0,0,0,0,...,0,1,0,0,0,0,0,0,0,0


In [30]:
# Convert Genre Columns to Feature Vector
vector_assembler = VectorAssembler(inputCols=genres.drop('title', 'id').columns, outputCol="genres_features")
df_features = vector_assembler.transform(genres).select('id', "title", "genres_features")

# Apply SVD (using PCA as a substitute for Truncated SVD)
k = 5  # Reduce to 5 latent dimensions
pca = PCA(k=k, inputCol="genres_features", outputCol="features")
df_svd = pca.fit(df_features).transform(df_features).select('id', "title", "features")

display(df_svd.limit(3).toPandas())

Unnamed: 0,id,title,features
0,68720,Tammy and the Doctor,"[-0.37249461810173123, -0.9533260450342833, -0..."
1,72074,Murderer,"[1.1079085340316965, 0.36656159395470356, -1.0..."
2,288154,Jersey Shore Massacre,"[-0.5493537603056138, -0.37159302194402466, -0..."


In [31]:
# Create similarity functions
example_movie_id = 22

udf_similiarity = hf.create_similarity_function(
    spark=spark, movie_id=example_movie_id, 
    method='cosine', results=df_svd # <-- Specify movie id to compare
)

# Compute Similarities
similarities = df_svd.withColumn(
    'similarity', udf_similiarity('features')
).\
    select('id', 'title', 'similarity').\
    orderBy('similarity', ascending=False).\
    filter(f.col('id') != example_movie_id)

# Show similarity values
start = time.time()
print('--- SVD Method ---')
display(similarities.limit(10).toPandas())
end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.\n\n')

Chosen movie: Pirates of the Caribbean: The Curse of the Black Pearl.
--- SVD Method ---


Unnamed: 0,id,title,similarity
0,558,Spider-Man 2,1.0
1,84633,Quest of the Delta Knights,1.0
2,297762,Wonder Woman,1.0
3,37430,Conan the Barbarian,1.0
4,347096,Mythica: The Darkspore,1.0
5,23047,Season of the Witch,1.0
6,121,The Lord of the Rings: The Two Towers,1.0
7,8009,Highlander,1.0
8,32654,The Storm Warriors,1.0
9,9387,Conan the Barbarian,1.0


Time elapsed: 3.0 seconds.




Let's do a simple recommender system by movie genre:

In [32]:
# Explode genre list into different columns
genres = df.select('genres', 'title')
genres = df.select(df.title, f.explode(df.genres).alias('genre'))
display(genres.limit(3).toPandas())

Unnamed: 0,title,genre
0,Straight Outta Compton,Drama
1,Straight Outta Compton,Music
2,Viva Algeria,Drama


In [33]:
# Create a columns of 1 to indicate a movies has a specific genre
genres = genres.withColumn('value', f.lit(1))
display(genres.limit(3).toPandas())

Unnamed: 0,title,genre,value
0,Straight Outta Compton,Drama,1
1,Straight Outta Compton,Music,1
2,Viva Algeria,Drama,1


In [34]:
# Set movies as rows (groupby) and genres as columns (pivot)
# sum() makes sure it is one on the (title, genre) pairs indicated above
genres = genres.groupby('title').pivot('genre').sum('value')
display(genres.limit(3).toPandas())

Unnamed: 0,title,Action,Adventure,Animation,Comedy,Crime,Documentary,Drama,Family,Fantasy,...,History,Horror,Music,Mystery,Romance,Science Fiction,TV Movie,Thriller,War,Western
0,We Are Many,,,,,,1.0,,,,...,,,,,,,,,,
1,All The Days Before Tomorrow,,,,1.0,,,1.0,,,...,,,,,1.0,,,,,
2,Snowman's Land,,,,1.0,1.0,,,,,...,,,,,,,,1.0,,


In [35]:
# Finally, set the NULLs as 0, which indicate that the movie hasn't that genre
genres = genres.fillna(0)
display(genres.limit(3).toPandas())

Unnamed: 0,title,Action,Adventure,Animation,Comedy,Crime,Documentary,Drama,Family,Fantasy,...,History,Horror,Music,Mystery,Romance,Science Fiction,TV Movie,Thriller,War,Western
0,We Are Many,0,0,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,All The Days Before Tomorrow,0,0,0,1,0,0,1,0,0,...,0,0,0,0,1,0,0,0,0,0
2,Snowman's Land,0,0,0,1,1,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0


This DataFrame stores the movie-genre relationships, which are required to calculate the similarities. In this scenario, it is a good idea to preemptively calculate similarities and cache them, so the recommendations can happen in real time. To do this, the first step is to turn the movies into vectors, which is an easy task considering the rows of the movie-genre DataFrame is a vector representation of the movies in the genre space. However, to save up memory, it is wise to use dense vectors, because most of the values are zeros:

In [36]:
from pyspark.ml.feature import VectorAssembler
# Train vector assembler on the dataframe
vector_assembler = VectorAssembler(inputCols=genres.columns[1:],
                                   outputCol='features')

# Create the column features with the Dense Vector representation
movie_features = vector_assembler.transform(genres).select('title', 'features')
movie_features.show(3, truncate=False)


+----------------------------+---------------------------+
|title                       |features                   |
+----------------------------+---------------------------+
|We Are Many                 |(20,[5],[1.0])             |
|All The Days Before Tomorrow|(20,[3,6,14],[1.0,1.0,1.0])|
|Snowman's Land              |(20,[3,4,17],[1.0,1.0,1.0])|
+----------------------------+---------------------------+
only showing top 3 rows



Although in pandas, dense vectors show like usual vectors:

In [37]:
display(movie_features.limit(3).toPandas())

Unnamed: 0,title,features
0,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ..."
1,All The Days Before Tomorrow,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."
2,Snowman's Land,"(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."


Now, to calculate the similarities for each pair of movies, let's join the resulting DataFrame with itself. This will result into a DataFrame whose rows will contain two movies—thus, two dense vectors. However, there are two things that must be guaranteed:

+ For every row, the two movies cannot be the same. This would lead to the calculation of the similarity between them, which will not do any good to the recommender system.

+ The pair (A,B) yields the same similarity than the pair (B,A), so there is no need to calculate them both.

In [38]:
# Cross Join: Cartesian Product of the tables
df_cross = movie_features.alias('left').crossJoin(movie_features.alias('right'))
display(df_cross.limit(3).toPandas())

Unnamed: 0,title,features,title.1,features.1
0,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ..."
1,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",All The Days Before Tomorrow,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."
2,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",Snowman's Land,"(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, ..."


To filter out the rows with the same movie twice and the repeated pairs, Python allows the use of '<' to compare text. For example:

+ 'The Hunger Games' < 'Braveheart' = False.

+ 'Braveheart' < 'The Hunger Games' = True.

Because 'B' comes before 'T', due to the alphabetical order.

In [39]:
df_cross = df_cross.filter(f.col('left.title') < f.col('right.title'))
display(df_cross.limit(3).toPandas())

Unnamed: 0,title,features,title.1,features.1
0,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",What No One Knows,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."
1,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",What Have They Done to Your Daughters?,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,We Are Many,"(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...",We are the tide,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, ..."


Finally, the similarities will be stored in a new column and calculated via User Defined Functions (udf):

In [40]:
# --- Jaccard Similarity ---
def jaccard_similarity(v1, v2):
    # Formula:
    #   J = Intersection(A,B) / Union(A,B)
    A_and_B = sum(1 for (a,b) in zip(v1,v2) if a==1 and a==b) # Intersection
    A_or_B = sum(1 for (a,b) in zip(v1,v2) if a==1 or b==1) # Union
    return float(A_and_B) / A_or_B if A_or_B != 0 else 0.0 # Similarity

# --- Cosine Similarity ---
def cosine_similarity(v1,v2):
    # Formula:
    #   Sim = A·B / |A||B|

    # Numerator: scalar product
    num = sum(c1*c2 for (c1,c2) in zip(v1,v2))
    
    # Denominator: modules
    mod_a = np.sqrt(sum(c1**2 for c1 in v1))
    mod_b = np.sqrt(sum(c2**2 for c2 in v2))
    den = mod_a * mod_b

    # Similarity
    return float(num) / float(den) if den != 0.0 else 0.0

# --- User Defined Functions ---
jaccard_udf = f.udf(lambda v1,v2: jaccard_similarity(v1,v2), FloatType())
cosine_udf = f.udf(lambda v1,v2: cosine_similarity(v1,v2), FloatType())

# Apply udf to the pairs of movies in every row
df_similarities = df_cross.\
    withColumn('jaccard', jaccard_udf(f.col('left.features'), f.col('right.features'))).\
    withColumn('cosine', cosine_udf(f.col('left.features'), f.col('right.features'))).\
    select('left.title', 'right.title', 'jaccard', 'cosine')

display(df_similarities.limit(10).toPandas())

Unnamed: 0,title,title.1,jaccard,cosine
0,We Are Many,What No One Knows,0.0,0.0
1,We Are Many,What Have They Done to Your Daughters?,0.0,0.0
2,We Are Many,We are the tide,0.0,0.0
3,We Are Many,Window Water Baby Moving,1.0,1.0
4,We Are Many,Wood Job!,0.0,0.0
5,We Are Many,Zodiac: The Race Begins...,0.0,0.0
6,We Are Many,Убить дракона,0.0,0.0
7,We Are Many,Women of the Night,0.0,0.0
8,We Are Many,Wise Guys,0.0,0.0
9,We Are Many,You Wont Miss Me,0.0,0.0



From the dataset, only the *movie_id*, *title* and *overview* will be chosen. The terms in the overviews will yield the TF-IDF vectors.

In [41]:
overview = df.select('id', 'title', 'overview').orderBy('id').na.drop(subset=["overview"])
display(overview.limit(3).toPandas())

Unnamed: 0,id,title,overview
0,2,Ariel,Taisto Kasurinen is a Finnish coal miner whose...
1,3,Shadows in Paradise,"An episode in the life of Nikander, a garbage ..."
2,5,Four Rooms,It's Ted the Bellhop's first night on the job....


Now, let's focus on transforming the data to make the process more efficient and vectorize the documents. As it was established before, this should include: preprocessing, tokenization, removing stopwords and computing both TF and IDF. *PySpark* can manage most of these, but for the preprocessing is required a specific behavior, thus the best approach would be to implement a custom stage for the pipeline.

To create a custom stage for a pipeline one option is to create an user defined function (udf), but the preprocessing stage is slightly complex, as it must convert the text to lowercase and remove any punctuation and special characters. For this reason, a more elegant solution is to use **custom Transformers**.

In [42]:
from pyspark.ml import Transformer
from pyspark.sql import DataFrame
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
import re

class TextPreprocessing(Transformer, HasInputCol, HasOutputCol):
    '''
    A custom Transformer which applies preprocessing to the text: convert to lowercase and remove punctuation and special characters
    '''

    def __init__(self, input_column, output_column='clean_text'):
        super().__init__()
        
        # Define inputCol and inputOut
        self.inputCol = input_column
        self.outputCol = output_column
    
    def _transform(self, df: DataFrame) -> DataFrame:
        # Define text transformation
        def lowercase_and_punctuation(text):
            lower = text.lower() # lowercase
            return re.sub(pattern=r'[^\w\s]', repl='', string=lower) # remove all but words, digits and white spaces

        udf_preprocess = f.udf(lambda text: lowercase_and_punctuation(text),
                               StringType())

        # Apply transformation
        return df.withColumn(
            self.outputCol, udf_preprocess(f.col('overview'))
        )

For the rest of them, the classes included in PySpark are enough to build a proper Pipeline:

In [43]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

# Text preprocessing: convert to lowercase and remove punctuation and special characters
preprocessor = TextPreprocessing(input_column='overview', output_column='preprocessed')

# Tokenization
tokenizer = Tokenizer(inputCol="preprocessed", outputCol="words")

# Remove Stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

# TF
count_vectorizer = CountVectorizer(
    inputCol="words", outputCol="rawFeatures", minDF=2, vocabSize=500
)

hashing_tf = HashingTF(
    inputCol='words', outputCol='rawFeatures', numFeatures=500
)

# IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Build a Pipeline with CountVectorizer and another with HashingTF 
pipeline_CV = Pipeline(stages=[preprocessor, tokenizer, remover, count_vectorizer, idf])
pipeline_HTF = Pipeline(stages=[preprocessor, tokenizer, remover, hashing_tf, idf])

Note that for both CountVectorizer and HashingTF—in the TF stage definition—some limits were imposed:

+ Firstly and more evident, there is a maximum number of features, which is actually much lower than the default size. This is a consequence of having a large number of movies, due to the elevate time it takes to evaluate the similarities of just one movie and the rest. To solve this issue, one option would be to make a previous selection of movies with a lighter method of based on cached information, to then apply the TF-IDF method to a smaller sample.

+ Secondly, with CountVectorizer exists the possibility of imposing conditions on the terms to be considered. For this matter, one of the most important one that can be done is requiring a term to appear in more than 1 (or more) documents, because if a term only appears in one document, then the similarity calculation of this element will always be zero—except for when a document is compared with itself. Furthermore, having terms that appear only in a few documents could lead to overfitting.

Now, let's continue by building the models to transform the text into vectors and applying them to the data:

In [44]:
# Get features by applying the pipeline, a Hashing technique and the IDF portion
model_CV = pipeline_CV.fit(overview)
results_CV = model_CV.transform(overview).select('id', 'title', 'overview', 'features')

model_HTF = pipeline_HTF.fit(overview)
results_HTF = model_HTF.transform(overview).select('id', 'title', 'overview', 'features')

Compute similarity of a chosen movie with the rest instead of calculating it for every possible combination, as TF-IDF is a costly method:

In [45]:
# Create similarity functions
example_movie_id = 22

udf_similiarity_CV = hf.create_similarity_function(
    spark=spark, movie_id=example_movie_id, # <-- Specify movie id to compare
    method='cosine', results=results_CV 
)

udf_similiarity_HTF = hf.create_similarity_function(
    spark=spark, movie_id=example_movie_id, # <-- Specify movie id to compare
    method='cosine', results=results_HTF 
)

# Compute Similarities
similarities_CV = results_CV.withColumn(
    'similarity', udf_similiarity_CV('features')
).\
    select('id', 'title', 'similarity').\
    orderBy('similarity', ascending=False).\
    filter(f.col('id') != example_movie_id)

similarities_HTF = results_HTF.withColumn(
    'similarity', udf_similiarity_HTF('features')
).\
    select('id', 'title', 'similarity').\
    orderBy('similarity', ascending=False).\
    filter(f.col('id') != example_movie_id)


Chosen movie: Pirates of the Caribbean: The Curse of the Black Pearl.
Chosen movie: Pirates of the Caribbean: The Curse of the Black Pearl.


In [46]:
import time

# Show similarity values
start = time.time()
print('--- CountVectorizer Method ---')
display(similarities_CV.limit(3).toPandas())
end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.\n\n')

start = time.time()
print('--- HashingTF Method ---')
display(similarities_HTF.limit(3).toPandas())
end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.')


--- CountVectorizer Method ---


Unnamed: 0,id,title,similarity
0,966,The Magnificent Seven,0.432751
1,149218,Come Dance with Me,0.42997
2,206183,Bad Karma,0.421883


Time elapsed: 89.0 seconds.


--- HashingTF Method ---


Unnamed: 0,id,title,similarity
0,353069,Mother's Day,0.384116
1,157129,Table No. 21,0.361648
2,37602,Oysters at Nam Kee's,0.350613


Time elapsed: 167.0 seconds.


The results were similar in time and in value spread, although for *HashingTF* the first recommendation is slightly clearer.

Finally, there is an alternative method for similarity calculation which involves LSH, an approximated method that only considers the closest neighbours. The function to use is the following one:

```
BucketedRandomProjectionLSH(
    inputCol, outputCol, bucketLength, numHashTables
)
```
+ *inputCol* and *outputCol* are the same as other PySpark functions (i.e. are used to indicate which column to take the data from and the column to dump the output).

+ *bucketLength*: LSH creates group together hashes into the same bucket. The width of said buckets depends on this parameter, so the smaller it is, the more precision, due to the separation of hashes into different buckets. In other words, the greater this parameter, the greater the dimensionality reduction.

+ *numHashTables*: In LSH, instead of relying on a single hash function, multiple ones are used to increase the chance that similar points are hashed in the same bucket. However, this also tends to introduce false positives.

To choose these parameters, there are several guidelines:

+ If the features are normalized *bucketLength* should be around 1 to 10, but if there is high variability, opt for 1% to 10% of the standard deviation of the data—although this requires calculating pairwise distances.

+ For most applications, starting with a *numHashTables* from 10 to 25 is reasonable, but for high dimensionality data the numbers could go up around 30 to 50 hash tables.

+ Finally, if the results seem irrelevant (too many false positive), either increase *bucketLength* or reduce *numHashTables*. Ans if the results are missing important matches (too many false negatives), decrease *bucketLength* or increase *nuHashTables*.

In [47]:
# Hash words with HashingTF (with expanded vocabulary)
from pyspark.ml.feature import BucketedRandomProjectionLSH

hashing_tf = HashingTF(
    inputCol='words', outputCol='rawFeatures', numFeatures=1500 # <--- more words!
)

pipeline_HTF = Pipeline(stages=[preprocessor, tokenizer, remover,
                                hashing_tf, idf])

results_HTF = pipeline_HTF.fit(overview).transform(overview).\
    select('id', 'title', 'overview', 'features')

# Apply LSH (Bucketed Random Projection LSH)
lsh = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", 
                                  bucketLength=1, numHashTables=25)
lsh_model = lsh.fit(results_HTF)

# Find similar items for the example movie
similar_items = lsh_model.approxNearestNeighbors(
    results_HTF.filter(f.col('id') != example_movie_id),
    results_HTF.filter(f.col('id') == example_movie_id).select('features').collect()[0][0],
    numNearestNeighbors=5
)

# Show results
def similarity_from_distance(distance):
    return float(round(1 / (distance+1), 3))
udf_similarity_from_distance = f.udf(lambda d: similarity_from_distance(d),
                                     FloatType())

start = time.time()
print('--- LSH Method ---')

display(similar_items.select("title", "distCol").\
        withColumn('similarity', udf_similarity_from_distance(f.col('distCol'))).\
        select('title', 'similarity').\
            limit(5).toPandas())


end = time.time()
print(f'Time elapsed: {np.round(end-start)} seconds.')

--- LSH Method ---


Unnamed: 0,title,similarity
0,Happy Ever Afters,0.043
1,Mother's Day,0.042
2,Enola Gay and the Atomic Bombing of Japan,0.042
3,Tour de Pharmacy,0.041
4,Iceland,0.041


Time elapsed: 24.0 seconds.


# Stop Spark Session

In [48]:
spark.stop()

# References
+ [SVD](https://machinelearningmastery.com/using-singular-value-decomposition-to-build-a-recommender-system/)

**TF-IDF**

+ [TF-IDF Documentation (PySpark).](https://spark.apache.org/docs/3.5.2/mllib-feature-extraction.html#tf-idf)

+ [TF-IDF with CountVectorize.](https://www.analyticsvidhya.com/blog/2022/09/implementing-count-vectorizer-and-tf-idf-in-nlp-using-pyspark/)

+ [TF-IDF Example.](https://runawayhorse001.github.io/LearningApacheSpark/manipulation.html)

**Word2Vec**

+ [What Is Word2Vec and How Does It Work?](https://swimm.io/learn/large-language-models/what-is-word2vec-and-how-does-it-work)

+ [Word2Vec Wikipedia page.](https://en.wikipedia.org/wiki/Word2vec)

+ [Word2Vec Documentation (PySpark).](https://spark.apache.org/docs/3.5.2/mllib-feature-extraction.html#word2vec)

+ [Word2Vec: parameters and distributed training.](https://medium.com/@the.data.yoga/creating-word2vec-embeddings-on-a-large-text-corpus-with-pyspark-469007116551)

+ [PySpark Sentiment Analysis with Word2Vec Embedding.](https://www.kaggle.com/code/muhammetzahitaydn/pyspark-sentiment-analysis-with-word2vec-embedding)

+ [Python | Stemming words with NLTK](https://www.geeksforgeeks.org/python-stemming-words-with-nltk/)