In [None]:
# Name: Edilberto F. Carrizales
# Date: Thurs, Feb 22, 2024

In [None]:
# installing nltk (Natural Language ToolKit) Library
%pip install nltk

Python interpreter will be restarted.
Collecting nltk
  Downloading nltk-3.8.1-py3-none-any.whl (1.5 MB)
Collecting tqdm
  Downloading tqdm-4.66.2-py3-none-any.whl (78 kB)
Collecting regex>=2021.8.3
  Downloading regex-2023.12.25-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (773 kB)
Installing collected packages: tqdm, regex, nltk
Successfully installed nltk-3.8.1 regex-2023.12.25 tqdm-4.66.2
Python interpreter will be restarted.


In [None]:
# 2. Search Engine for Movie Plot Summaries
# In this part, we will work with a data set of movie plot summaries that is available from the Carnegie Movie Summary Corpus site. We are interested in building a search engine for the plot summaries that are available in the file “plot_summaries.txt” that is available under the Dataset link of the above page.
import nltk
import math
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# download wordlists required
nltk.download('stopwords')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Out[11]: True

In [None]:
# Read in the Plot summaries and user search terms
plot_summaries_rdd = sc.textFile("dbfs:/FileStore/plot_summaries.txt")

In [None]:
# split the plot summaries into movie id and summary
plot_summaries_rdd = plot_summaries_rdd.map(lambda line: line.split("\t"))

In [None]:
plot_summaries_rdd.collect()

