In [0]:
movieText = sc.textFile("dbfs:/FileStore/tables/plot_summaries.txt")

movieText.collect()

Out[2]: ["23890098\tShlykov, a hard-working taxi driver and Lyosha, a saxophonist, develop a bizarre love-hate relationship, and despite their prejudices, realize they aren't so different after all.",
 '31186339\tThe nation of Panem consists of a wealthy Capitol and twelve poorer districts. As punishment for a past rebellion, each district must provide a boy and girl  between the ages of 12 and 18 selected by lottery  for the annual Hunger Games. The tributes must fight to the death in an arena; the sole survivor is rewarded with fame and wealth. In her first Reaping, 12-year-old Primrose Everdeen is chosen from District 12. Her older sister Katniss volunteers to take her place. Peeta Mellark, a baker\'s son who once gave Katniss bread when she was starving, is the other District 12 tribute. Katniss and Peeta are taken to the Capitol, accompanied by their frequently drunk mentor, past victor Haymitch Abernathy. He warns them about the "Career" tributes who train intensively at special 

In [0]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import explode, udf, sum, sqrt, col
import re
import math



In [0]:
movieSummary = sc.textFile("dbfs:/FileStore/tables/plot_summaries.txt")
# movieString = ' '.join(movieSummary)

# changing to lowercaser
movieSummary = movieSummary.map(lambda x: x.lower())

# Splitting the summaries
movieSummaryList = movieSummary.map(lambda y: y.split("\t"))

#Removing punctuations
movieSummaryPuncRemoved = movieSummaryList.map(lambda x: (x[0],re.sub(r'[^\w\s]', '', x[1]).split(' ')))

movieSummaryPuncRemoved.collect()





Out[2]: [('23890098',
  ['shlykov',
   'a',
   'hardworking',
   'taxi',
   'driver',
   'and',
   'lyosha',
   'a',
   'saxophonist',
   'develop',
   'a',
   'bizarre',
   'lovehate',
   'relationship',
   'and',
   'despite',
   'their',
   'prejudices',
   'realize',
   'they',
   'arent',
   'so',
   'different',
   'after',
   'all']),
 ('31186339',
  ['the',
   'nation',
   'of',
   'panem',
   'consists',
   'of',
   'a',
   'wealthy',
   'capitol',
   'and',
   'twelve',
   'poorer',
   'districts',
   'as',
   'punishment',
   'for',
   'a',
   'past',
   'rebellion',
   'each',
   'district',
   'must',
   'provide',
   'a',
   'boy',
   'and',
   'girl',
   '',
   'between',
   'the',
   'ages',
   'of',
   '12',
   'and',
   '18',
   'selected',
   'by',
   'lottery',
   '',
   'for',
   'the',
   'annual',
   'hunger',
   'games',
   'the',
   'tributes',
   'must',
   'fight',
   'to',
   'the',
   'death',
   'in',
   'an',
   'arena',
   'the',
   'sole',
   'survivor'

In [0]:
#removing stop words
movieSummaryFrame = spark.createDataFrame(movieSummaryPuncRemoved, ["id" , "movieSummary"])

movieSummaryFrame.show(2)


+--------+--------------------+
|      id|        movieSummary|
+--------+--------------------+
|23890098|[shlykov, a, hard...|
|31186339|[the, nation, of,...|
+--------+--------------------+
only showing top 2 rows



In [0]:
# Removing stopwords
stopWord = StopWordsRemover(inputCol="movieSummary", outputCol="summaryWithoutStopWords")
summaryWithoutStopWords = stopWord.transform(movieSummaryFrame).select("id", "summaryWithoutStopWords")

summaryWithoutStopWords.show(4)

+--------+-----------------------+
|      id|summaryWithoutStopWords|
+--------+-----------------------+
|23890098|   [shlykov, hardwor...|
|31186339|   [nation, panem, c...|
|20663735|   [poovalli, induch...|
| 2231378|   [lemon, drop, kid...|
+--------+-----------------------+
only showing top 4 rows



In [0]:
# Transforming into key value pairs

summaryKeyValuePair = summaryWithoutStopWords.select(summaryWithoutStopWords.id, explode(summaryWithoutStopWords.summaryWithoutStopWords))
summaryKeyValuePair.show(6)

+--------+-----------+
|      id|        col|
+--------+-----------+
|23890098|    shlykov|
|23890098|hardworking|
|23890098|       taxi|
|23890098|     driver|
|23890098|     lyosha|
|23890098|saxophonist|
+--------+-----------+
only showing top 6 rows



In [0]:
# counting key value pair freq pairwise

summaryKeyValuePairRdd = summaryKeyValuePair.rdd

summaryKeyValuePairFreq = summaryKeyValuePairRdd.map(lambda x: ((x[0], x[1]) ,1)).reduceByKey(lambda x,y : x+y)

# calculating term freq

summaryTermFreq = summaryKeyValuePairFreq.map(lambda y: (y[0][1] , (y[0][0],y[1])))

summaryTermFreq.take(5)



Out[6]: [('mr', ('1952976', 2)),
 ('ceiling', ('1952976', 2)),
 ('player', ('2462689', 1)),
 ('people', ('2462689', 2)),
 ('somewhat', ('20532852', 1))]

In [0]:
# calculating inverse document frequency

# finding total word count 
totalWordCount = summaryKeyValuePair.count()

summaryWordTermFreq  = summaryKeyValuePairFreq.map(lambda x: (x[0][1] ,1)).reduceByKey(lambda x,y: x+y).map(lambda x: (x[0], math.log(totalWordCount/x[1])))

# summaryWordTermFreq.collect()

In [0]:
# calculating TF.IDF term weighting

summaryWordTfIdf = summaryTermFreq.join(summaryWordTermFreq)

summaryWordTfIdf.take(10)

Out[8]: [('hunter', (('3150394', 1), 9.746777457788987)),
 ('hunter', (('74830', 1), 9.746777457788987)),
 ('hunter', (('21620211', 1), 9.746777457788987)),
 ('hunter', (('4405649', 1), 9.746777457788987)),
 ('hunter', (('26138411', 4), 9.746777457788987)),
 ('hunter', (('2159349', 1), 9.746777457788987)),
 ('hunter', (('2294746', 1), 9.746777457788987)),
 ('hunter', (('17124781', 1), 9.746777457788987)),
 ('hunter', (('2276474', 1), 9.746777457788987)),
 ('hunter', (('4040311', 1), 9.746777457788987))]

In [0]:
# evaluating product of tf.idf

termWeight = summaryWordTfIdf.map(lambda y: (y[1][0][0] , (y[0] , y[1][0][1] , y[1][1] , y[1][0][1]*y[1][1])))
                                  

In [0]:
# loading movie names and joining with term weight  

movieNamesData = sc.textFile("/FileStore/tables/movie_metadata.tsv")
movieNames = movieNamesData.map(lambda x: x.split('\t')).map(lambda x: (x[0],x[1],x[2])).map(lambda x: (x[0],x[2]))
movieNameWithWeights = termWeight.join(movieNames)

movieNameWithWeightsFrame = movieNameWithWeights.map(lambda x: (x[1][0][0], x[1][1] ,x[0],x[1][0][1],x[1][0][2],x[1][0][3])).toDF(["term","name","id","tf","idf","tf-idf"])
movieNameWithWeightsFrame.show(5)

+----------+------------------+-------+---+------------------+------------------+
|      term|              name|     id| tf|               idf|            tf-idf|
+----------+------------------+-------+---+------------------+------------------+
|    hunter|The Little Vampire|4405649|  1| 9.746777457788987| 9.746777457788987|
|    family|The Little Vampire|4405649|  2| 6.776965075445373|13.553930150890746|
|      safe|The Little Vampire|4405649|  1|  8.76594820477726|  8.76594820477726|
|nightmares|The Little Vampire|4405649|  2|10.324753252126847|20.649506504253694|
|     sound|The Little Vampire|4405649|  1| 9.280595676499662| 9.280595676499662|
+----------+------------------+-------+---+------------------+------------------+
only showing top 5 rows



In [0]:
# Loading query file 
searchTerms = sc.textFile("/FileStore/tables/Queryfile.txt")
searchTermList = searchTerms.collect()

In [0]:
# function to find single search 

def singleTermCosineSimilarities (movieNameWithWeightsFrame,searchTerms):
    matchingMovies = movieNameWithWeightsFrame.rdd.filter(lambda x: x[0] == searchTerms[0]).sortBy(lambda x: x[5], ascending = False).map(lambda x: (x[1],x[5])).take(10)
    return sc.parallelize(matchingMovies).toDF(["movieName", "cosineSimilarity"])

In [0]:
# function to find multiple search terms 

def multiTermCosineSimilarities(movieNameWithWeightsFrame,searchTerms):
    searchQuery = sc.parallelize(searchTerms).map(lambda x : (x,1)).reduceByKey(lambda x,y : x+y).toDF(["term", "queryCount"])
    movieSummaryTable = tf_idf_df.select("term","name", "id", "tf-idf")
    movieSummaryName = searchQuery.join(movieSummaryTable, on="term", how="left_outer").fillna(0)
    queryTermWeight = movieSummaryName.withColumn('weight', col('queryCount') * col('tf-idf')).withColumn('queryCount', col('queryCount') **2).withColumn('tf-idf', col('tf-idf') **2)
    sumOfQuery = queryTermWeight.groupby('name').agg(sum('weight').alias('weightedSum'),sum('queryCount').alias('weightedCount'),sum('tf-idf').alias('tfidfSum'))
    cosinParameter = sumOfQuery.rdd.map(lambda x: (x[0], (x[1], int(x[2]), x[3])))
    cosineResult = cosinParameter.map(lambda x : (x[0], 1-x[1][0]/(math.sqrt(x[1][1]) * math.sqrt(x[1][2])))).sortBy(lambda x: -x[1]).take(10)
    return sc.parallelize(cosineResult).toDF(['movieName', 'cosineSimilarity'])
 

In [0]:
# printing the output

for i in searchTermList:
    terms = i.lower().split()
    if len(terms)==1:
        print("Query: " , i)
        singleSearch = singleTermCosineSimilarities( movieNameWithWeightsFrame, terms)
        singleSearch.show(truncate = False)
    elif len(terms)>1:
        print("Query: " , i)
        multiTerms = multiTermCosineSimilarities(movieNameWithWeightsFrame, terms)
        multiTerms.show(truncate = False)

Query:  annoyance
+-------------------------+------------------+
|movieName                |cosineSimilarity  |
+-------------------------+------------------+
|Little School Mouse      |21.852038961714413|
|Urban Legend             |10.926019480857207|
|Broadway Limited         |10.926019480857207|
|Murder Collection V.1    |10.926019480857207|
|Grown Ups                |10.926019480857207|
|Quality Street           |10.926019480857207|
|Michael Madhana Kamarajan|10.926019480857207|
|Big                      |10.926019480857207|
|Up, Up, and Away         |10.926019480857207|
|Goodnight Mister Tom     |10.926019480857207|
+-------------------------+------------------+

Query:  free
+------------------------------------------+------------------+
|movieName                                 |cosineSimilarity  |
+------------------------------------------+------------------+
|Gridiron Gang                             |56.01024440039777 |
|Head                                      |48.0087809