In [None]:
# Install NLTK into the notebook environment; it supplies the stop-word corpus
# loaded in the next cell.
!pip install nltk
from pyspark.sql import SparkSession
# NOTE: despite its name, `sc` is bound to a SparkSession here, not to a
# SparkContext. getOrCreate() reuses an already-running session if there is one.
sc = SparkSession.builder.appName("DataFrames").getOrCreate()

In [None]:
from math import log
import nltk
from nltk.corpus import stopwords
# Make sure the NLTK stop-word corpus is available locally before reading it.
nltk.download('stopwords')
# Rebind `stopwords` from the corpus reader imported above to a plain set of
# English stop words, so later membership tests are cheap.
stopwords = {stop_word for stop_word in stopwords.words("english")}

In [None]:
def frequency(x):
    """Turn one ``(movieId, tokens)`` record into ``((movieId, token), 1)`` pairs.

    Tokens with two characters or fewer, and tokens whose lower-cased form is
    in the module-level ``stopwords`` collection, are skipped. Kept tokens are
    emitted lower-cased, one pair per occurrence, in input order.

    Args:
        x: Indexable pair whose first item is the record key and whose second
            item is an iterable of string tokens.

    Returns:
        A list of ``((key, token), 1)`` tuples.
    """
    movie_id, tokens = x[0], x[1]
    return [
        ((movie_id, token.lower()), 1)
        for token in tokens
        if len(token) > 2 and token.lower() not in stopwords
    ]

In [None]:
def termFrequency(term,plot_summary):
    """Return ``(key, frequency)`` pairs for every record that contains ``term``.

    Fix: the lambdas used to read an unrelated global called ``word`` instead
    of the ``term`` argument, so the function either raised ``NameError`` or
    silently looked up whatever word a later cell had left behind.

    Args:
        term: The word to look up.
        plot_summary: RDD-like object offering ``filter`` and ``map``. Each
            record is indexed as ``(key, container)`` where ``container``
            supports ``in`` and ``[term]`` — presumably a word -> count
            mapping; verify against the caller.

    Returns:
        The filtered and mapped RDD of ``(key, container[term])`` pairs.
    """
    return (
        plot_summary
        .filter(lambda record: term in record[1])
        .map(lambda record: (record[0], record[1][term]))
    )

In [None]:
def idf(term,plot_summary,total_docs=None):
    """Inverse document frequency of ``term``.

    Fixes:
      * The filter used to read a global called ``word`` and ignore the
        ``term`` argument, so it only worked when the caller happened to name
        its loop variable ``word``.
      * ``word in x[0][1]`` is a substring test on a string, so "action" also
        counted "reaction" and "actions". The term is now compared for
        equality.

    Args:
        term: The (already lower-cased) word to score.
        plot_summary: RDD-like object of ``((movieId, word), count)`` records
            with one record per distinct (movie, word) pair, so the number of
            matching records is the number of movies containing the word.
        total_docs: Total number of documents. Defaults to the notebook-level
            ``N`` when omitted.

    Returns:
        ``log(total_docs / document_count)``, or ``1`` when no record contains
        the term (the original fallback, kept unchanged).
    """
    if total_docs is None:
        total_docs = N
    count = plot_summary.filter(lambda x: x[0][1] == term).count()
    if count == 0:
        return 1
    return log(total_docs / count)

In [None]:
def tfIdf(term,plot_summary,total_docs=None):
    """Compute the TF-IDF score of ``term`` for every movie that contains it.

    Fixes:
      * ``term in x[0][1]`` is a substring test on a string, so "romance" also
        matched "bromance" and "romances". The term is now compared for
        equality.
      * The matching filter is built once and reused for both the document
        count and the per-movie scores.
      * The IDF factor is computed once instead of once per row.

    Args:
        term: The (already lower-cased) word to score.
        plot_summary: RDD-like object of ``((movieId, word), count)`` records.
        total_docs: Total number of documents. Defaults to the notebook-level
            ``N`` when omitted.

    Returns:
        An RDD of ``(movieId, tf * log(total_docs / document_count))`` pairs;
        empty when no movie contains the term.
    """
    if total_docs is None:
        total_docs = N
    matches = plot_summary.filter(lambda x: x[0][1] == term)
    doc_count = matches.count()
    # With no matches the mapped RDD is empty, so the factor is never applied;
    # the guard only avoids dividing by zero while computing it up front.
    idf_value = log(total_docs / doc_count) if doc_count else 0.0
    return matches.map(lambda x: (x[0][0], x[1] * idf_value))