Out[14]: [['23890098',
  "Shlykov, a hard-working taxi driver and Lyosha, a saxophonist, develop a bizarre love-hate relationship, and despite their prejudices, realize they aren't so different after all."],
 ['31186339',
  'The nation of Panem consists of a wealthy Capitol and twelve poorer districts. As punishment for a past rebellion, each district must provide a boy and girl  between the ages of 12 and 18 selected by lottery  for the annual Hunger Games. The tributes must fight to the death in an arena; the sole survivor is rewarded with fame and wealth. In her first Reaping, 12-year-old Primrose Everdeen is chosen from District 12. Her older sister Katniss volunteers to take her place. Peeta Mellark, a baker\'s son who once gave Katniss bread when she was starving, is the other District 12 tribute. Katniss and Peeta are taken to the Capitol, accompanied by their frequently drunk mentor, past victor Haymitch Abernathy. He warns them about the "Career" tributes who train intensively

In [None]:
# a list of unique english stop words
english_stop_words = set(stopwords.words("english"))

In [None]:
def filter_stop_words(plot_summary):
    filtered_summary = []

    # for every word in the plot summary
    for word in plot_summary.split():
        # if that word is not a stop word, then we keep it
        if word.lower() not in english_stop_words:
            filtered_summary.append(word)
    
    # join all non-stop words into a single string summary
    #filtered_summary = " ".join(filtered_summary)
    return filtered_summary

# call the above function in map so it can be ran by all the worker nodes
plot_summaries_filtered = plot_summaries_rdd.map(lambda x: (x[0], filter_stop_words(x[1])))

In [None]:
plot_summaries_filtered.collect()

Out[17]: [('23890098',
  ['Shlykov,',
   'hard-working',
   'taxi',
   'driver',
   'Lyosha,',
   'saxophonist,',
   'develop',
   'bizarre',
   'love-hate',
   'relationship,',
   'despite',
   'prejudices,',
   'realize',
   'different',
   'all.']),
 ('31186339',
  ['nation',
   'Panem',
   'consists',
   'wealthy',
   'Capitol',
   'twelve',
   'poorer',
   'districts.',
   'punishment',
   'past',
   'rebellion,',
   'district',
   'must',
   'provide',
   'boy',
   'girl',
   'ages',
   '12',
   '18',
   'selected',
   'lottery',
   'annual',
   'Hunger',
   'Games.',
   'tributes',
   'must',
   'fight',
   'death',
   'arena;',
   'sole',
   'survivor',
   'rewarded',
   'fame',
   'wealth.',
   'first',
   'Reaping,',
   '12-year-old',
   'Primrose',
   'Everdeen',
   'chosen',
   'District',
   '12.',
   'older',
   'sister',
   'Katniss',
   'volunteers',
   'take',
   'place.',
   'Peeta',
   'Mellark,',
   "baker's",
   'son',
   'gave',
   'Katniss',
   'bread',
   'starv

In [None]:
# Now we will create tf-idf for every term in every document
# tf = total frequency (number of terms in a document) NOTE:(sometimes it is also done as: (num terms in doc/ total terms in doc), this is done for normalization purposes)
# df = document frequency (number of documents that a term occurs in)
# idf = Inverse document frequency

# We will do tf first:
# 1. We will turn all the terms into a tuple of (ID, Term, 1) using faltMap(), where 1 will represent the total frequency of each of the terms and put them all in a single list
# Note: its important to have (Id,term) in parenthesis because it will allow us to later reduceByKey and get the frequency of a term per document and NOT in all documents.
plot_summaries_tf = plot_summaries_filtered.flatMap(lambda x: [ ( (x[0], term) , 1) for term in x[1] ])

In [None]:
plot_summaries_tf.collect()

Out[19]: [(('23890098', 'Shlykov,'), 1),
 (('23890098', 'hard-working'), 1),
 (('23890098', 'taxi'), 1),
 (('23890098', 'driver'), 1),
 (('23890098', 'Lyosha,'), 1),
 (('23890098', 'saxophonist,'), 1),
 (('23890098', 'develop'), 1),
 (('23890098', 'bizarre'), 1),
 (('23890098', 'love-hate'), 1),
 (('23890098', 'relationship,'), 1),
 (('23890098', 'despite'), 1),
 (('23890098', 'prejudices,'), 1),
 (('23890098', 'realize'), 1),
 (('23890098', 'different'), 1),
 (('23890098', 'all.'), 1),
 (('31186339', 'nation'), 1),
 (('31186339', 'Panem'), 1),
 (('31186339', 'consists'), 1),
 (('31186339', 'wealthy'), 1),
 (('31186339', 'Capitol'), 1),
 (('31186339', 'twelve'), 1),
 (('31186339', 'poorer'), 1),
 (('31186339', 'districts.'), 1),
 (('31186339', 'punishment'), 1),
 (('31186339', 'past'), 1),
 (('31186339', 'rebellion,'), 1),
 (('31186339', 'district'), 1),
 (('31186339', 'must'), 1),
 (('31186339', 'provide'), 1),
 (('31186339', 'boy'), 1),
 (('31186339', 'girl'), 1),
 (('31186339', 'age

In [None]:
# 2. We will get the tf (total frequency) of each of the terms in each document using reduceByKey
plot_summaries_tf = plot_summaries_tf.reduceByKey(lambda x, y: x + y)

In [None]:
plot_summaries_tf.collect()

Out[21]: [(('2462689', 'note'), 1),
 (('12978934', 'crowded'), 1),
 (('17124781', 'Bad,'), 1),
 (('19174305', 'incompetent'), 1),
 (('8153846', 'bad.'), 1),
 (('17711304', 'office'), 1),
 (('18369853', 'One'), 1),
 (('657446', 'series'), 1),
 (('657446', 'tells'), 1),
 (('1282593', 'difficult'), 1),
 (('1520023', 'commanders'), 1),
 (('28177482', 'Willie'), 1),
 (('3662683', 'upon.'), 1),
 (('473286', 'Seeking'), 1),
 (('33942920', 'Elizabeth'), 2),
 (('20903293', 'wants'), 2),
 (('20120996', 'brings'), 1),
 (('5239234', 'exhibition,'), 1),
 (('518117', 'himself.'), 1),
 (('797330', 'finds'), 1),
 (('17063416', "father's"), 1),
 (('3192223', 'watching'), 1),
 (('3875037', 'fictional'), 1),
 (('8204853', 'home,'), 1),
 (('14711494', 'charge.'), 1),
 (('24228496', "actor's"), 1),
 (('14901197', 'takes'), 1),
 (('7671792', 'goes'), 1),
 (('7548602', 'front'), 1),
 (('25223887', 'Surge'), 2),
 (('9214302', 'Dante'), 1),
 (('24355979', 'temporary'), 1),
 (('228355', 'body,'), 1),
 (('174076

In [None]:
# Next we will df (document frequencies):
# 1. Get all terms from all documents and give them a count of 1, using map
plot_summaries_df = plot_summaries_tf.map(lambda x: (x[0][1], 1))

In [None]:
plot_summaries_df.collect()

Out[23]: [('informs', 1),
 (',', 1),
 ('paddles', 1),
 ('killed', 1),
 ('bounty', 1),
 ('suicide', 1),
 ('microorganism', 1),
 ('envelope.', 1),
 ('interrupts', 1),
 ('sneaks', 1),
 ('series', 1),
 ('car', 1),
 ('Katie,', 1),
 ('one', 1),
 ('seeing', 1),
 ('called', 1),
 ('confronted', 1),
 ('Herbie,', 1),
 ('focuses', 1),
 ('compound.', 1),
 ('One', 1),
 ('century.', 1),
 ('visual', 1),
 ('goes', 1),
 ('Tortured', 1),
 ('teller,', 1),
 ('morose', 1),
 ('leave.', 1),
 ('homecoming', 1),
 (',', 1),
 ('financial', 1),
 ('blackjack', 1),
 ('talks', 1),
 ('facing', 1),
 ('Barin.', 1),
 ('sleep', 1),
 ('Dance', 1),
 ('teasingly', 1),
 ('waiting', 1),
 ('swim', 1),
 ('However,', 1),
 ('Bell-Harmonic,', 1),
 ('"tells",', 1),
 ('instated', 1),
 ('befriends', 1),
 ('brought', 1),
 ('João', 1),
 ('running', 1),
 ('missing', 1),
 ('monkey', 1),
 ('state', 1),
 ('tells', 1),
 ('outwit', 1),
 ('intercepts', 1),
 ('Pursuing', 1),
 ('Nutan.', 1),
 ('regard', 1),
 ('Detective', 1),
 ('sad', 1),
 ('bet

In [None]:
# 2. Then we will get the number of documents that each term occurs in (i.e, document frequency)
plot_summaries_df = plot_summaries_df.reduceByKey(lambda x, y: x + y)

In [None]:
plot_summaries_df.collect()

Out[26]: [('zooms', 81),
 ('boys', 1168),
 ('new', 8143),
 ('Florian', 7),
 ('watches', 1106),
 ('mate,', 67),
 ('medical', 913),
 ('dragged', 274),
 ('putty', 13),
 ('Willie', 117),
 ('intimate', 226),
 ('pickaxe', 11),
 ('Anton', 76),
 ('drops', 1052),
 ('her,', 4857),
 ('pouch', 14),
 ('Jergens', 1),
 ('meets', 6444),
 ('however,', 3531),
 ('extinct.', 18),
 ('close', 2173),
 ('NorthAm', 1),
 ('unorthodox', 81),
 ("Sonny's", 29),
 ('it,', 2814),
 ('intending', 305),
 ('cricket', 74),
 ('stating', 535),
 ('encounter.', 82),
 ('Madwoman', 1),
 ("Bond's", 20),
 ('Patrick', 186),
 ('freezing', 100),
 ('Ardhanarishvara,', 1),
 ('offers', 2888),
 ('monitor', 113),
 ('learns', 3850),
 ('terrorist,', 30),
 ('expense', 66),
 ('year', 1979),
 ('grabs', 1170),
 ('reveals', 4327),
 ('Ursula,', 11),
 ('names', 647),
 ('using', 3079),
 ('taxi.', 59),
 ('really', 1859),
 ('Shinobi', 2),
 ('town,', 1115),
 ('name', 2720),
 ('“Amour', 1),
 ('paleontologists', 2),
 ('kneels', 36),
 ('sets', 2443),
 (

In [None]:
# Finally we will calculate the tf-idf weights:
# weight (i) = TF(i) * log(idf (i))  OR  # weight (i) = TF(i) * log(N / df (i))

# Thus, we will first have to calculate the idf (inverse document frequencies)
# Formula:
#   IDF(i) = N / DF(i)
# Where:
#   a. N = the number of documents
#   b .And all df's = all the terms document frequencies

# So,
# 1. the number of documents N is:
num_of_documents_N = plot_summaries_rdd.count()

In [None]:
print(num_of_documents_N)

42306


In [None]:
# 2. Now, the calculation for the idf for each term

def calculate_idf(df):
    idf = math.log(num_of_documents_N / df)
    return idf

plot_summaries_idf = plot_summaries_df.map(lambda x: (x[0], calculate_idf(x[1])) )

In [None]:
plot_summaries_idf.collect()

Out[30]: [('zooms', 6.258235044274657),
 ('boys', 3.589636035558923),
 ('new', 1.6477702574799642),
 ('Florian', 8.706774049891782),
 ('watches', 3.6441790168648156),
 ('mate,', 6.447991579556129),
 ('medical', 3.835948318352127),
 ('dragged', 5.0395560925590255),
 ('putty', 8.08773484148556),
 ('Willie', 5.89051026414934),
 ('intimate', 5.23214919967481),
 ('pickaxe', 8.254788926148725),
 ('Anton', 6.321950858660765),
 ('drops', 3.6942358056494404),
 ('her,', 2.1645079566013505),
 ('pouch', 8.013626869331837),
 ('Jergens', 10.652684198947096),
 ('meets', 1.8817794546502307),
 ('however,', 2.483347803018709),
 ('extinct.', 7.7623124410509305),
 ('close', 2.968820218690666),
 ('NorthAm', 10.652684198947096),
 ('unorthodox', 6.258235044274657),
 ("Sonny's", 7.285388368960621),
 ('it,', 2.7103219612727614),
 ('intending', 4.932372422339684),
 ('cricket', 6.348619105742926),
 ('stating', 4.370417452051089),
 ('encounter.', 6.245964951682843),
 ('Madwoman', 10.652684198947096),
 ("Bond's", 

In [None]:
# 2. Now, we can plug in directly into weight formula
def calculate_weight(current_term, current_tf, dict_summaries_idf):
    weight = 0
    if current_term in dict_summaries_idf:
        weight = current_tf * dict_summaries_idf[current_term]
    
    return weight

# collect and convert to a dict for O(1) access time in function
dict_summaries_idf = plot_summaries_idf.collectAsMap()

# we will calculate plot_summary_weight and will be in the format (movieid, a_summary_term, weight)
plot_summaries_weights = plot_summaries_tf.map(lambda x: (x[0][0], x[0][1], calculate_weight(x[0][1], x[1], dict_summaries_idf)))


In [None]:
plot_summaries_weights.collect()

Out[32]: [('2231378', 'went', 3.674470456316397),
 ('595909', 'cold-hearted,', 9.554071910278985),
 ('595909', 'overturned.', 8.57324265726726),
 ('1952976', 'looks', 5.8957445520290035),
 ('15401493', 'ex-fiancé', 9.043246286512995),
 ('4018288', 'Dream', 19.88199752463584),
 ('4596602', 'women.', 4.464420075864505),
 ('8153846', 'faced', 4.868859016617358),
 ('4466226', 'legs', 20.92859679869924),
 ('18369853', 'state.', 5.1888523939214855),
 ('8034072', 'Banaras.', 9.95953701838715),
 ('4016437', 'narrator,', 6.46302945692067),
 ('17060199', 'using', 2.620324051022595),
 ('17060199', 'service', 8.521534171108987),
 ('1520023', 'man,', 6.184166072357078),
 ('14582951', 'indigenous', 6.824042802458001),
 ('33942920', 'him,', 1.795880842218718),
 ('3644125', 'solitary', 5.558933998140334),
 ('6575053', 'area', 7.641461266762481),
 ('6575053', 'dies', 3.0001385062531747),
 ('11990695', 'asks', 2.036188806456996),
 ('27387452', 'begun', 5.046882132651098),
 ('2209027', 'buys', 4.10589878

In [None]:
# convert the terms and their idf values into tuples, then put them all in a list that will be tupled with movieid 
plot_summaries_by_doc = plot_summaries_weights.map(lambda x: (x[0], (x[1], x[2]) ))

plot_summaries_by_doc = plot_summaries_by_doc.groupByKey().mapValues(list)

In [None]:
plot_summaries_by_doc.collect()

Out[34]: [('30730051',
  [('clear', 3.4749017827518984),
   ('secretly', 3.3610279897726345),
   ('complicated', 4.6438710135045005),
   ('rekindle', 5.899094007840731),
   ('forced', 2.559221924445915),
   ('love', 1.5603393852086203),
   ('be.', 5.057972819345257),
   ('true', 2.934887987933514),
   ('plan', 2.6070959181435676),
   ('beachside', 7.944633997844885),
   ('infatuated', 5.2410381470920555),
   ('elegant', 6.2832363464800745),
   ('best', 2.5871047716650026),
   ('Zoe', 6.701440480365668),
   ('.', 1.1749920209923828),
   ('impending', 5.0322833332299455),
   ("Sam's", 5.552817771122897),
   ('begins', 1.7611726284195306),
   ('intention', 4.561374316869397),
   ('thwart', 5.989245104835028),
   ("filmmaker's", 8.860924729719041),
   ('unravel,', 8.706774049891782),
   ('Davis', 5.493628899732567),
   ('becomes', 1.6707512079602218),
   ('fiancée,', 5.65547192518298),
   ('realize', 3.188174364310568),
   ('famous', 3.556790977849564),
   ('owned', 4.221353117013617),
   

In [None]:
# read in our search queries
single_search_terms_rdd = sc.textFile("dbfs:/FileStore/single_term_search.txt")

In [None]:
# FOR PART 4.A
# select the query you want to test, you can only test one query at a time
query_line_number = 1 # replace the 1 with any query line (text file has 15 lines total)
single_selected_query = single_search_terms_rdd.zipWithIndex().filter(lambda x : x[1] == query_line_number).map(lambda x: x[0])



In [None]:
single_selected_query.collect()

Out[37]: ['Romance']

In [None]:
# get tf for the query
single_query_tf = single_selected_query.map(lambda x: (x, 1)) # add a count of 1 to the term

In [None]:
# get tf-idf for the query
def get_idf_calculate_tf_idf(query_term, tf, dict_summaries_idf):
    weight = 0
    # if query exists in documents retrieve idf and calculate weight, else it will have weight of 0
    if query_term in dict_summaries_idf:
        weight = tf * dict_summaries_idf[query_term]
    
    return weight
        
single_query_tf_idf = single_query_tf.map(lambda x: (x[0], get_idf_calculate_tf_idf(x[0], x[1], dict_summaries_idf)))

In [None]:
def calculate_doc_relevance(doc_words_tuple, query_tf_idf):
    dot_product = 0
    cross_product_A = 0
    cross_product_B = 0

    for term, weight in doc_words_tuple[1]:
        for query_word, query_weight in query_tf_idf:
            if term == query_word:
                #perform dot products
                dot_product += weight * query_weight

                #perform cross product
                cross_product_A += weight ** 2
                cross_product_B += query_weight ** 2
    
    # check if zero
    if cross_product_A == 0 or cross_product_B == 0:
        return 0

    #cos_similarity = dot/ cross_product
    cos_similarity = dot_product / ((cross_product_A ** 0.5) * (cross_product_B ** 0.5))

    return cos_similarity

single_query_tf_idf_collected = single_query_tf_idf.collect()

single_query_doc_relevance = plot_summaries_by_doc.map(lambda x: (x[0], calculate_doc_relevance(x, single_query_tf_idf_collected)))

In [None]:
# Sort in descending order
single_query_doc_relevance_sorted = single_query_doc_relevance.sortBy(lambda x: x[1], ascending= False)

In [None]:
# read in movie.metadata.tsv
movie_metadata = sc.textFile("dbfs:/FileStore/tables/movie_metadata.tsv")

In [None]:
# split the movie metadata 
movie_metadata = movie_metadata.map(lambda line: line.split("\t"))

In [None]:
# get the movieid and name into a tuple
movie_id_and_name = movie_metadata.map(lambda x: (x[0], x[2]))

In [None]:
# join movie ids of our query_doc_relevance tuple and our movie metadata tuple
single_query_doc_relevance_with_name = single_query_doc_relevance_sorted.join(movie_id_and_name)

In [None]:
# return top 10 results for single query
top_10_results_single_query = single_query_doc_relevance_with_name.take(10)

In [None]:
# RESULTS FOR 4.A
# printing only the top 10 results(movie names that we got)
single_selected_query = single_selected_query.collect()
print("Single Query Selected:")
print(single_selected_query)

print("\nTop 10 Movies for Query:")
for result in top_10_results_single_query:
    print(result[1][1])

Single Query Selected:
['Romance']

Top 10 Movies for Query:
New Delhi
Hadh Kar Di Aapne
Face
About Love
4 Romance
Doctor Love
Priest
Blood Ties
The Living Ghost
City of God


In [None]:
#==========================================================================================#

In [None]:
# FOR PART 4.B
multi_search_terms_rdd = sc.textFile("dbfs:/FileStore/multi_term_search.txt")

In [None]:
# select the query you want to test, you can only test one query at a time
query_line_number = 1 # replace the 1 with any query line (text file has 15 lines total)
selected_query = multi_search_terms_rdd.zipWithIndex().filter(lambda x : x[1] == query_line_number).map(lambda x: x[0])

In [None]:
selected_query.collect()

Out[70]: ['Sci-fi thriller with time travel']

In [None]:
# split the query line into tokens and put them in a list
query_tokenized = selected_query.flatMap(lambda line: line.split())

# get tf for each term in the query
query_tokenized = query_tokenized.map(lambda x: (x, 1)) # add a count of 1 to each term

In [None]:
query_tokenized.collect()

Out[72]: [('Sci-fi', 1), ('thriller', 1), ('with', 1), ('time', 1), ('travel', 1)]

In [None]:
query_tf = query_tokenized.reduceByKey(lambda x, y: x + y) # get frequency of each word (term, tf)

In [None]:
query_tf.collect()

Out[74]: [('Sci-fi', 1), ('thriller', 1), ('with', 1), ('time', 1), ('travel', 1)]

In [None]:
# get tf-idf for each term in the query
def get_idf_calculate_tf_idf(query_term, tf, dict_summaries_idf):
    weight = 0
    # if query exists in documents retrieve idf and calculate weight, else it will have weight of 0
    if query_term in dict_summaries_idf:
        weight = tf * dict_summaries_idf[query_term]
    
    return weight
        
query_tf_idf = query_tf.map(lambda x: (x[0], get_idf_calculate_tf_idf(x[0], x[1], dict_summaries_idf)))

In [None]:
query_tf_idf.collect()

Out[76]: [('Sci-fi', 10.652684198947096),
 ('thriller', 6.221867400103782),
 ('with', 0),
 ('time', 1.6211117055655697),
 ('travel', 3.346152800007591)]

In [None]:
def calculate_doc_relevance(doc_words_tuple, query_tf_idf):
    dot_product = 0
    cross_product_A = 0
    cross_product_B = 0

    for term, weight in doc_words_tuple[1]:
        for query_word, query_weight in query_tf_idf:
            if term == query_word:
                #perform dot products
                dot_product += weight * query_weight

                #perform cross product
                cross_product_A += weight ** 2
                cross_product_B += query_weight ** 2
    
    # check if zero
    if cross_product_A == 0 or cross_product_B == 0:
        return 0

    #cos_similarity = dot/ cross_product
    cos_similarity = dot_product / ((cross_product_A ** 0.5) * (cross_product_B ** 0.5))

    return cos_similarity

query_tf_idf_collected = query_tf_idf.collect()

query_doc_relevance = plot_summaries_by_doc.map(lambda x: (x[0], calculate_doc_relevance(x, query_tf_idf_collected)))

In [None]:
query_doc_relevance.collect()

Out[78]: [('3242877', 0),
 ('11252704', 1.0),
 ('25193706', 1.0),
 ('1351316', 0),
 ('19435516', 0),
 ('23943222', 1.0),
 ('30710227', 0),
 ('32038391', 0),
 ('26456639', 1.0),
 ('29823857', 0),
 ('26663186', 0),
 ('13084559', 1.0),
 ('28212897', 1.0),
 ('26943525', 0),
 ('10673063', 0),
 ('3827670', 0),
 ('8910297', 0),
 ('25604628', 0),
 ('464724', 0),
 ('12898551', 0),
 ('14385736', 0),
 ('8423598', 0),
 ('13078877', 1.0),
 ('457561', 0),
 ('12311917', 1.0),
 ('4458143', 0),
 ('11103867', 1.0),
 ('12668580', 1.0),
 ('24250799', 0),
 ('12383140', 0),
 ('342556', 1.0),
 ('14144926', 0),
 ('20759694', 0),
 ('30818824', 1.0),
 ('18162734', 0),
 ('328344', 0),
 ('25121894', 0),
 ('12219972', 0),
 ('27792643', 1.0),
 ('7291822', 0),
 ('35204251', 0),
 ('22342771', 0),
 ('22822904', 1.0),
 ('32561525', 0),
 ('42137', 1.0),
 ('17657295', 1.0),
 ('4556874', 1.0),
 ('23396276', 0),
 ('12401107', 0),
 ('35764906', 1.0),
 ('19425193', 0),
 ('630393', 1.0),
 ('1959464', 0),
 ('22447758', 0),
 ('

In [None]:
# Sort in descending order
query_doc_relevance_sorted = query_doc_relevance.sortBy(lambda x: x[1], ascending= False)

In [None]:
# read in movie.metadata.tsv
movie_metadata = sc.textFile("dbfs:/FileStore/tables/movie_metadata.tsv")

In [None]:
# split the movie metadata 
movie_metadata = movie_metadata.map(lambda line: line.split("\t"))

In [None]:
# get the movieid and name into a tuple
movie_id_and_name = movie_metadata.map(lambda x: (x[0], x[2]))

In [None]:
movie_id_and_name.collect()

Out[83]: [('975900', 'Ghosts of Mars'),
 ('3196793', 'Getting Away with Murder: The JonBenét Ramsey Mystery'),
 ('28463795', 'Brun bitter'),
 ('9363483', 'White Of The Eye'),
 ('261236', 'A Woman in Flames'),
 ('13696889', 'The Gangsters'),
 ('18998739', "The Sorcerer's Apprentice"),
 ('10408933', "Alexander's Ragtime Band"),
 ('9997961', 'Contigo y aquí'),
 ('2345652', 'City of the Dead'),
 ('175026', 'Sarah and Son'),
 ('24229100', 'Lady Snowblood 2: Love Song of Vengeance'),
 ('6631279', 'Little city'),
 ('171005', 'Henry V'),
 ('18296435', 'Aaah Belinda'),
 ('11250635', 'The Mechanical Monsters'),
 ('30388930', '1919'),
 ('77856', 'Mary Poppins'),
 ('32456683', 'Die Fahne von Kriwoj Rog'),
 ('33420460', 'Keep the Change'),
 ('175024', "The Devil's Holiday"),
 ('612710', 'New Rose Hotel'),
 ('21926710', 'White on Rice'),
 ('33427105', 'Freddy and the Song of the South Pacific'),
 ('31983669', 'Road to Life'),
 ('17715326', 'Camera Thrills'),
 ('22087420', 'Ferdinando I, re di Napoli

In [None]:
# join movie ids of our query_doc_relevance tuple and our movie metadata tuple
query_doc_relevance_with_name = query_doc_relevance_sorted.join(movie_id_and_name)

In [None]:
# return top 10 results
top_10_results = query_doc_relevance_with_name.take(10)

In [None]:
# RESULTS FOR PART 4.B
# printing only the top 10 results(movie names that we got)
selected_query = selected_query.collect()
print("Multi Query Selected:")
print(selected_query)
print("\nTop 10 Movies for Query:")
for result in top_10_results:
    print(result[1][1])

Multi Query Selected:
['Sci-fi thriller with time travel']

Top 10 Movies for Query:
Nightmare Beach
Runaway
Minutemen
Management
The Girls
When Were You Born
Bevor der Blitz einschlägt
Baxter
And the Violins stopped playing
The Lark
