In [0]:
%pip install sparknlp mlflow

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
%pip install nltk

Q2: Search Engine for Movie Plot Summaries 

In [0]:
# Build pipeline for stopword remover

documentAssembler = DocumentAssembler() \
.setInputCol("text") \
.setOutputCol("document")

tokenizer = Tokenizer() \
.setInputCols(["document"]) \
.setOutputCol("token")

stop_words = StopWordsCleaner.pretrained("stopwords_iso","en") \
.setInputCols(["token"]) \
.setOutputCol("cleanTokens")

sw_pipeline = Pipeline(stages=[documentAssembler, tokenizer, stop_words]) 


stopwords_iso download started this may take some time.
Approximate size to download 2.1 KB
[ | ][OK!]


In [0]:
from pyspark.sql.functions import col 

In [0]:
import numpy as np

In [0]:
from math import log 

In [0]:
import nltk
from nltk.corpus import stopwords 
nltk.download('stopwords')
nltk.download('punkt')
from nltk.tokenize import word_tokenize
stop_words = set(stopwords.words('english'))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [0]:
# Import movie summaries as dataframe
df = spark.read.option("header","false").option("delimiter", "\t").csv("dbfs:/FileStore/shared_uploads/exc220012@utdallas.edu/plot_summaries.txt") 

In [0]:
# Stop words pipeline functions require a text column, so rename columns of df accordingly
df = df.select(col("_c0").alias("ID"), col("_c1").alias("text"))

In [0]:
# Transform data frame using stopwords pipeline
results = sw_pipeline.fit(df).transform(df)

In [0]:
# Make sure stopwords have been removed effecively
results.select("id", "cleanTokens.result").show(truncate = 200)

