In [1]:
import pyspark
import math
import itertools
import os
from decimal import Decimal
from operator import add
from pyspark.sql import Row

sc = pyspark.SparkContext.getOrCreate()
sqlContext = pyspark.sql.SQLContext(sc)

# File Reading into DataFrame

In [2]:
scripts_directory = '/usr/data/movie_scripts/'
files_list = os.listdir(scripts_directory)
files_list_rdd = sc.parallelize(files_list)

In [3]:
def map_script_to_meta(filename):
    if "txt" not in filename:
        return []
    script = open(scripts_directory + filename, 'r', encoding='utf8').read()
    actor, movie = filename[:-4].split("_")
    return [Row(actor=actor, movie_name=movie, script=script)]

scripts_df = files_list_rdd.flatMap(map_script_to_meta).toDF()
scripts_df.toPandas()

Unnamed: 0,actor,movie_name,script
0,Al Pacino,The Humbling,Ten minutes to curtain. Ten minutes. All the w...
1,Adam Sandler,Mixed Nuts,* [ group singing doo-wop ] * [ doo-wop contin...
2,Al Pacino,Donnie Brasco,You're not saying things that mean anything. I...
3,Anthony Hopkins,Instinct,- Are you listening? Are you listening to me? ...
4,Anne Hathaway,Brokeback Mountain,Shit. You pair of deuces lookin' for work... I...
...,...,...,...
221,Anne Hathaway,Bride Wars,- # I found # - # I found # # So many things #...
222,Anne Hathaway,Colossal,(CRICKETS CHIRPING) (GIRL SPEAKING KOREAN) (WO...
223,Arnold Schwarzenegger,Collateral Damage,OCD from engine 35. On scene at 902 Sunnyvale....
224,Angelina Jolie,Hell's Kitchen,"Fuck it! Shut up, all right? Listen, we gotta ..."


# Characters Sanitation and Tokenization

In [4]:
from pyspark.ml.feature import Tokenizer,  RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType


def tokenize(input_col_name: str, tokenized_col_name: str, output_columns: list, with_count: bool,df):    
    tokenizer = Tokenizer(inputCol=input_col_name, outputCol=tokenized_col_name)
    count_tokens = udf(lambda words: len(words), IntegerType())
    tokenized = tokenizer.transform(df)

    regexTokenizer = RegexTokenizer(inputCol=input_col_name, outputCol=tokenized_col_name, pattern="\\W")
    regexTokenized = regexTokenizer.transform(df)

    tokenized = regexTokenized.select(*output_columns)
    if with_count:
        tokenized = tokenized.withColumn("tokens_count", count_tokens(col("tokenized_script")))
    return tokenized

tokenized = tokenize("script", "tokenized_script", ["actor", "movie_name", "script", "tokenized_script"], True, scripts_df)
tokenized.toPandas()

Unnamed: 0,actor,movie_name,script,tokenized_script,tokens_count
0,Al Pacino,The Humbling,Ten minutes to curtain. Ten minutes. All the w...,"[ten, minutes, to, curtain, ten, minutes, all,...",11778
1,Adam Sandler,Mixed Nuts,* [ group singing doo-wop ] * [ doo-wop contin...,"[group, singing, doo, wop, doo, wop, continues...",12118
2,Al Pacino,Donnie Brasco,You're not saying things that mean anything. I...,"[you, re, not, saying, things, that, mean, any...",14465
3,Anthony Hopkins,Instinct,- Are you listening? Are you listening to me? ...,"[are, you, listening, are, you, listening, to,...",4083
4,Anne Hathaway,Brokeback Mountain,Shit. You pair of deuces lookin' for work... I...,"[shit, you, pair, of, deuces, lookin, for, wor...",8189
...,...,...,...,...,...
221,Anne Hathaway,Bride Wars,- # I found # - # I found # # So many things #...,"[i, found, i, found, so, many, things, i, drea...",11401
222,Anne Hathaway,Colossal,(CRICKETS CHIRPING) (GIRL SPEAKING KOREAN) (WO...,"[crickets, chirping, girl, speaking, korean, w...",9671
223,Arnold Schwarzenegger,Collateral Damage,OCD from engine 35. On scene at 902 Sunnyvale....,"[ocd, from, engine, 35, on, scene, at, 902, su...",5943
224,Angelina Jolie,Hell's Kitchen,"Fuck it! Shut up, all right? Listen, we gotta ...","[fuck, it, shut, up, all, right, listen, we, g...",6201


# Stop-Word Sanitation

In [5]:
from pyspark.ml.feature import StopWordsRemover

def remove_stop_words(input_col_name: str, sanitized_col_name: str, output_columns: list, with_count: bool,df):
    stop_word_remover = StopWordsRemover(inputCol=input_col_name, outputCol=sanitized_col_name)
    sanitized = stop_word_remover.transform(df)
    sanitized = sanitized.select(*output_columns)
    if with_count:
        count_tokens = udf(lambda words: len(words), IntegerType())
        sanitized = sanitized.withColumn("sanitized_count", count_tokens(col("sanitized_script")))
    return sanitized

sanitized = remove_stop_words("tokenized_script", "sanitized_script", ["actor", "movie_name", "script", "tokenized_script", "tokens_count", "sanitized_script"], True, tokenized)
sanitized.toPandas()

Unnamed: 0,actor,movie_name,script,tokenized_script,tokens_count,sanitized_script,sanitized_count
0,Al Pacino,The Humbling,Ten minutes to curtain. Ten minutes. All the w...,"[ten, minutes, to, curtain, ten, minutes, all,...",11778,"[ten, minutes, curtain, ten, minutes, world, w...",4956
1,Adam Sandler,Mixed Nuts,* [ group singing doo-wop ] * [ doo-wop contin...,"[group, singing, doo, wop, doo, wop, continues...",12118,"[group, singing, doo, wop, doo, wop, continues...",6133
2,Al Pacino,Donnie Brasco,You're not saying things that mean anything. I...,"[you, re, not, saying, things, that, mean, any...",14465,"[re, saying, things, mean, anything, even, deb...",6557
3,Anthony Hopkins,Instinct,- Are you listening? Are you listening to me? ...,"[are, you, listening, are, you, listening, to,...",4083,"[listening, listening, yes, one, gorilla, see,...",1786
4,Anne Hathaway,Brokeback Mountain,Shit. You pair of deuces lookin' for work... I...,"[shit, you, pair, of, deuces, lookin, for, wor...",8189,"[shit, pair, deuces, lookin, work, suggest, ge...",3969
...,...,...,...,...,...,...,...
221,Anne Hathaway,Bride Wars,- # I found # - # I found # # So many things #...,"[i, found, i, found, so, many, things, i, drea...",11401,"[found, found, many, things, dreamed, dreamed,...",5425
222,Anne Hathaway,Colossal,(CRICKETS CHIRPING) (GIRL SPEAKING KOREAN) (WO...,"[crickets, chirping, girl, speaking, korean, w...",9671,"[crickets, chirping, girl, speaking, korean, w...",4832
223,Arnold Schwarzenegger,Collateral Damage,OCD from engine 35. On scene at 902 Sunnyvale....,"[ocd, from, engine, 35, on, scene, at, 902, su...",5943,"[ocd, engine, 35, scene, 902, sunnyvale, six, ...",3033
224,Angelina Jolie,Hell's Kitchen,"Fuck it! Shut up, all right? Listen, we gotta ...","[fuck, it, shut, up, all, right, listen, we, g...",6201,"[fuck, shut, right, listen, gotta, get, roll, ...",2775


# Selecting Only What Really Matters
And adding an ID column

In [6]:
from pyspark.sql.functions import monotonically_increasing_id, lit, row_number
from pyspark.sql.window import *

# It could be just that way but because monotonically_increasing_id() produces unique values that might be huge,
# its much simpler on the eye to use "row id" as following
# sanitized = sanitized.select(monotonically_increasing_id().alias('id'), "actor", "movie_name", "sanitized_script")


sanitized = sanitized.withColumn("temp_lit",lit("ABC"))
w = Window().partitionBy('temp_lit').orderBy(lit('A'))
sanitized = sanitized.withColumn("id", row_number().over(w)).drop("temp_lit")
sanitized = sanitized.select("id" ,"actor", "movie_name", "sanitized_script")
sanitized.toPandas()


Unnamed: 0,id,actor,movie_name,sanitized_script
0,1,Al Pacino,The Humbling,"[ten, minutes, curtain, ten, minutes, world, w..."
1,2,Adam Sandler,Mixed Nuts,"[group, singing, doo, wop, doo, wop, continues..."
2,3,Al Pacino,Donnie Brasco,"[re, saying, things, mean, anything, even, deb..."
3,4,Anthony Hopkins,Instinct,"[listening, listening, yes, one, gorilla, see,..."
4,5,Anne Hathaway,Brokeback Mountain,"[shit, pair, deuces, lookin, work, suggest, ge..."
...,...,...,...,...
221,222,Anne Hathaway,Bride Wars,"[found, found, many, things, dreamed, dreamed,..."
222,223,Anne Hathaway,Colossal,"[crickets, chirping, girl, speaking, korean, w..."
223,224,Arnold Schwarzenegger,Collateral Damage,"[ocd, engine, 35, scene, 902, sunnyvale, six, ..."
224,225,Angelina Jolie,Hell's Kitchen,"[fuck, shut, right, listen, gotta, get, roll, ..."


# Creating An Inverted Index

In [7]:
def create_index (row):
    index = {}
    for token in row[3]:
        if row[0] not in index.get(token, []):
            if index.get(token):
                index[token].append(row[0])
            else:
                index[token] = [row[0]]
    return index

indexes_per_doc = sanitized.rdd.map(create_index)
print(f"outputs a list of dicts, of the following form: \n "
          "[{'tell': [0], 'hawaii': [0], 'unbelievable': [0], 'oh': [0], 'yeah': [0], 'well': [0]},"
              "'happened': [1], 'met': [1], 'guy': [1], 'best': [1], 'week': [1], 'life': [1]...")

outputs a list of dicts, of the following form: 
 [{'tell': [0], 'hawaii': [0], 'unbelievable': [0], 'oh': [0], 'yeah': [0], 'well': [0]},'happened': [1], 'met': [1], 'guy': [1], 'best': [1], 'week': [1], 'life': [1]...


In [8]:
flattened = indexes_per_doc.flatMap(lambda doc: (doc.items()))
flattened.take(10)

[('ten', [1]),
 ('minutes', [1]),
 ('curtain', [1]),
 ('world', [1]),
 ('stage', [1]),
 ('men', [1]),
 ('women', [1]),
 ('merely', [1]),
 ('players', [1]),
 ('exits', [1])]

In [9]:
inverted_index = flattened.reduceByKey(lambda a, b: a+b)
inverted_index.toDF(["token", "docs"]).toPandas()

Unnamed: 0,token,docs
0,everything,"[1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 1..."
1,sometimes,"[1, 2, 3, 4, 5, 6, 8, 10, 11, 12, 13, 14, 15, ..."
2,10,"[1, 3, 5, 6, 7, 8, 9, 10, 11, 13, 14, 16, 17, ..."
3,step,"[1, 3, 4, 7, 8, 10, 11, 16, 17, 18, 19, 21, 23..."
4,destroyed,"[1, 2, 18, 22, 26, 27, 29, 31, 33, 37, 38, 39,..."
...,...,...
37722,sunbathe,[216]
37723,repentant,"[217, 220]"
37724,uric,[220]
37725,vegetarianism,[220]


# TF
### Term Frequency
How many times a term appeared in each document?

In [10]:
all_tokens = inverted_index.toDF(["token", "docs"]).select("token")
all_tokens.toPandas()

Unnamed: 0,token
0,everything
1,sometimes
2,10
3,step
4,destroyed
...,...
37722,sunbathe
37723,repentant
37724,uric
37725,vegetarianism


In [11]:
def gather_tf(data):
    tf = {}
    for term in data.sanitized_script:
        tf[term] = data.sanitized_script.count(term)
    return (data.id, tf)

tf = sanitized.rdd.map(gather_tf)
print("tf looks like this (not dispalying actual content because its huge!): \n"
     """[(1,
  {'ten': 4,
   'minutes': 8,
   'curtain': 1,
   'world': 5,
   'stage': 15,
   'men': 7,
   'women': 8,
   'merely': 2,
   'players': 4,...""")

tf looks like this (not dispalying actual content because its huge!): 
[(1,
  {'ten': 4,
   'minutes': 8,
   'curtain': 1,
   'world': 5,
   'stage': 15,
   'men': 7,
   'women': 8,
   'merely': 2,
   'players': 4,...


In [12]:
all_tokens_list = all_tokens.rdd.flatMap(lambda x: x).collect()
print(f"Here's the first 10 tokens in the full tokens list: {all_tokens_list[1:10]}")

Here's the first 10 tokens in the full tokens list: ['sometimes', '10', 'step', 'destroyed', 'led', 'depressed', 'solve', 'healthy', 'whimpering']


In [13]:
all_tokens_bc = sc.broadcast(all_tokens_list)

In [14]:
def map_to_huge_tuple(row):
    word_count_list = [('doc_id', [row[0]])]
    for token in all_tokens_bc.value:
        word_count_list.append((token, [row[1].get(token,0)]))
    return word_count_list


In [15]:
full_vocabulary_tf_per_doc = tf.map(map_to_huge_tuple)
print(""" The full_vocabulary_tf_per_doc looks like this (not displaying all of it because of it's size): \n
[[('doc_id', [1]),
  ('everything', [10]),
  ('sometimes', [3]),
  ('10', [2]),
  ('step', [1]),
  ('destroyed', [1]),
  ('led', [1]),
  ('depressed', [0]),
  ('solve', [0]),
  ('healthy', [0]),
  ('whimpering', [0]),
  ('horrible', [0]),
  ('orange', [0]),...
""")

 The full_vocabulary_tf_per_doc looks like this (not displaying all of it because of it's size): 

[[('doc_id', [1]),
  ('everything', [10]),
  ('sometimes', [3]),
  ('10', [2]),
  ('step', [1]),
  ('destroyed', [1]),
  ('led', [1]),
  ('depressed', [0]),
  ('solve', [0]),
  ('healthy', [0]),
  ('whimpering', [0]),
  ('horrible', [0]),
  ('orange', [0]),...



In [16]:
full_vocabulary_tf_per_doc = full_vocabulary_tf_per_doc.flatMap(lambda x: x)

In [17]:
doc_ids = full_vocabulary_tf_per_doc.filter(lambda x: x[0] == 'doc_id').reduceByKey(lambda a, b: a + b).collect()
doc_ids_list = doc_ids[0][1]
doc_ids_list = [str(i) for i in doc_ids_list]

In [18]:
tf_rdd = full_vocabulary_tf_per_doc.reduceByKey(lambda a, b: a + b).map(lambda x: (x[0], *x[1]))

In [19]:
tf_table = tf_rdd.toDF(['token'] + doc_ids_list)

# The final TF Table:
each cell describes the term frequency of a term (row) in a given document (column)

In [20]:
tf_table.toPandas()

Unnamed: 0,token,1,2,3,4,5,6,7,8,9,...,217,218,219,220,221,222,223,224,225,226
0,everything,10,6,11,0,1,5,9,7,4,...,5,8,4,7,4,8,10,3,5,9
1,sometimes,3,1,4,1,2,2,0,3,0,...,2,1,3,1,1,5,0,0,2,1
2,10,2,0,1,0,1,1,1,3,1,...,12,4,1,2,3,4,1,2,1,1
3,step,1,0,2,1,0,0,1,1,0,...,1,2,0,3,2,1,0,0,1,2
4,destroyed,1,1,0,0,0,0,0,0,0,...,0,0,0,0,1,0,0,0,0,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
37723,sunbathe,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
37724,repentant,0,0,0,0,0,0,0,0,0,...,1,0,0,1,0,0,0,0,0,0
37725,uric,0,0,0,0,0,0,0,0,0,...,0,0,0,1,0,0,0,0,0,0
37726,vegetarianism,0,0,0,0,0,0,0,0,0,...,0,0,0,2,0,0,0,0,0,0


# IDF
#### Essentially, a measure that helps us understand the importancy of a term
log (corpus size / 1 + document frequency of the term)
![](https://wikimedia.org/api/rest_v1/media/math/render/svg/ac67bc0f76b5b8e31e842d6b7d28f8949dab7937 "IDF equation")

Here we add 1 to the document frequency of the term in order to the avoid 0 devision

In [21]:
def counter(row):
    c = 0
    for i in row[1]:
        if i != 0:
            c += 1
    return row[0], c


idf_table = inverted_index.map(counter).map(lambda x: (x[0], x[1], math.log(226/(1+x[1])))).toDF(["token", "docs_count", "idf"])
idf_table.toPandas()

Unnamed: 0,token,docs_count,idf
0,everything,219,0.026907
1,sometimes,165,0.308547
2,10,152,0.390097
3,step,126,0.576348
4,destroyed,40,1.706963
...,...,...,...
37722,sunbathe,1,4.727388
37723,repentant,2,4.321923
37724,uric,1,4.727388
37725,vegetarianism,1,4.727388


# TF-IDF Table
Each cell in the table describes the TF-IDF value of a term (row) in a document (column) <br>
It is essentially the TF value taken from the TF table, multiplied by the term's IDF value <br> <br>

As you can see, the vast majority of the values are zeros which makes the columns Sparse Vectors. <br>
Every column vector is a vector that describes the document.

![](https://wikimedia.org/api/rest_v1/media/math/render/svg/10109d0e60cc9d50a1ea2f189bac0ac29a030a00 "tfIDF equation")

In [22]:
tfidf_table = tf_table.join(idf_table, ["token"])


def calculate_tfidf(row):
    result = [row[0]]
    for i in range(1, len(row)-2):
        result.append(row[i]*row[-1])
    result.extend(row[-2:])
    return result

tfidf_table = tfidf_table.rdd.map(calculate_tfidf).toDF(['token'] + doc_ids_list + ["docs_count", "idf"])
tfidf_table.toPandas()


Unnamed: 0,token,1,2,3,4,5,6,7,8,9,...,219,220,221,222,223,224,225,226,docs_count,idf
0,1970s,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,4.727388
1,296,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,4.727388
2,57th,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,4.727388
3,675,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,4.727388
4,829,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,4.727388
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
37722,wack,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4,3.811097
37723,wane,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,4.727388
37724,weed,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,15,2.647946
37725,widening,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1,4.727388


# Cosine similarity:
A measurement that describes the distance between every two vectors. It's forumula is: <br>

![](https://wikimedia.org/api/rest_v1/media/math/render/svg/1d94e5903f7936d3c131e040ef2c51b473dd071d "Cosine Similarity")


In [23]:
from math import sqrt

def cosine_similarity(v1, v2):
    dot_product = 0
    v1i_sqr = 0
    v2i_sqr = 0
    
    for i in range(len(v1)):
        dot_product += v1[i] * v2[i]
    
    for i in range(len(v1)):
        v1i_sqr += v1[i] ** 2
        v2i_sqr += v2[i] ** 2
    
    v1i_sqr = sqrt(v1i_sqr)
    v2i_sqr = sqrt(v2i_sqr)
    
    return dot_product / (v1i_sqr * v2i_sqr)
    

In [24]:
from pyspark.mllib.linalg import Vectors

column_names = tfidf_table.schema.names[1:-4]
vectors_list = []
for column_name in column_names:
    vectors_list.append(Vectors.dense(tfidf_table.select(column_name).rdd.map(lambda x: x[0]).collect()))

In [25]:
def get_max_cos_similarity_for_every_doc():
    max_similarity_per_doc = []
    for i,v1 in enumerate(vectors_list):
        top_5_list = []
        for j,v2 in enumerate(vectors_list):
            if i==j: continue
            cos = cosine_similarity(v1, v2)
            if len(top_5_list) < 5 or any([cos > tup[0] for tup in top_5_list]):
                top_5_list.append((cos, i, j))
            if len(top_5_list) > 5: top_5_list.remove(min(top_5_list))
        max_similarity_per_doc.append(top_5_list)
    return max_similarity_per_doc

max_similarity_per_doc = get_max_cos_similarity_for_every_doc()

In [26]:
all_highest_similarities = []
for top5_of_doc in max_similarity_per_doc:
        all_highest_similarities += top5_of_doc
print(f"Here is a taste of the highest similarities of each doc: \n {all_highest_similarities[0:12]}")

Here is a taste of the highest similarities of each doc: 
 [(0.07878699169387886, 0, 49), (0.07468789964132368, 0, 60), (0.052398588254967, 0, 100), (0.07571166555522885, 0, 162), (0.0523730593905043, 0, 173), (0.1333927725566076, 1, 72), (0.11232127827208745, 1, 76), (0.08447551046251867, 1, 92), (0.08916196906111647, 1, 110), (0.19917297488347485, 1, 218), (0.19880983714233572, 2, 29), (0.059779197390971976, 2, 69)]


In [27]:
def find_top_5():
    sorted_similarities = sorted(all_highest_similarities, key=lambda x: x[0], reverse=True)
    top_10_with_duplicates = sorted_similarities[:10]
    # Since every similarity will appear twice (permutaions of a couple), we just take the even-indexed similarities
    return [sim for i, sim in enumerate(top_10_with_duplicates) if i%2 ==0]
print(find_top_5())

[(1.0, 108, 204), (0.9989728021326493, 100, 173), (0.9771074450258495, 181, 205), (0.9747325030154703, 132, 205), (0.9705073954735131, 132, 181)]


# An alternative method that brings less data to the driver
### I chose to stick with the first method as it was already ran (and it took the time 🕓)

In [28]:
"""
    import pyspark.sql.functions as func

    def cosine_similarity_alternative(df, col1, col2):
        df_cosine = df.select(func.sum(df[col1] * df[col2]).alias('dot'), 
                              func.sqrt(func.sum(df[col1]**2)).alias('norm1'), 
                              func.sqrt(func.sum(df[col2] **2)).alias('norm2'))
        d = df_cosine.rdd.collect()[0].asDict()
        return d['dot']/(d['norm1'] * d['norm2'])

    results = []
    for i in range(1, 15):
        for j in range(1, 15):
            if i == j: continue
            results.append(cosine_similarity_alternative(tfidf_table, str(i), str(j)))
    results
"""

"\n    import pyspark.sql.functions as func\n\n    def cosine_similarity_alternative(df, col1, col2):\n        df_cosine = df.select(func.sum(df[col1] * df[col2]).alias('dot'), \n                              func.sqrt(func.sum(df[col1]**2)).alias('norm1'), \n                              func.sqrt(func.sum(df[col2] **2)).alias('norm2'))\n        d = df_cosine.rdd.collect()[0].asDict()\n        return d['dot']/(d['norm1'] * d['norm2'])\n\n    results = []\n    for i in range(1, 15):\n        for j in range(1, 15):\n            if i == j: continue\n            results.append(cosine_similarity_alternative(tfidf_table, str(i), str(j)))\n    results\n"

# Queries and search agains the vectors base
#### Here I implemented a search function that takes a search term as a string and K int value and returns the *closest documents

*closest by cosine similarity<br>


In [29]:
import pyspark.sql.functions as F


def search(search_term: str, k_results = 10):
    results = []
    search_vector = prepare_for_search(search_term)
    for i, v2 in enumerate(vectors_list):
        results.append((i, cosine_similarity(search_vector, v2)))
    results = sorted(results, key=lambda x: x[1], reverse=True)
    return results[:k_results]
    
def prepare_for_search(search_term: str):  
    cleaned = clean(search_term)
    return vectorize(cleaned)
    
def clean(search_term):    
    term = search_term.split(" ")
    term_df = sc.parallelize(term).map(lambda x: [x]).toDF(["text"])
    tokenized_search = tokenize("text", "tokens", ["tokens"], False, term_df)
    cleared_term = remove_stop_words("tokens", "clear_text", ["clear_text"], False, tokenized_search)
    cleared_term = cleared_term.rdd.map(lambda x: x[0]).filter(lambda x: len(x) != 0).flatMap(lambda list: list)
    return cleared_term.collect()

def vectorize(cleaned_term: list):
    result = tfidf_table.withColumn("score", F.when((tfidf_table.token.isin(cleaned_term)), F.col("idf")).otherwise(0))
    result = result.select("score")
    return Vectors.dense(result.rdd.map(lambda x: x[0]).collect())
    
    
    
ten_terms  = {
    "pacino_1" : """Look at me. Look at the kite. Jack... Oh, I've seen that outfit before. I can't remember when. - Last night. - Last night! These things do return. God, my head is still pounding like a drum. You party hard. Yeah, well, you don't look worse than I am. I didn't drink as much as you. History of my life, Sara. Psychiatrist by day, party animal by night. I thought you've told me you were a lawyer. I'm not, yet. Excuse me. - Yeah. - Hey, Jack. It's Shelly. - I've been calling all morning. You OK? - I'm Ok. What's up? I was worried. You worry too much, Shelly. What's going on? Frank Parks is been calling. He needs to speak with you immediately. Pass him through. I had fun last night, Miss. Pollard. Me too. Thanks for the wine tasting, Dr. Gramm. Jack. Hey, Jack, I've got Frank Parks. Go ahead. - Hey, Frank. - Jack. We got another one. Oh, no. The same? Every detail. Right down to the lateral laceration. It's the Seattle slayer again. - Where are you now? - Look, Jack. There's a tape. You're gonna wanna see it. Jack, are you still there? Yeah. Meet me at my office, okay? Sure, Jack. Where to? 114 Western, please. Quickly. Welcome back, Seattle. We're continuing our interview with John Forster... who was convicted for the death of Janie Kay, nine years ago... based on the eyewitness testimony of her twin sister, Janie Kay. John Forster was dated to die at midnight, at Walla Walla Penitentiary. - Do you mind changing it? - John Forster. What's your reaction to the State... Thank you. Idiot! Congratulations! For what? I was referring to the hottie that you left with the party last night. She was a quite piece of ass. - Remember her name? - Yes, I do. Sara Pollard. Is that a cut on your nose? You didn't have that before I left. I fell out of the bed. FBI hit here yet? - Yeah, 2 minutes in the conference room. - Okay. Any calls? New York Times, Washington times, Newsweek. They all wanna know if you have a quote about Forster's execution. - What else is new? - Kim Cummings called twice. - She was worried about you too. - Kim knows this procedure. Apparently not.""",
    "amy_adams_2" : """Play one of the best new FPS shooters, search Steam for PROJECT WARLOCK RADIO ANNOUNCER: All news, all the time. This is WINS. You give us 22 minutes, we'll give you the world. REPORTER: Good evening. It's 42 degrees at 5:00 and here's what's happening. Mayor Koch urges the talks to resume between the city and unions, but he says the unions have to give in a bit. The PBA negotiator says he wants real salary increases. Nassau police ask an 18-year-old mother why she abandoned her child. A federal judge rules an accused war criminal must testify in his deportation hearing. A House committee tells the president it does not like proposed Mideast arms sales. The news watch never stops! This is WINS. WINS news time, 5:02. Contract talks have ended temporarily between the city and its labor union with both sides making some angry charges. ANNOUNCER: The municipal labor unions broke off talks demanding that the city withdraw what they call their outrageous and irresponsible contract proposals. The Patrolmen's Benevolent Association and the Uniformed Firefighters Association are the two unions not participating in the coalition bargaining with the city, and the PBA chief negotiator says he wants some solid gains for the men in blue. A bankrupt New York City can't afford to lose money, but city officials say it did, between two and five million dollars in the first two months of 1978 alone... (A HORSE WITH NO NAME BY AMERICA PLAYING) ♪ On the first part of the journey I was lookin' at all the life ♪ There were plants and birds and rocks and things ♪ There was sand and hills and rings ♪ The first thing I met was a fly with a buzz ♪ And the sky with no clouds ♪ The heat was hot and the ground was dry but the air was full of sound ♪ CARL: I do all kinds of business with him. He knows Carl Elway. He knows exactly what he's getting into. """,
    "jolie_3" : """Hrothgar! Hrothgar! Hrothgar! Hrothgar! Hrothgar! Hrothgar! I want mead! Give me some mead, my Queen! Thank you, my beautiful Queen. Hrothgar! Hrothgar! Hrothgar! This is how it works, Aesher. After you die, you wouldn't really be dead providing you have accepted him as the one and only God. All right, back! Back! Here, my beauty, give me a kiss. I want a kiss! Give me a kiss! I want a kiss! Please, stop it! More! My thanes, my beautiful thanes! One year ago, I, Hrothgar, your King swore that we would celebrate our victories in a new hall, mighty and beautiful! Have I not kept my oath? Yeah. In this hall, we shall divide the spoils of our conquests, the gold and the treasure. And this shall be a place of merriment, joy, and fornication! From now until the end of time, I name this hall Herot! Treasure! Let's hand out some treasure! Give me some of that! From my conquests! Unferth! For Unferth, for Unferth, my wisest advisor, violator of virgins and best and bravest of brave brawlers. Unferth, where the hell are you, you weasel-faced bastard? I'm here, my King. Unferth, come here, you ungrateful lout! Hrothgar! Hrothgar! Hrothgar! Hrothgar! Hrothgar! Hrothgar! He faced a demon dragon When other men would freeze And then, my lords, he took his sword And brought it to its knees... Hrothgar! Hrothgar! The greatest of our kings He broke the dragon's wings Hrothgar! Hrothgar! The kingdom fell in darkness And shadows ruled the night With no sign of dawn, he soldiered on And brought us back to life Hrothgar! Hrothgar! He never shook your faith Hrothgar! Hrothgar! Let every cup be raised Hrothgar! Hrothgar! He offered us protection When monsters roamed the land And one by one, he took them on They perished at his hand Mead! Mead! Mead! You're spilling it. Where's my mead? You're spilling it. You're spilling it! Cain, you clumsy idiot! How dare you waste the King's mead? He rose up like a savior When every hope was gone The beast was gored and peace restored His memory will live on Hrothgar! Hrothgar! Let every cup be raised Hrothgar! Hrothgar! Now and forever A sword! Give me a sword! Come! Arm yourselves! Stay down, my Lady! - Give me a sword! A sword! - My Lord! No! Fight me! Fight me! Fight me. You fight me, damn you. Nay. What was that? Grendel. Grendel, what have you done? What have you done, Grendel? Fish and wolf and bear and sheep or two, ac nan men. Men, Grendel. They have slain so many of our kind. Was Hrothgar there?""",
    "hathaway_4" : """Charles, you have finally lost your senses. This venture is impossible. For some. Gentlemen, the only way to achieve the impossible is to believe it is possible. That kind of thinking could ruin you. I'm willing to take that chance. Imagine trading posts in Rangoon, Bangkok, Jakarta... The nightmare again? I won't be long. I'm falling down a dark hole, then I see strange creatures. What kind of creatures? Well, there's a dodo bird, a rabbit in a waistcoat, a smiling cat. I didn't know cats could smile. Neither did I. And there's a blue caterpillar. Blue caterpillar. Do you think I've gone round the bend? I'm afraid so. You're mad, bonkers, off your head. But I'll tell you a secret. All the best people are. It's only a dream, Alice. Nothing can harm you there. But if you get too frightened, you can always wake up. Like this. Ow! Must we go? Doubt they'll notice if we never arrive. They will notice. Where's your corset? And no stockings. I'm against them. But you're not properly dressed. Who's to say what is proper? What if it was agreed that "proper" was wearing a codfish on your head? - Would you wear it? - Alice. To me, a corset is like a codfish. Please, not today. Father would have laughed. I'm sorry. I'm tired. I didn't sleep well last night. Did you have bad dreams again? Only one. It's always the same, ever since I can remember. Do you think that's normal? Don't most people have different dreams? I don't know. There. You're beautiful. Now, can you manage a smile? At last. We thought you'd never arrive. Alice, Hamish is waiting to dance with you. Go. You do realize it's well past 4:00. Now everything will have to be rushed through. - I am sorry. - Oh, never mind! Forgive my wife. She's been planning this affair for over 20 years. If only Charles were here... My condolences. I think of your husband often. He was truly a man of vision. I hope you don't think I've taken advantage of your misfortunes. Of course not. I'm pleased that you purchased the company. I was a fool for not investing in his mad venture when I had the chance. Charles thought so, too. Hamish, do you ever tire of quadrille? On the contrary. I find it invigorating.""",
    "hopkins_5" : """Some people hear their own inner voices... with great clearness... and they live by what they hear. Such people become crazy... or they become legends. Tristan Ludlow was born in the Moon of the Falling Leaves. It was a terrible winter. His mother almost died bringing him into this world. His father, the Colonel, brought him to me. I wrapped him in a bear skin and held him all that night. As he grew into a man... I taught him the great joy of the kill... when the hunter cuts out its warm heart and hold sit in his hands... setting its spirit free. Colonel Ludlow had three sons... but Tristan was his favorite. I had had sons too. But they were gone now... forever. It was a very bad time. The Colonel had tried to help the People... but it was no use. So he decided to go his own way. He wanted to lose the madness over the mountains, he said... and begin again. "Lose the madness", he said. And so we lived for many years... and the boys grew strong. Alfred was the older brother... old even for his years. Samuel was the youngest. There was nothing these brothers would not do for him. They watched over him like a treasure. One year-- I am an old man and cannot remember the year. But it was the Moon of the Red Grass... when Isabel Ludlow, their mother... went away for the winter. She said the winters were too cruel for her. She said she was afraid of the bears. She was a strange woman anyway. That spring, though, she did not return. And, after that, she did not come much to see us. Alfred wrote her many letters... but Tristan refused to speak of her. His world was here with me. Every warrior hopes a good death will find him... but Tristan couldn't wait. He went looking for his. Tristan! Here. - Was it a bear? - Yes, sir. - Can you breathe? """,
    
    "autocad_wiki_page_6" : """AutoCAD is a commercial computer-aided design (CAD) """,
    "ynet_hebrew_article_7" : """ בימים האחרונים חודשו כאמור הפרחות הבלונים וזוהו עשרות בלוני נפץ שהופרחו לעבר עוטף עזה. הבוקר אותר צרור בלונים עם רימון יד מאולתר באזור קיבוץ ניר עוז שבחבל אשכול. לפי רשות הכיבוי, ביום חמישי פרצו לפחות שלוש שריפות באזור מועצת אשכול בשל בלוני תבערה. מרשות הטבע והגנים נמסר כי כ-300 דונמים עלו באש בשמורת באר .אחרי התקיפה כתב שר הביטחון בני גנץ בטוויטר: "מדינת ישראל לא תקבל שום הפרת ריבונות ופגיעה בתושבי""",
    "data_mining_wiki_8" : """Data mining is a process of discovering patterns in large data sets """,
    "sharp_pc_wiki9" : """ The Sharp PC-E500S was a 1995 pocket computer by Sharp Corporation """,
    "simple": "my name is eliran shem tov"
}

ten_results = []
for term in ten_terms.values():
    ten_results.append(search(term, 10))


# Search Results:
Unfortunately the results doesn't show the uniquness that I desired. I thought that every actor's movies uses some sort of unique vocabulary. As you can see, in some cases (Ann hathaway for example) it is kind of correct but not in a strict manner that I hoped for.

In [30]:
aggregated_results = []
for i, result in enumerate(ten_results):
    aggregated_results += [(i+1, doc_id) for doc_id, vector in result]

agg_df = sc.parallelize(aggregated_results).toDF(["search_id", "doc_id"])
agg_df.join(sanitized, agg_df.doc_id == sanitized.id).select("search_id", "doc_id", "actor").orderBy("search_id").collect()

[Row(search_id=1, doc_id=167, actor='Angelina Jolie'),
 Row(search_id=1, doc_id=130, actor='Anne Hathaway'),
 Row(search_id=1, doc_id=184, actor='Anne Hathaway'),
 Row(search_id=1, doc_id=136, actor='Anthony Hopkins'),
 Row(search_id=1, doc_id=139, actor='Arnold Schwarzenegger'),
 Row(search_id=1, doc_id=179, actor='Angelina Jolie'),
 Row(search_id=1, doc_id=125, actor='Angelina Jolie'),
 Row(search_id=1, doc_id=21, actor='Al Pacino'),
 Row(search_id=1, doc_id=165, actor='Anne Hathaway'),
 Row(search_id=1, doc_id=93, actor='Arnold Schwarzenegger'),
 Row(search_id=2, doc_id=130, actor='Anne Hathaway'),
 Row(search_id=2, doc_id=77, actor='Al Pacino'),
 Row(search_id=2, doc_id=50, actor='Arnold Schwarzenegger'),
 Row(search_id=2, doc_id=9, actor='Angelina Jolie'),
 Row(search_id=2, doc_id=85, actor='Anne Hathaway'),
 Row(search_id=2, doc_id=179, actor='Angelina Jolie'),
 Row(search_id=2, doc_id=101, actor='Al Pacino'),
 Row(search_id=2, doc_id=164, actor='Amy Adams'),
 Row(search_id=2, do

# Actors as categories
7 actors --> 7 categories

In [31]:
sanitized.select("id", "actor").orderBy(["actor"]).toPandas()

Unnamed: 0,id,actor
0,2,Adam Sandler
1,12,Adam Sandler
2,24,Adam Sandler
3,26,Adam Sandler
4,30,Adam Sandler
...,...,...
221,201,Arnold Schwarzenegger
222,207,Arnold Schwarzenegger
223,208,Arnold Schwarzenegger
224,221,Arnold Schwarzenegger


In [32]:
import pyspark.sql.functions as func

sanitized.select(func.countDistinct("actor")).show()

+---------------------+
|count(DISTINCT actor)|
+---------------------+
|                    7|
+---------------------+



# K Means
We have 7 actors, thats 7 groups. I expect 7 distinct clusters<br>
First we randomly pick K centers (7 in our case) <br>
Then we can go over all of our docs vectors and match to a center.<br>
then we calculate the average of all the vectors that are assigned to each center.<br>
Next we set those averages as our new centers.<br> <br>

We will go over this process over and over again until the centers will stop changing. The centers we'll get are our K means! :)

In [33]:
import numpy as np


# vectors is an rdd of (index(doc id), vector)
def kmeans(vectors, K):
    centers =randomly_pick_centers(vectors, K)
    change_center = True
    while change_center:
        change_center = False
        closest_centers = vectors.map(lambda v: (find_nearest_center(v[1], centers), (v[1], 1, v[0])))
        pointStats = closest_centers.reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))
        newPoints = pointStats.map(lambda st: (st[0], st[1][0] / st[1][1])).collect()
        
        for (iK, v) in newPoints:
            if v != centers[iK]:
                centers[iK] = v
                change_center = True
    return centers

def find_nearest_center(v1, centers):
    best_center_index = 0
    closest_center = float('inf')
    for i, c in enumerate(centers):
        subtraction_vector = v1 - c[1]
        current_distance = np.sum(subtraction_vector.dot(subtraction_vector))
        current_distance = 1
        if current_distance < closest_center:
            closest_center = current_distance
            best_center_index = i
    return best_center_index
            

def randomly_pick_centers(vectors, centers_amount):
    """
    Uses Pyspark's RDD "takeSample" methid which returns a fixed-size sampled subset
    of an RDD.
        note:: This method should only be used if the resulting array is expected
            to be small, as all the data is loaded into the driver's memory.
            
    Here I'm using it as we are dealing with a relatively small data set and its an 
    Academic project
    """
    return vectors.takeSample(False, centers_amount, 1)


In [34]:
kmeans_result = kmeans(sc.parallelize([(i,v) for i, v in enumerate(vectors_list)]), 7)
kmeans_result.collect()

[(0,
  (DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.8652, 0.0, 0.0, 0.0, 2.9005, 0.0, 0.0, 0.0, 0.0, 0.1578, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.6479, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.5873, 4.0342, 0.0, 0.0, 0.0, 0.2422, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.3262, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 

# Assign Every vector to it's closest center
By doing that final step, we assign every document to a "cluster" <br>
I expected to see 7 distinct cluster under my assumption that every actor has a unique vocabulary in his movies.  As we saw already, this assumption was not correct.

In [35]:
clusters = []
for i, v in enumerate(vectors_list):
    center = (0, float("inf"))
    for j, c in enumerate(kmeans_result):
        tempcos = cosine_similarity(v, c)
        if tempcos < center [1]:
            center = (j, tempcos)
    clusters.append((i+1, center[0]))

TypeError: 'PipelinedRDD' object is not iterable

In [None]:
clusters_df = sc.parallelize(clusters).toDF(["doc_id", "cluster_id"])
clusters_df.join(sanitized, clusters_df.doc_id == sanitized.id).select("doc_id", "cluster_id", "actor")