#**Set Up Spark NLP and PySpark**

In this initial section, we set up and resolve all dependencies needed to run Spark NLP and PySpark on Google Colaboratory. Note that this may require a Colab Pro account.

In [None]:
! wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

--2022-05-15 18:25:56--  http://setup.johnsnowlabs.com/colab.sh
Resolving setup.johnsnowlabs.com (setup.johnsnowlabs.com)... 51.158.130.125
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|51.158.130.125|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://setup.johnsnowlabs.com/colab.sh [following]
--2022-05-15 18:25:56--  https://setup.johnsnowlabs.com/colab.sh
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|51.158.130.125|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh [following]
--2022-05-15 18:25:57--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:44

In [None]:
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import numpy as np
from pyspark.ml.linalg import *
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import BucketedRandomProjectionLSH

In [None]:
import sparknlp

spark = sparknlp.start(gpu=True)

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  3.4.4
Apache Spark version:  3.0.3


#**Generate Spark Pipeline that utilizes BERT for Embeddings**

The code block below sets up the Spark pipeline for BERT described in the final report. Specifically, it builds a pipeline consisting of:

1. A DocumentAssembler
2. A SentenceDetector
3. A BertSentenceEmbeddings model
4. An EmbeddingsFinisher

The specific BERT model used is `sent_small_bert_L2_128`, as referenced in the paper [here](https://arxiv.org/abs/1908.08962) (Well-Read Students Learn Better: On the Importance of Pre-training Compact Models).

In [None]:
documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")
    
sentence = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

embeddings = BertSentenceEmbeddings.pretrained("sent_small_bert_L2_128") \
    .setInputCols(["sentence"]) \
    .setOutputCol("sentence_bert_embeddings")

embeddingsFinisher = EmbeddingsFinisher() \
    .setInputCols(["sentence_bert_embeddings"]) \
    .setOutputCols("finished_embeddings") \
    .setOutputAsVector(True)
    
pipeline = Pipeline().setStages([
    documentAssembler,
    sentence,
    embeddings,
    embeddingsFinisher
])

sent_small_bert_L2_128 download started this may take some time.
Approximate size to download 16.1 MB
[OK!]
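`EmbeddingsFinisher` emits an array with one vector per detected sentence, so `explode(finished_embeddings)`, as used in the cells below, yields one row per sentence, and a multi-sentence plot description produces several rows for the same title. The notebook appears to sidestep this by stripping sentence-ending periods before embedding. A common alternative, shown here only as a NumPy sketch outside Spark, is to mean-pool the per-sentence vectors into one vector per movie; `mean_pool` is a hypothetical helper, not a Spark NLP API.

```python
import numpy as np

def mean_pool(sentence_vectors):
    """Average per-sentence embeddings into one document vector (hypothetical helper)."""
    stacked = np.asarray(sentence_vectors, dtype=float)  # shape: (n_sentences, dim)
    if stacked.ndim != 2 or stacked.shape[0] == 0:
        raise ValueError("expected a non-empty list of equal-length vectors")
    return stacked.mean(axis=0)

# Two toy 4-dimensional "sentence embeddings" for a single plot description
doc_vec = mean_pool([[1.0, 0.0, 2.0, 4.0],
                     [3.0, 2.0, 0.0, 0.0]])
print(doc_vec)  # [2. 1. 1. 2.]
```

In Spark, a similar per-title average could be computed on the exploded rows with something like `groupBy("title").agg(Summarizer.mean(col("result")))` from `pyspark.ml.stat`; this is a suggestion, not what the notebook does.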


#**Example Embeddings Extraction on Smaller Dataset**

In this section, we run a small test dataset through the pipeline generated above to confirm that the vectors retain contextual information. First, we build a small dataset of 19 movies using titles and plot descriptions from TMDb. Then, two query movies are compared against this dataset.

##**Small Sample Dataset**

In [None]:
data = [
  ["Toy Story 1", "A cowboy doll is profoundly threatened and jealous when a new spaceman figure supplants him as top toy in a boy's room"],
  ["Toy Story 2", "When Woody is stolen by a toy collector, Buzz and his friends set out on a rescue mission to save Woody before he becomes a museum toy property with his roundup gang Jessie, Prospector, and Bullseye."],
  ["Toy Story 3", "The toys are mistakenly delivered to a day-care center instead of the attic right before Andy leaves for college, and it's up to Woody to convince the other toys that they weren't abandoned and to return home."],
  ["Avengers", "Earth's mightiest heroes must come together and learn to fight as a team if they are going to stop the mischievous Loki and his alien army from enslaving humanity."],
  ["Avengers: The Age of Ultron", "When Tony Stark and Bruce Banner try to jump-start a dormant peacekeeping program called Ultron, things go horribly wrong and it's up to Earth's mightiest heroes to stop the villainous Ultron from enacting his terrible plan."],
  ["Tenet", "Armed with only one word, Tenet, and fighting for the survival of the entire world, a Protagonist journeys through a twilight world of international espionage on a mission that will unfold in something beyond real time."],
  ["Choose or Die", "After firing up a lost 80s survival horror game, a young coder unleashes a hidden curse that tears reality apart, forcing her to make terrifying decisions and face deadly consequences"],
  ["X", "In 1979, a group of young filmmakers set out to make an adult film in rural Texas, but when their reclusive, elderly hosts catch them in the act, the cast find themselves fighting for their lives."],
  ["The Black Phone", "After being abducted by a child killer and locked in a soundproof basement, a 13-year-old boy starts receiving calls on a disconnected phone from the killer's previous victims."],
  ["Scream","25 years after a streak of brutal murders shocked the quiet town of Woodsboro, Calif, a new killer dons the Ghostface mask and begins targeting a group of teenagers to resurrect secrets from the town's deadly past."],
  ["The Lighthouse","Two lighthouse keepers try to maintain their sanity while living on a remote and mysterious New England island in the 1890s."],
  ["Jeepers Creepers: Reborn","Forced to travel with her boyfriend, Laine, she begins to experience premonitions associated with the urban myth of The Creeper Laine believes that something supernatural has been summoned - and that she is at the center of it all."],
  ["Get Out","A young African-American visits his white girlfriend's parents for the weekend, where his simmering uneasiness about their reception of him eventually reaches a boiling point."],
  ["A Quiet Place","In a post-apocalyptic world, a family is forced to live in silence while hiding from monsters with ultra-sensitive hearing."],
  ["Antman", "Armed with a super-suit with the astonishing ability to shrink in scale but increase in strength, cat burglar Scott Lang must embrace his inner hero and help his mentor, Dr. Hank Pym, pull off a plan that will save the world."],
  ["The Grudge", "A house is cursed by a mysterious ghost that dooms those who enter it with a violent death."],
  ["Ju-On", "A mysterious and vengeful spirit marks and pursues anybody who dares enter the house in which it resides."],
  ["The Grudge 2", "Three interwoven stories about a terrible curse - a young woman encounters a malevolent supernatural force while searching for her missing sister in Tokyo, a mean high school prank goes horribly wrong, a woman with a deadly secret moves into a Chicago apartment building."],
  ["Spiderman: Homecoming", "Peter Parker balances his life as an ordinary high school student in Queens with his superhero alter-ego Spider-Man, and finds himself on the trail of a new menace prowling the skies of New York City."]
]

df = spark.createDataFrame(data).toDF("title", "text")

df.show()

+--------------------+--------------------+
|               title|                text|
+--------------------+--------------------+
|         Toy Story 1|A cowboy doll is ...|
|         Toy Story 2|When Woody is sto...|
|         Toy Story 3|The toys are mist...|
|            Avengers|Earth's mightiest...|
|Avengers: The Age...|When Tony Stark a...|
|               Tenet|Armed with only o...|
|       Choose or Die|After firing up a...|
|                   X|In 1979, a group ...|
|     The Black Phone|After being abduc...|
|              Scream|25 years after a ...|
|      The Lighthouse|Two lighthouse ke...|
|Jeepers Creepers:...|Forced to travel ...|
|             Get Out|A young African-A...|
|       A Quiet Place|In a post-apocaly...|
|              Antman|Armed with a supe...|
|          The Grudge|A house is cursed...|
|               Ju-On|A mysterious and ...|
|        The Grudge 2|Three interwoven ...|
|Spiderman: Homeco...|Peter Parker bala...|
+--------------------+--------------------+

##**Generating Embeddings for movies in the Sample Dataset**

In [None]:
df_bert = pipeline.fit(df).transform(df)
df_bert.selectExpr("title","text","explode(finished_embeddings) as result").show(20,100)

+---------------------------+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|                      title|                                                                                                text|                                                                                              result|
+---------------------------+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------+
|                Toy Story 1|A cowboy doll is profoundly threatened and jealous when a new spaceman figure supplants him as to...|[-0.9685783982276917,0.3691055476665497,-0.11737212538719177,-1.7240582704544067,0.15731708705425...|
|                Toy Story 2|When Woody is stolen by a toy collector, Bu

##**Generating the Test Queries for Iron Man 1 and Paranormal Activity**

In [None]:
query = [
  ["Iron Man 1","After being held captive in an Afghan cave, billionaire engineer Tony Stark creates a unique weaponized suit of armor to fight evil."],
]

query_df = spark.createDataFrame(query).toDF("title", "text")
query_df.show()

query2 = [
  ["Paranormal Activity","After moving into a suburban home, a couple becomes increasingly disturbed by a nightly demonic presence."],
]

query2_df = spark.createDataFrame(query2).toDF("title", "text")
query2_df.show()

+----------+--------------------+
|     title|                text|
+----------+--------------------+
|Iron Man 1|After being held ...|
+----------+--------------------+

+-------------------+--------------------+
|              title|                text|
+-------------------+--------------------+
|Paranormal Activity|After moving into...|
+-------------------+--------------------+



##**Generating the embeddings for the Test Queries**

In [None]:
query_bert = pipeline.fit(query_df).transform(query_df)
query_bert.selectExpr("title", "explode(finished_embeddings) as result").show(truncate=False)

query2_bert = pipeline.fit(query2_df).transform(query2_df)
query2_bert.selectExpr("title", "explode(finished_embeddings) as result").show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

##**Test Queries Using Cosine Similarity UDF**

Now that we have vector embeddings for data inside the small sample dataset and for our query movies, we run a Cosine Similarity UDF on the PySpark dataframes to confirm that we retain contextual information in the vectors. We expect the movies with the highest similarity to Iron Man to be Marvel movies and superhero movies. We expect the movies with the highest similarity to Paranormal Activity to be horror movies. 

The three movies with the highest similarity are displayed along with their cosine similarity scores and plot descriptions.
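The UDF in the next cell scores one row at a time using cos(a, b) = a·b / (‖a‖ ‖b‖). As an illustration of the same ranking outside Spark, the NumPy sketch below normalizes every vector once and scores all candidates with a single matrix-vector product. `top_k_cosine` and the toy vectors are hypothetical, not outputs of the pipeline.

```python
import numpy as np

def top_k_cosine(query, candidates, k=3):
    """Return (indices, scores) of the k candidates most cosine-similar to query.

    Assumes no all-zero vectors (their norm would be 0).
    """
    q = np.asarray(query, dtype=float)
    m = np.asarray(candidates, dtype=float)
    # Normalize once so that a plain dot product equals cosine similarity
    q = q / np.linalg.norm(q)
    m = m / np.linalg.norm(m, axis=1, keepdims=True)
    scores = m @ q
    order = np.argsort(-scores)[:k]  # highest similarity first
    return order, scores[order]

# Toy 3-dimensional "embeddings" (hypothetical values)
movies = np.array([[1.0, 0.0, 0.0],
                   [0.9, 0.1, 0.0],
                   [0.0, 1.0, 0.0],
                   [-1.0, 0.0, 0.0]])
idx, sims = top_k_cosine([1.0, 0.1, 0.0], movies, k=2)
print(idx, sims)
```

Vectorizing the comparison avoids per-row Python UDF calls, which is the main cost of the UDF approach on larger data.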

In [None]:
# Take the query movie's sentence embedding (its plot is a single sentence) as a DenseVector
query_vec = query_bert.selectExpr("explode(finished_embeddings) as result").collect()[0][0]

# create a DF with dense vectors
df = df_bert.selectExpr("title","text","explode(finished_embeddings) as result")

# write our UDF for cosine similarity
def cos_sim(a,b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# apply the UDF to the column
df = df.withColumn("coSim", udf(cos_sim, FloatType())(col("result"), array([lit(v) for v in query_vec])))
df.select("title","coSim","text").orderBy("coSim",ascending= False ).limit(3).show(truncate=False)

+---------------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                      |coSim     |text                                                                                                                                                                                                                             |
+---------------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Antman                     |0.8806755 |Armed with a super-suit with the astonishing ability to shrink in scale but increase in strength, cat burglar Scott Lang must embrace his inner hero and help h

In [None]:
# create a query as dense vector from input
first_record = query2_bert.selectExpr("explode(finished_embeddings) as result").collect()[0][0]

df = df_bert.selectExpr("title","text","explode(finished_embeddings) as result")
df = df.withColumn("coSim", udf(cos_sim, FloatType())(col("result"), array([lit(v) for v in first_record])))
df.select("title","coSim","text").orderBy("coSim",ascending= False ).limit(3).show(truncate=False)

+-------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title        |coSim     |text                                                                                                                                                                                                                                                                           |
+-------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|The Grudge 2 |0.85794115|Three interwoven stories about a terrible curse - a young woman encounters a 

##**Test Queries Using Extracted Embeddings with Bucketed Random Projection LSH**

As an extra step, we run Bucketed Random Projection (BRP) LSH on the small sample dataset to check that the top 3 approximate nearest neighbors it retrieves match the output of the cosine similarity check above.
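Spark's `BucketedRandomProjectionLSH` hashes each vector with `numHashTables` functions of the form h(x) = floor(x · v / bucketLength), where each v is a random unit vector, and `approxNearestNeighbors` keeps only rows that share a bucket with the key in at least one table before ranking them by exact Euclidean distance. The NumPy sketch below mimics that scheme for illustration only; the function names and random seed are assumptions, and the random vectors will not match the ones Spark draws.

```python
import numpy as np

def make_hash_vectors(dim, num_hash_tables, seed=0):
    """One random unit vector per hash table (Gaussian draw, then L2-normalized)."""
    rng = np.random.default_rng(seed)
    v = rng.standard_normal((num_hash_tables, dim))
    return v / np.linalg.norm(v, axis=1, keepdims=True)

def brp_hash(points, hash_vectors, bucket_length):
    """Bucket ids floor(x . v / bucket_length); (n, dim) points -> (n, tables) ids."""
    return np.floor(np.asarray(points, dtype=float) @ hash_vectors.T / bucket_length)

def approx_neighbors(key, data, hash_vectors, bucket_length, k):
    """Single-probe search: keep rows colliding with the key in at least one
    table, then rank those candidates by exact Euclidean distance."""
    key = np.asarray(key, dtype=float)
    data = np.asarray(data, dtype=float)
    # Hash the key through the same 2-D code path as the data
    key_hash = brp_hash(key[None, :], hash_vectors, bucket_length)[0]
    data_hash = brp_hash(data, hash_vectors, bucket_length)
    candidates = np.flatnonzero((data_hash == key_hash).any(axis=1))
    dists = np.linalg.norm(data[candidates] - key, axis=1)
    order = np.argsort(dists)[:k]
    return candidates[order], dists[order]

# Toy 2-D data: row 0 equals the key, so it always collides and ranks first
data = np.array([[0.0, 0.0], [0.1, 0.0], [5.0, 5.0]])
hv = make_hash_vectors(dim=2, num_hash_tables=3)
ids, d = approx_neighbors([0.0, 0.0], data, hv, bucket_length=2.0, k=2)
print(ids, d)
```

A larger `bucketLength` tends to put more points in each bucket (more candidates, higher recall, more work), and more hash tables give each true neighbor more chances to collide with the key.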

In [None]:
df_brp = df_bert.selectExpr("title","text","explode(finished_embeddings) as result")

In [None]:
brp = BucketedRandomProjectionLSH(inputCol="result", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(df_brp)


query_vec1 = query_bert.selectExpr("explode(finished_embeddings) as result").collect()[0][0]
query_vec2 = query2_bert.selectExpr("explode(finished_embeddings) as result").collect()[0][0]

print("Approximately searching df for 3 nearest neighbors of the key, Iron Man 1:")
model.approxNearestNeighbors(df_brp, query_vec1, 3).show()

print("Approximately searching df for 3 nearest neighbors of the key, Paranormal Activity:")
model.approxNearestNeighbors(df_brp, query_vec2, 3).show()

Approximately searching df for 3 nearest neighbors of the key, Iron Man 1:
+--------------------+--------------------+--------------------+--------------------+------------------+
|               title|                text|              result|              hashes|           distCol|
+--------------------+--------------------+--------------------+--------------------+------------------+
|              Antman|Armed with a supe...|[-0.5918337702751...|[[-1.0], [0.0], [...|4.8372219123421685|
|Avengers: The Age...|When Tony Stark a...|[-0.3572331964969...|[[-1.0], [0.0], [...| 4.947485084898582|
|Spiderman: Homeco...|Peter Parker bala...|[-0.1944843232631...|[[-1.0], [0.0], [...| 5.113532207528354|
+--------------------+--------------------+--------------------+--------------------+------------------+

Approximately searching df for 3 nearest neighbors of the key, Paranormal Activity:
+--------------------+--------------------+--------------------+--------------------+------------------+


##**BRP-LSH Example on Simple Test Data**

The code block below shows how BRP-LSH works on a simple toy dataset of 5-dimensional vectors. Since `BucketedRandomProjectionLSH` is Spark's LSH family for Euclidean distance, we can expect vectors with a high cosine similarity to have a low Euclidean distance. The two measures are directly linked for L2-normalized vectors, where the squared Euclidean distance equals 2 − 2 cos(a, b), so ranking by one is equivalent to ranking by the other; for unnormalized vectors, differences in vector length also affect the distance.
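Expanding the squared Euclidean distance gives the relationship step by step. It is an exact equivalence only when both vectors have unit length; the BERT pipeline shown in this notebook does not add a normalization step, so for its embeddings the two rankings can differ somewhat.

```latex
\begin{aligned}
\lVert \mathbf{a} - \mathbf{b} \rVert_2^2
  &= \lVert \mathbf{a} \rVert_2^2 + \lVert \mathbf{b} \rVert_2^2 - 2\,\mathbf{a}\cdot\mathbf{b} \\
  &= \lVert \mathbf{a} \rVert_2^2 + \lVert \mathbf{b} \rVert_2^2
     - 2\,\lVert \mathbf{a} \rVert_2 \lVert \mathbf{b} \rVert_2 \cos\theta_{ab} \\
  &= 2 - 2\cos\theta_{ab}
     \qquad \text{when } \lVert \mathbf{a} \rVert_2 = \lVert \mathbf{b} \rVert_2 = 1
\end{aligned}
```

Because the last line is strictly decreasing in cos θ, sorting unit vectors by ascending Euclidean distance gives the same order as sorting by descending cosine similarity.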

In [None]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0, 1.0, 1.0, 1.0]),),
         (1, Vectors.dense([-1.0, -1.0, 1.0, -1.0, -1.0]),),
         (2, Vectors.dense([1.0, -1.0, 1.0, 1.0, -1.0]),),
         (3, Vectors.dense([1.0, -1.0, 1.0, -1.0, -1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

key = Vectors.dense([-1.0, -1.0, 0.0, -1.0, -1.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|[1.0,1.0,1.0,1.0,...|[[0.0], [-1.0], [...|
|  1|[-1.0,-1.0,1.0,-1...|[[-1.0], [-1.0], ...|
|  2|[1.0,-1.0,1.0,1.0...|[[0.0], [-1.0], [...|
|  3|[1.0,-1.0,1.0,-1....|[[0.0], [-1.0], [...|
+---+--------------------+--------------------+

Approximately searching dfA for 2 nearest neighbors of the key:
+---+--------------------+--------------------+----------------+
| id|            features|              hashes|         distCol|
+---+--------------------+--------------------+----------------+
|  1|[-1.0,-1.0,1.0,-1...|[[-1.0], [-1.0], ...|             1.0|
|  3|[1.0,-1.0,1.0,-1....|[[0.0], [-1.0], [...|2.23606797749979|
+---+--------------------+--------------------+----------------+
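Because `approxNearestNeighbors` reports exact Euclidean distances for the candidates it keeps, the `distCol` values above can be checked by brute force. The NumPy sketch below recomputes the distance from `key` to every row of `dataA`; it is an illustrative check, not part of the Spark API.

```python
import numpy as np

# Same toy vectors (ids 0..3) and key as the Spark example above
data_a = np.array([[1.0, 1.0, 1.0, 1.0, 1.0],
                   [-1.0, -1.0, 1.0, -1.0, -1.0],
                   [1.0, -1.0, 1.0, 1.0, -1.0],
                   [1.0, -1.0, 1.0, -1.0, -1.0]])
key = np.array([-1.0, -1.0, 0.0, -1.0, -1.0])

# Exact Euclidean distance from the key to each row
dists = np.linalg.norm(data_a - key, axis=1)
exact_top2 = np.argsort(dists)[:2]
print(dists)       # distances for ids 0, 1, 2, 3
print(exact_top2)  # ids of the two exact nearest neighbors
```

Here the exact two nearest neighbors are ids 1 and 3, at distances 1.0 and √5 ≈ 2.236, matching the LSH output. On larger data the approximate search can miss true neighbors that never share a bucket with the key.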



#**Data Preprocessing and Transformation at Scale**

The code blocks below show how the MovieLens dataset can be augmented with movie descriptions from IMDb and TMDb. If you run these cells, make sure your scraper can cope with the sites' anti-scraping measures (for example, rate limits and blocked requests). We ultimately ran the scraping offline and saved an augmented dataset for the later sections of this notebook. A sample subset of the augmented data is available at https://raw.githubusercontent.com/azraf-a/BERT_SparkNLP_Filter/main/final_data_small.csv, and this subset is used in the remaining portions of the notebook.

The cells below first show how the scraping process works for IMDb and then how it works for TMDb. In both cases we create a joined PySpark DataFrame, write a scraping function, wrap that function in a UDF, and run it across the MovieLens dataset. The only differences are the ID column used, the format of the generated links, and the part of the page the information is scraped from.

In [None]:
import os
import pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql.functions import concat_ws
from pyspark.sql.types import StructType,IntegerType,StringType
import requests
from bs4 import BeautifulSoup

##**Upload and Set Up MovieLens Data as PySpark DataFrames**

In [None]:
# Read imdbId as a string so that its leading zeros are kept (an integer column would drop them)
schema = StructType() \
      .add("movieId", IntegerType(), True) \
      .add("imdbId", StringType(), True) \
      .add("tmdbId", IntegerType(), True)
links = spark.read \
    .schema(schema) \
    .option("header", "true") \
    .csv("links.csv")
movies = spark.read \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .csv("movies.csv")
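IMDb title IDs are zero-padded (Toy Story is `tt0114709` in the table below), which is why the schema above keeps `imdbId` as a string. The plain-Python sketch below shows what integer parsing would lose and how padding could restore it; `imdb_url` is a hypothetical helper, and the seven-digit minimum width is an assumption based on the IDs in `links.csv` (longer IDs are left untouched by `zfill`).

```python
def imdb_url(imdb_id):
    """Build an IMDb title URL from an ID given as str or int (hypothetical helper)."""
    # Restore leading zeros lost by integer parsing; assumes a 7-digit minimum width
    padded = str(imdb_id).zfill(7)
    return "https://www.imdb.com/title/tt" + padded

raw = "0114709"              # value as stored in links.csv
print(int(raw))              # 114709: the leading zero is gone
print(imdb_url(raw))         # https://www.imdb.com/title/tt0114709
print(imdb_url(int(raw)))    # same URL, because padding restores the zero
```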

##**Perform Join Operation on Links and Movies Dataframes to access imdbId and tmdbId**

In [None]:
final_data=movies.join(links, ['movieId'])
final_data.show(truncate=False)

+-------+-------------------------------------+-------------------------------------------+-------+------+
|movieId|title                                |genres                                     |imdbId |tmdbId|
+-------+-------------------------------------+-------------------------------------------+-------+------+
|1      |Toy Story (1995)                     |Adventure|Animation|Children|Comedy|Fantasy|0114709|862   |
|2      |Jumanji (1995)                       |Adventure|Children|Fantasy                 |0113497|8844  |
|3      |Grumpier Old Men (1995)              |Comedy|Romance                             |0113228|15602 |
|4      |Waiting to Exhale (1995)             |Comedy|Drama|Romance                       |0114885|31357 |
|5      |Father of the Bride Part II (1995)   |Comedy                                     |0113041|11862 |
|6      |Heat (1995)                          |Action|Crime|Thriller                      |0113277|949   |
|7      |Sabrina (1995)              

##**Generate a link column to IMDb or TMDb using the id information**

###**For IMDb**

In [None]:
final_data.printSchema()
final_data=final_data.withColumn("link", concat_ws("",lit("https://www.imdb.com/title/tt"),'imdbId'))
final_data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: integer (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: integer (nullable = true)
 |-- link: string (nullable = false)



###**For TMDb**

In [None]:
final_data.printSchema()
final_data=final_data.withColumn("link", concat_ws("",lit("https://www.themoviedb.org/movie/"),'tmdbId'))
final_data.printSchema()

##**Create Scraping Functions to Retrieve Movie Plot Descriptions and Synopses**

###**For IMDb**

In [None]:
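def get_data_description(URL):
    # Fetch the IMDb title page and pull the one-line plot from the "plot-xl" span
    page = requests.get(URL, timeout=30)
    soup = BeautifulSoup(page.content, "html.parser")
    tag = soup.find("span", {"data-testid": "plot-xl"})
    if tag is None:  # layout changed or request was blocked
        return ""
    # Remove every period and end with a single one, so the text reads as one sentence
    description = tag.text.replace(".", "")
    return description + "."

def get_data_synopsis(URL):
    page = requests.get(URL, timeout=30)
    soup = BeautifulSoup(page.content, "html.parser")
    tag = soup.find("div", {"class": "ipc-html-content-inner-div"})
    if tag is None:
        return ""
    # Keep only the text before the first em dash (presumably the trailing author credit)
    return tag.text.split("—")[0]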
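The IMDb functions above mix HTTP requests with text clean-up. Pulling the clean-up into pure functions makes it testable without network access. The sketch below reproduces the same string operations: every period is removed and one is appended (presumably so that `SentenceDetector` treats each description as a single sentence), and the synopsis is cut at the first em dash. The helper names are hypothetical.

```python
EM_DASH = "\u2014"  # the character the synopsis scraper splits on

def clean_description(text):
    """Drop every period, then end with exactly one (mirrors get_data_description)."""
    return text.replace(".", "") + "."

def clean_synopsis(text):
    """Keep only the text before the first em dash (mirrors get_data_synopsis)."""
    return text.split(EM_DASH)[0]

print(clean_description("A cowboy doll is threatened. He gets jealous."))
# A cowboy doll is threatened He gets jealous.
print(clean_synopsis("A boy named Andy loves his toys." + EM_DASH + "Contributor name"))
# A boy named Andy loves his toys.
```

Note that the blanket `replace` also removes periods inside abbreviations and numbers, so "Dr. Pym" becomes "Dr Pym".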

###**For TMDb**

In [None]:
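def get_data_description(URL):
    page = requests.get(URL, timeout=30)
    soup = BeautifulSoup(page.content, "html.parser")
    description = ""
    # find() returns a single Tag (or None); findAll() returns a list, which has no .text,
    # so the original findAll(...).text always raised and the function returned ""
    tag = soup.find("div", {"class": "overview"})
    if tag is not None:
        description = tag.get_text().strip().replace(".", "") + "."
    return description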

##**Apply Scraping Functions as UDFs to retrieve data at scale**

In [None]:
desUDF = udf(lambda z: get_data_description(z),StringType())
synUDF = udf(lambda z: get_data_synopsis(z),StringType())
final_data=final_data.withColumn("Description", desUDF(col("link")))
final_data=final_data.withColumn("Synopsis", synUDF(col("link")))
final_data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: integer (nullable = true)
 |-- link: string (nullable = false)
 |-- Description: string (nullable = true)
 |-- Synopsis: string (nullable = true)



In [None]:
final_data.show(3)
final_data.cache()
final_data.printSchema()

+-------+--------------------+--------------------+-------+------+--------------------+--------------------+--------------------+
|movieId|               title|              genres| imdbId|tmdbId|                link|         Description|            Synopsis|
+-------+--------------------+--------------------+-------+------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|0114709|   862|https://www.imdb....|A cowboy doll is ...|A little boy name...|
|      2|      Jumanji (1995)|Adventure|Childre...|0113497|  8844|https://www.imdb....|When two kids fin...|Jumanji, one of t...|
|      3|Grumpier Old Men ...|      Comedy|Romance|0113228| 15602|https://www.imdb....|John and Max reso...|Things don't seem...|
+-------+--------------------+--------------------+-------+------+--------------------+--------------------+--------------------+
only showing top 3 rows

root
 |-- movieId: integer (nullable = true)
 |-- title: string (

In [None]:
final_data.is_cached

True

##**Drop Unnecessary Columns - we only need titles and descriptions**

In [None]:
final_data = final_data.drop('movieId', 'genres', 'imdbId', 'tmdbId')

In [None]:
final_data = final_data.drop('link', 'Synopsis')

##**Final Schema of the Dataset that is saved**

This final PySpark Dataframe with the titles of movies and plot descriptions is then saved for use with the BERT Pipeline. 

In [None]:
final_data.printSchema()

root
 |-- title: string (nullable = true)
 |-- Description: string (nullable = true)



#**Using Saved Output**

As noted in the final report and in the sections above, the rest of the notebook shows the pipeline's performance on a saved augmented dataset, available at the GitHub link provided above. It contains movie titles and plot descriptions gathered from TMDb. Ten random samples from this dataset are displayed below.

In [None]:
!wget https://raw.githubusercontent.com/azraf-a/BERT_SparkNLP_Filter/main/final_data_small.csv -O final_data_small.csv

final_data = spark\
.read\
.option("inferSchema","true")\
.option("header", "true")\
.csv("final_data_small.csv")

--2022-05-15 18:41:31--  https://raw.githubusercontent.com/azraf-a/BERT_SparkNLP_Filter/main/final_data_small.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.110.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1565040 (1.5M) [text/plain]
Saving to: ‘final_data_small.csv’


2022-05-15 18:41:32 (23.1 MB/s) - ‘final_data_small.csv’ saved [1565040/1565040]



In [None]:
final_data.printSchema()

root
 |-- title: string (nullable = true)
 |-- Description: string (nullable = true)



In [None]:
df = (final_data.withColumn("Description", regexp_replace('Description', '\\.(?=\\s|$)', '')))

df.orderBy(rand()).show(10, truncate=False)

+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                    |Description                                                                                                                                                                                                                                                                                                                                 
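The `regexp_replace` call above removes a period only when it is followed by whitespace or the end of the string, so periods inside tokens such as "2.0" survive, unlike the blanket `replace` in the scraper. Spark evaluates the pattern with Java's regex engine; the lookahead behaves the same way in Python's `re`, which the sketch below uses on made-up sample strings.

```python
import re

# Same pattern as the Spark cell: a literal period followed by whitespace or end of string
SENTENCE_PERIOD = re.compile(r"\.(?=\s|$)")

def strip_sentence_periods(text):
    return SENTENCE_PERIOD.sub("", text)

print(strip_sentence_periods("Version 2.0 is released. It works."))
# Version 2.0 is released It works
```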

#**Extracting Approximate Nearest Neighbors for Queries Across MovieLens Dataset**

##**Generate BERT Pipeline**

In [None]:
documentAssembler = DocumentAssembler() \
    .setInputCol("Description") \
    .setOutputCol("document")
sentence = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

embeddings = BertSentenceEmbeddings.pretrained("sent_small_bert_L2_128") \
    .setInputCols(["sentence"]) \
    .setOutputCol("sentence_bert_embeddings")

embeddingsFinisher = EmbeddingsFinisher() \
    .setInputCols(["sentence_bert_embeddings"]) \
    .setOutputCols("finished_embeddings") \
    .setOutputAsVector(True)
    
pipeline = Pipeline().setStages([
    documentAssembler,
    sentence,
    embeddings,
    embeddingsFinisher
])

sent_small_bert_L2_128 download started this may take some time.
Approximate size to download 16.1 MB
[OK!]


##**Generate BucketedRandomProjectionLSH Pipeline**

In [None]:
brp = BucketedRandomProjectionLSH(inputCol="result",
                                  outputCol="hashes",
                                  bucketLength=2.0,
                                  numHashTables=3)

##**Generate BERT Embeddings Across Processed Dataset with Movie Descriptions**

In [None]:
movielens_bert = pipeline.fit(df).transform(df)
movielens_bert = movielens_bert.selectExpr("title","Description","explode(finished_embeddings) as result")

##**Set Up BRP-LSH Model for Approximate Nearest Neighbors**

In [None]:
model = brp.fit(movielens_bert)
movielens_bert.cache()

DataFrame[title: string, Description: string, result: vector]

##**Final Queries: Selecting 3 Random Queries**

Three movies are sampled at random from the dataset after it has passed through the BERT pipeline. Their vectors are then used as queries to the BRP-LSH model to find approximate nearest neighbors based on the vectorized movie plots. Because `rand()` is unseeded, each run draws a different sample; the titles hard-coded in the query cell further below are not the ones shown in this particular draw.

In [None]:
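from pyspark.sql.functions import rand

# Collect the sample once so later cells reuse exactly these rows: a lazy
# orderBy(rand()) is recomputed on every action and may not return the same rows
sampled_rows = movielens_bert.orderBy(rand()).limit(3).collect()
random_queries = spark.createDataFrame(sampled_rows, schema=movielens_bert.schema)
random_queries.show()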

+--------------------+--------------------+--------------------+
|               title|         Description|              result|
+--------------------+--------------------+--------------------+
|            Fortress|A futuristic pris...|[-0.4393174946308...|
|Inglourious Basterds|"In Nazi-occupied...|[-0.4569685757160...|
|          The Wraith|Packard Walsh and...|[-0.3771486878395...|
+--------------------+--------------------+--------------------+



##**Final Queries: Output**

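In the cell below, we query three hand-picked films (queries 4 to 6). For each query, the first movie listed is usually the queried film itself, because its own vector lies at distance 0 from the query. The remaining rows are its approximate nearest neighbors, ordered by increasing Euclidean distance from the query vector. Since approxNearestNeighbors returns at most 4 rows, fewer neighbors may appear when few rows share a hash bucket with the query; and because each description is split into sentences, a neighbor can also be another sentence from the same film.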

In [None]:
def query_vector(title):
    # Embedding of the first matching row; a multi-sentence description has one row per sentence
    row = movielens_bert.filter(movielens_bert.title == title).select("result").first()
    if row is None:
        raise ValueError(f"No movie titled {title!r} in movielens_bert")
    return row["result"]

query_vec4 = query_vector("Gandhi")
query_vec5 = query_vector("The Assassination of Jesse James by the Coward Robert Ford")
query_vec6 = query_vector("Son of God")

for i, vec in [(4, query_vec4), (5, query_vec5), (6, query_vec6)]:
    print(f"Approximately searching df for 3 nearest neighbors of query {i} (The first item listed is the queried movie):")
    # At most 4 rows, sorted by Euclidean distance to the query vector
    model.approxNearestNeighbors(movielens_bert, vec, 4).select("title", "Description").show(truncate=False)

Approximately searching df for 3 nearest neighbors of query 4 (The first item listed is the queried movie):
+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title             |Description                                                                                                                                                                                                                                                                                                                                         |
+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------
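The search performed by approxNearestNeighbors above can be sketched in NumPy. As I understand Spark's default single-probe behavior, candidate rows are those that share the query's bucket in at least one hash table; only those candidates are ranked by exact Euclidean distance, and at most k are returned. This is an illustrative sketch on random toy data, not Spark's implementation; the function names and the 16-dimensional data are hypothetical.

```python
import numpy as np

def hash_rows(x, unit_vectors, bucket_length):
    # Bucket id of each row of x in each hash table: floor((x . v_i) / r)
    return np.floor(x @ unit_vectors.T / bucket_length)

def approx_nearest_neighbors(data, key, k, unit_vectors, bucket_length):
    data_hashes = hash_rows(data, unit_vectors, bucket_length)              # (n, num_tables)
    key_hash = hash_rows(key[None, :], unit_vectors, bucket_length)[0]      # (num_tables,)
    # Candidates: rows sharing the key's bucket in at least one hash table
    candidates = np.flatnonzero((data_hashes == key_hash).any(axis=1))
    # Rank only the candidates by exact Euclidean distance and keep at most k
    dists = np.linalg.norm(data[candidates] - key, axis=1)
    order = np.argsort(dists, kind="stable")[:k]
    return candidates[order], dists[order]

rng = np.random.default_rng(0)
unit_vectors = rng.standard_normal((3, 16))
unit_vectors /= np.linalg.norm(unit_vectors, axis=1, keepdims=True)
data = rng.standard_normal((200, 16))  # toy stand-in for the sentence embeddings
idx, dist = approx_nearest_neighbors(data, data[5], k=4, unit_vectors=unit_vectors, bucket_length=2.0)
print(idx, dist)  # the query row itself is listed first, at distance 0.0
```

Because only bucket-sharing rows are scored, a true nearest neighbor that falls into different buckets in every table is missed; this is the approximation that makes the search cheaper than a full scan.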

##**Final Queries: Random Queries**

In [None]:
# Collect the sampled rows once and query with their own vectors: each row already
# carries its embedding in "result", so there is no need to look it up again by title
sampled = random_queries.collect()

for i, row in enumerate(sampled, start=1):
    print(f"Approximately searching df for 3 nearest neighbors of query {i} (The first item listed is the queried movie):")
    model.approxNearestNeighbors(movielens_bert, row["result"], 4).select("title", "Description").show(truncate=False)

Approximately searching df for 3 nearest neighbors of query 1 (The first item listed is the queried movie):
+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title         |Description                                                                                                                                                                                                                                                                                                   