In [None]:
#This program takes a search query and returns the top ten movies that best match the search terms.
#The program is written following the Apache Spark MapReduce framework. Cosine similarity is used to implement the search functionality.

#Input to the program: A text file where each line represents a search term/terms.
#Output: Top ten movies for each search term/terms.

In [None]:
pip install nltk

Python interpreter will be restarted.
Python interpreter will be restarted.


In [None]:
import nltk
# Fetch NLTK's 'book' data collection. Later cells use the English stop-word
# corpus and word_tokenize; presumably both are covered by this collection --
# TODO confirm, and consider downloading only the resources actually needed.
nltk.download('book')

In [None]:
import math
import re
from collections import Counter

from nltk import word_tokenize
from nltk.corpus import stopwords

In [None]:
# ---- Load the corpus ---------------------------------------------------------
# Each line of the plot-summary file is split on tabs below; field 0 is used as
# the movie id and field 1 as the summary text.
movies_summary = sc.textFile('dbfs:/FileStore/tables/plot_summaries.txt')
movie_names = spark.read.option('header','false').option('inferSchema','true').option('delimiter','\t').csv('dbfs:/FileStore/Assignment_1/movie_metadata.tsv') #reading the file in df format

# (movie id, movie name) pairs as an RDD. Cached because every search joins against it.
movie_name_rdd = movie_names.select(['_c0', '_c2']).rdd.map(lambda x : (str(x['_c0']), str(x['_c2']))).cache() #converting from df to rdd format

# Lines without a tab have no summary field; drop them instead of letting x[1]
# raise IndexError and abort the whole job.
moviesRDD = movies_summary.map(lambda x : x.split("\t")).filter(lambda parts : len(parts) > 1)
tokensRDD = moviesRDD.map(lambda x : (x[0], x[1].split(" ")))

# ---- Pre-process tokens ------------------------------------------------------
stop_words = stopwords.words('english') #will return list of stopwords in English
stop_word_set = frozenset(stop_words) # constant-time membership tests inside the Spark closure

# Keep only alphabetic, non-stop-word tokens, lower-cased.
# Reused by tf_single, N and tf below, hence cached.
tokens_processed = tokensRDD.map(
    lambda x : (x[0], [item.lower() for item in x[1] if item.isalpha() and item.lower() not in stop_word_set])
).cache()

# ---- Term frequency / inverse document frequency ----------------------------
# ((movie id, term), tf) where tf = occurrences of the term / tokens in the document
tf_single = tokens_processed.flatMap(lambda x : [((x[0], item), 1/len(x[1])) for item in x[1]]).reduceByKey(lambda x,y : x+y).cache()

N = tokens_processed.count() # number of documents in the corpus
# (term, idf) where idf = 1 + ln(N / number of documents containing the term)
idf = tf_single.map(lambda x : (x[0][1], 1)).reduceByKey(lambda x,y : x+y).map(lambda x : (x[0], 1+math.log(N/x[1]))).cache()

#tf_idf_single: (term, (movie id, tf-idf)) -- used for single-word queries
tf_single_temp = tf_single.map(lambda x : (x[0][1], (x[0][0], x[1])))
merged_single = tf_single_temp.join(idf)
tf_idf_single = merged_single.mapValues(lambda x : (x[0][0], x[0][1]*x[1])).cache()

#tf_idf_sentence: (movie id, [(term, tf-idf), ...]) -- used for multi-word queries
# Counter counts every term in one pass; list.count() per distinct term was quadratic.
tf = tokens_processed.mapValues(lambda tokens : [(item, count/len(tokens)) for item, count in Counter(tokens).items()])
idf_dict = dict(idf.collect())
tf_idf = tf.mapValues(lambda x : [(pair[0], idf_dict[pair[0]]*pair[1]) for pair in x]).cache() #tf_idf_single and tf_idf contains the same value in different format. It has been done for later convenience

In [None]:
test_dict = {} # this holds tf-idf value for the query
def input_tf_idf(test):
    """Compute the tf-idf weights of a tokenised query and store them in ``test_dict``.

    Parameters
    ----------
    test : list of str
        Query tokens (as produced by ``word_tokenize``).

    Side effects
    ------------
    Rebinds the module-level ``test_dict`` to ``{term: tf * idf}``. Terms that do
    not occur in ``idf_dict`` get weight 0.

    The query is pre-processed exactly like the corpus documents: tokens are
    lower-cased, and stop words and non-alphabetic tokens are dropped. Term
    frequency is normalised by the number of tokens that remain after filtering.
    """
    global test_dict

    # Compare the lower-cased token against the stop-word list so that
    # capitalised stop words ("The", "With", ...) are filtered as well.
    processed = [item.lower() for item in test if item.isalpha() and item.lower() not in stop_words]

    # A query is only a handful of tokens, so count them locally instead of
    # launching a Spark job for every search.
    total = len(processed)
    test_dict = {
        term: idf_dict.get(term, 0) * (count / total)
        for term, count in Counter(processed).items()
    }