In [None]:
# `sc` is a SparkSession (see the first cell), which has no textFile();
# the RDD entry points live on its underlying SparkContext.
plot_summary = sc.sparkContext.textFile('/FileStore/tables/plot_summaries.txt')

In [None]:
# Total number of plot summaries (one per input line). This is the document
# count N used by the IDF computations. It has to be taken here, while
# `plot_summary` still holds one record per line; later cells rebind the name
# to per-(movie, word) records.
N = plot_summary.count()
print(N)

42306


In [None]:
# Preprocess the data
# Every input line is "<movieId>\t<plot text>". The text has commas, periods,
# question marks and double quotes removed, is lower-cased and is split on
# single spaces, giving (movieId, [tokens]) records.
_PUNCTUATION_TABLE = str.maketrans('', '', ',.?"')


def _parse_summary(line):
    """Split one raw line into ``(movieId, tokens)``."""
    fields = line.split('\t')
    cleaned = fields[1].translate(_PUNCTUATION_TABLE)
    return (fields[0], cleaned.lower().split(' '))


plot_summary = plot_summary.map(_parse_summary)

In [None]:
# Expand every summary into ((movieId, word), 1) pairs via `frequency`, then
# add the ones up so each record becomes ((movieId, word), occurrences of that
# word in that summary).
plot_summary = plot_summary.flatMap(frequency).reduceByKey(lambda x,y: x+y)

In [None]:
# Read movie metadata into DataFrame
# The TSV is read without a header, so columns arrive as _c0, _c1, ...; only
# the two used later are given readable names.
movieDf = (
    sc.read
    .options(delimiter='\t')
    .csv('/FileStore/tables/movie_metadata.tsv')
    .withColumnRenamed("_c0", "movieId")
    .withColumnRenamed("_c2", "movieName")
)
# Column names for the per-term (movieId, score) DataFrames built later.
headers = ['movieId', 'tfIdf']

In [None]:
# `sc` is a SparkSession, which has no textFile(); read the one-term-per-line
# file through its underlying SparkContext.
terms = sc.sparkContext.textFile('/FileStore/tables/singletermsearch-5.txt')
print(terms.collect())

['Romance', 'Thriller', 'Revenge', 'Horror', 'Comedy']


In [None]:
# Collect into a separate name: rebinding `terms` from an RDD to a list made
# this cell fail with AttributeError the second time it was run.
search_terms = [raw_term.lower() for raw_term in terms.collect()]

for term in search_terms:
    tfIdfValue = tfIdf(term, plot_summary)
    # Ten highest scores without sorting the whole RDD.
    topMovies = tfIdfValue.takeOrdered(10, key=lambda x: -x[1])
    print(term)
    if not topMovies:
        # A DataFrame schema cannot be inferred from an empty list.
        print("no matching movies")
        continue
    # `sc` is a SparkSession: build the DataFrame directly instead of going
    # through parallelize(), which only exists on a SparkContext.
    topMoviesDf = sc.createDataFrame(topMovies, headers)
    result = movieDf.join(topMoviesDf, "movieId", "inner")
    # Row order after a join is not guaranteed, so sort explicitly for display.
    result.select('movieName', 'tfIdf').orderBy('tfIdf', ascending=False).show(truncate=False)

romance
+------------------------------+------------------+
|movieName                     |tfIdf             |
+------------------------------+------------------+
|4 Romance                     |18.122413834445457|
|Dragon                        |14.497931067556365|
|The Great Outdoors            |14.497931067556365|
|The Congress Dances           |10.873448300667274|
|Black Death                   |10.873448300667274|
|The manor of Araucaima        |10.873448300667274|
|Beyond the Valley of the Dolls|10.873448300667274|
|The English Patient           |10.873448300667274|
|Second Fiddle                 |10.873448300667274|
|The Ghosts of Girlfriends Past|10.873448300667274|
+------------------------------+------------------+

thriller
+--------------------------------------------+------------------+
|movieName                                   |tfIdf             |
+--------------------------------------------+------------------+
|Godfather                                   |11.4078486

In [None]:
# cosine similarity
# `sc` is a SparkSession, which has no textFile(); read the query file through
# its underlying SparkContext.
search_query = sc.sparkContext.textFile('/FileStore/tables/multitermsearch-2.txt')
print(search_query.collect())