+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|      id|                                                                                                                                                                                                  result|
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|23890098|                                [Shlykov, ,, hard-working, taxi, driver, Lyosha, ,, saxophonist, ,, develop, bizarre, love-hate, relationship, ,, despite, prejudices, ,, realize, aren't, different, .]|
|31186339|[nation, Panem, consists, wealthy, Capitol, poorer, districts, ., punishment, past, rebellion, ,, district, provide, boy, girl, ages, 12, 18, 

In [0]:
# Convert dataframe into an RDD
rdd = results.select("id", "cleanTokens.result").rdd.map(tuple)

In [0]:
# Create pair RDD where (word, ID), 1 to prepare to count term frequency per doc
# Cache for access later 
cached_rdd = rdd.flatMap(lambda x: [(term, x[0]) for term in x[1]]).map(lambda x : (x, 1)).cache()

In [0]:
# Count Term Frequency 
# RDD will display (Word, [('ID1', TFin1), ('ID2', 'TFin2)]) where Word is key, and value is a list of tuples that contain ID and TFinDoc 
tf_df = cached_rdd.reduceByKey(lambda x, y : x + y).map(lambda x : (x[0][0], (x[0][1], x[1]))).groupByKey().mapValues(list)

In [0]:
# Calculate frequency of docs that contain a specific term and save as part of each tuple. 
tf_idf = tf_df.mapValues(lambda x : [(t+(len(x),)) for t in x]).cache()

In [0]:
# Get total number of docs for tf-idf calculation
totalDocCount = results.count() 
totalDocCount

Out[15]: 42306

In [0]:
# Function to compute tf-idf 
def get_tfidf(tf, totalDocCount, docCount):  
    idf = log(totalDocCount/docCount, 10)
    result = tf * idf 
    return result 

In [0]:
# Map step to calculate tf-idf for each tuple (ID, TF, DocF)
inverted_index = tf_idf.mapValues(lambda x : [ 
    (values[0], values[1], get_tfidf(values[1], totalDocCount, values[2])) for values in x
    ]
                 )

In [0]:
# Upload movie metadata to get movie titles
movies = spark.read.csv("dbfs:/FileStore/shared_uploads/exc220012@utdallas.edu/movie_metadata.tsv", sep=r'\t', header=False)
movies= movies.select(col("_c0").alias("ID"), col("_c2").alias("Name")).rdd.map(tuple)

In [0]:
# Upload search terms file 
queries = sc.textFile('dbfs:/FileStore/searchterms.txt')

In [0]:
# view search terms
searchterms = queries.collect() 

In [0]:
# Function to Retrieve top 10 results for single word query. 
def Top10SingleResults(word): 
    print("Finding results..")
    matches = inverted_index.lookup(word.lower()) # obtain values corresponding to key
    if not matches: 
        print("No matches found")
        return 
    else : 
        sorted_results = sorted(*matches, key = lambda x: x[2], reverse=True) # sort in descending order by TF-IDF
        top10 = sorted_results[0:10] 
        top10ids = list(zip(*top10))[0]
        result = movies.filter(lambda x : x[0] in top10ids).values().collect()    # filter for movie IDs that match the top 10 list 
    return result 


In [0]:
# Collect list of movie ids as strings 
movieIds = df.select('ID').rdd.flatMap(lambda x: x).collect()

In [0]:
# Create dictionary to map movie IDs to integers for easier array operations in cosine similarity calculation
movieid_lookup = {element: index for index, element in enumerate(movieIds)} 
movieid_lookup

In [0]:
# Create array to store numerators of cosine similarity scores 
scores = np.zeros(totalDocCount) 
print(scores)

[0. 0. 0. ... 0. 0. 0.]


In [0]:
# Create array of document lengths for normalization step in cosine similarity
doclengths = rdd.map(lambda x : (x[0], len(x[1]))).collect()
lengths = np.zeros(totalDocCount, dtype = int)
for x in doclengths: 
    index = movieid_lookup.get(x[0])
    lengths[index] = x[1]

lengths 

Out[31]: array([ 21, 526, 321, ...,  82, 133, 367])

In [0]:
# Function to update numerator of cosine similarity scores 
def update_score(postingslist): 
    for x in (postingslist): 
        index = movieid_lookup.get(x[0])
        if scores[index] == 0: 
            scores[index] = x[2]
        else: 
            scores[index] = scores[index] + x[2]


In [0]:
# Function to Retrieve top 10 Movie Names for Multi-Queries
def top10MultiResults(scores): 
    # Getting the indexes of the top 10 scores. 
    top_10 = np.argpartition(scores, -10)[-10:]
    # Obtain list of top 10 movie titles by using the indices of the scores array to look up the corresponding movie IDs in the lookup dict
    top10list = []
    for x in top_10: 
        top10list.append(list(movieid_lookup.keys())[list(movieid_lookup.values()).index(x)])
    result = movies.filter(lambda x : x[0] in top10list).values().collect() 
    return result


In [0]:
# Script to Perform Searches 
# For each term in the text file, either outputs top 10 documents ranked by highest TF-IDF OR top 10 documents ranked by cosine similarity 
for query in searchterms: 
    print(query)
    # Multi-query 
    if (" " in query): 
        search_tokens = word_tokenize(query)
        tokens = [word for word in search_tokens if not word in stopwords.words()]
        matches = []
        for x in tokens: 
            results = inverted_index.lookup(x.lower())[0]
            if not results: 
                print("No matches found")
            else: 
                matches.append(results) 
            for x in matches:
                update_score(x)
        # Normalize the cosine similarity score. 
        scores = np.divide(scores, lengths)
        print("Finding results..")
        print(top10MultiResults(scores)) 
        scores.fill(0)

    
    else: 
        print(Top10SingleResults(query))
    

kill
Finding results..
['Puppet Master: The Legacy', 'Diljale', 'Demon Hunter', 'North and South', 'Tango Charlie', 'Hercules', "The Warrior's Way", 'U Turn', 'Stash House', 'Shadowboxer']
Orange
Finding results..
['Mermaid Got Married', 'La Vampire Nue', 'Trading Places', 'Clean, Shaven', 'Sherlock Holmes and the House of Fear', 'Message from Space', 'Tweet and Sour', 'The House on Telegraph Hill', "It's a Gift", 'Laughter and Grief by the White Sea']
America
Finding results..
['The Adventures of Rusty']
saxophone
Finding results..
['Danny', 'Duet', 'Ibu Mertuaku', 'Some Like It Hot', 'The Conversation', "St. Elmo's Fire", 'New York, New York', "You Don't Know What You're Doin'!", 'Here Comes Mr. Jordan', 'The Three Wishes of Billy Grier']
pirate
Finding results..
['Dick Deadeye, or Duty Done', "Pirates II: Stagnetti's Revenge", 'Animal Treasure Island', 'Pirates of the Great Salt Lake', 'Raggedy Ann & Andy: A Musical Adventure', 'The Boat That Rocked', 'Case Closed: Jolly Rogers in t