In [None]:
def cosine_sim(list1): #this calculates cosine similarity between the documents from the corpus and query text
    """Return the cosine similarity between one document and the current query.

    Parameters
    ----------
    list1 : iterable of (term, weight) pairs
        tf-idf weights of a single document.

    Returns
    -------
    float
        ``dot(query, doc) / (||query|| * ||doc||)`` where the query vector is the
        module-level ``test_dict``. Returns 0 when either vector has zero length
        (empty query, empty document, or all-zero weights).
    """
    doc_weights = dict(list1)

    # Only terms present in the query can contribute to the dot product.
    dot = sum(weight * doc_weights.get(term, 0) for term, weight in test_dict.items())

    # Each norm is taken over the complete vector; the document norm must
    # include terms that are not in the query, otherwise long documents are
    # not penalised for unrelated content.
    query_norm = math.sqrt(sum(weight * weight for weight in test_dict.values()))
    doc_norm = math.sqrt(sum(weight * weight for weight in doc_weights.values()))

    # Cosine similarity divides by the PRODUCT of the norms (not their sum).
    denom = query_norm * doc_norm
    if denom == 0:
        return 0
    return dot / denom

In [None]:
def main(input_txt):
    """Print the top ten movies matching a search query.

    Parameters
    ----------
    input_txt : str
        Raw query text; it is tokenised with ``word_tokenize``.

    Behaviour
    ---------
    * A single-token query ranks movies by the tf-idf weight of that term.
    * Any other query ranks movies by cosine similarity between the query
      and each document's tf-idf vector; movies with similarity 0 are dropped.
    * Prints "No Movie Found" when nothing matches (including an empty query).
    """
    input_txt = word_tokenize(input_txt)

    if not input_txt:
        # Nothing to search for; avoid scoring the whole corpus against an empty query.
        top_movies = []
    else:
        if len(input_txt) != 1:
            input_tf_idf(input_txt) #calculate tf-idf for the query
            # Every document receives a score, so keep only real matches.
            scores = tf_idf.mapValues(lambda x : cosine_sim(x)).filter(lambda x : x[1] > 0)
        else: #if query is a word simply return the top ten documents where the tf-idf for that value is the highest
            term = input_txt[0].lower()
            scores = tf_idf_single.filter(lambda x : x[0] == term).map(lambda x : (x[1][0], x[1][1]))

        #join the above result with movie_name_rdd to get the names
        # takeOrdered with a negated key yields the ten highest scores without
        # a separate count() pass or a full sort of the joined RDD.
        top_movies = (
            movie_name_rdd.join(scores)
            .map(lambda x : x[1])  # (movie name, score)
            .takeOrdered(10, key=lambda x : -x[1])
        )

    print(f"Search Result for: {' '.join(input_txt)}")
    if not top_movies:
        # Print the message as a whole instead of iterating over its characters.
        print("No Movie Found")
    else:
        for i, item in enumerate(top_movies, start=1):
            print(f"{i}. {item[0]}")

    print("\n")

In [None]:
# The search terms are in this file, one query per line. In the console below you can see these search terms.
# collect() already returns a Python list, so in_file only ever holds the list of query lines.
in_file = sc.textFile('dbfs:/FileStore/Assignment_1/input_assign1.txt').collect()

for line in in_file:
    if line.strip():  # skip blank lines: an empty query has nothing to search for
        main(line)

Search Result for: Superhero movies batman superman flash dc
1. Batman: Dead End
2. Atom Man vs. Superman
3. 1 Day
4. Paper Doll
5. Stamp Day for Superman
6. Invaders from Space
7. Superman/Batman: Apocalypse
8. Atomic Rulers of the World
9. All-Star Superman
10. Superman/Batman: Public Enemies


Search Result for: Dracula
1. Vampira
2. Hotel Transylvania
3. Dracula
4. Dracula 2000
5. Dracula p√®re et fils
6. Dracula Has Risen from the Grave
7. Dracula 3D
8. Dracula
9. Dracula A.D.1972
10. Boo!


Search Result for: Funny movie with action scene
1. Kottarathil Kuttibhootham
2. Lu and Bun
3. The Major Lied 'Til Dawn
4. Kote
5. Treasure Buddies
6. Giri
7. Jaal
8. Tiger
9. The Daredevil Men
10. Murder at Midnight


Search Result for: Movies with alien and space
1. Lighter Than Hare
2. Paper Doll
3. Prey
4. The Attack of the Giant Moussaka
5. 002 Operazione Luna
6. Aunt Rose
7. Space Chimps 2: Zartog Strikes Back
8. The Falling
9. Space Master X-7
10. Metamorphosis: The Alien Factor


Searc