['Funny movie with action scenes']


In [None]:
# Count how often each lower-cased, space-separated word occurs in the query
# and bring the (word, count) pairs back to the driver.
search_query_words = (
    search_query
    .flatMap(lambda line: line.split(' '))
    .map(lambda token: (token.lower(), 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)
print(search_query_words)

[('movie', 1), ('action', 1), ('funny', 1), ('with', 1), ('scenes', 1)]


In [None]:
# Query side: weight every query word by (count in query) * idf.
l = []
idfValues = {}
for word, query_count in search_query_words:
    idfValue = idf(word, plot_summary)
    idfValues[word] = idfValue
    l.append((word, query_count * idfValue))

# `sc` is a SparkSession; parallelize() lives on its underlying SparkContext.
searchRdd = sc.sparkContext.parallelize(l)

# Document side: keep only records whose word occurs in the query and weight
# them the same way, keyed by word: (word, (movieId, tf * idf)).
# `idfValues` has exactly the query words as keys, so it doubles as the
# membership test instead of rebuilding a list for every record.
movieRdd = (
    plot_summary
    .filter(lambda x: x[0][1] in idfValues)
    .map(lambda x: (x[0][1], (x[0][0], x[1] * idfValues[x[0][1]])))
)

# (word, (query_weight, (movieId, doc_weight))) for every shared word.
joinedRdd = searchRdd.join(movieRdd)

In [None]:
from math import sqrt

# Fixes:
#   * The score used to be 1 - cos(q, d), i.e. a cosine *distance*, while it is
#     labelled and ranked (highest first) as a similarity, so the least similar
#     movies came out on top. The score is now the cosine itself.
#   * The query norm used to be summed only over the words a movie shares with
#     the query, so every movie matching a single query word scored a perfect
#     cosine. It is now the norm of the whole query vector.
queryNorm = sqrt(searchRdd.map(lambda weighted: weighted[1] ** 2).sum())


def _cosine(dot, doc_sq_sum):
    """Return dot / (|query| * |doc|), or 0.0 when either norm is zero."""
    denominator = queryNorm * sqrt(doc_sq_sum)
    return dot / denominator if denominator else 0.0


# joinedRdd records are (word, (query_weight, (movieId, doc_weight))).
# Per movie, accumulate the dot product and the squared document weights.
# joinedRdd only carries document weights for words that also occur in the
# query, so the document norm is taken over those shared words.
cosineSimilarity = (
    joinedRdd
    .map(lambda x: (x[1][1][0], (x[1][0] * x[1][1][1], x[1][1][1] ** 2)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .map(lambda x: (x[0], _cosine(x[1][0], x[1][1])))
)

In [None]:
# Order the (movieId, score) pairs from the highest score to the lowest
# (ascending sort on the negated score).
result = cosineSimilarity.sortBy(lambda x: -x[1])

In [None]:
# Convert the (movieId, score) RDD into a DataFrame with named columns so it
# can be joined with the movie metadata.
resultDF = result.toDF(["movieId", "cosineSimilarity"])

In [None]:
# Attach movie titles to the scores: inner join on movieId, so scored ids
# without a metadata row are dropped.
finalResult = movieDf.join(resultDF,movieDf.movieId == resultDF.movieId, "inner")

In [None]:
# Row order after the join is not guaranteed, so sort explicitly before taking
# the first ten rows; otherwise "top 10" may be an arbitrary ten.
finalResult.select('movieName', 'cosineSimilarity').orderBy('cosineSimilarity', ascending=False).show(10, False)

+---------------------------------------+-------------------+
|movieName                              |cosineSimilarity   |
+---------------------------------------+-------------------+
|Hot Boyz                               |0.3700404386106769 |
|Prem Nazirine Kanmanilla               |0.33352119479649256|
|Ghulami                                |0.31128479983586255|
|Mind Game                              |0.31128479983586255|
|Killer Flick                           |0.31128479983586255|
|The Kid & I                            |0.2872144686465431 |
|Kaadhal Parisu                         |0.2831237762424278 |
|Mystery Science Theater 3000: The Movie|0.24661677512878777|
|The Last Movie                         |0.24661677512878777|
|Wishology                              |0.24246302654905127|
+---------------------------------------+-------------------+
only showing top 10 rows

