In [0]:
PLOT_PATH = "dbfs:/FileStore/tables/plot_summaries.txt"
METADATA_PATH = "dbfs:/FileStore/tables/movie_metadata.tsv"
SINGLE_TERM_PATH = "dbfs:/FileStore/tables/single_term.txt"
MULTI_TERM_PATH = "dbfs:/FileStore/tables/multi_term.txt"

In [0]:
import re
from pyspark.ml.feature import StopWordsRemover
import math

In [0]:
# Load the plot summaries file
data_rdd = sc.textFile(PLOT_PATH)

In [0]:
# Split into (movie_id, plot_summary)
data_rdd = data_rdd.map(lambda line: line.split("\t")).map(lambda x: (int(x[0]), x[1]))

In [0]:
# data_rdd.take(1)

In [0]:
# data_rdd.count()

# Preprocessing

In [0]:
# Load stopwords from Spark
stopwords = StopWordsRemover().getStopWords()

In [0]:
# Function to clean text and tokenize
def preprocess(text):
    words = re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())  # Extract words of length ≥ 2
    return [word for word in words if word not in stopwords]

In [0]:
# Apply preprocessing
tokenized_rdd = data_rdd.map(lambda x: (x[0], preprocess(x[1])))

In [0]:
# stopwords[:5]

In [0]:
# tokenized_rdd.take(10)

In [0]:
# tokenized_rdd.take(1)

# TF-IDF Computation

## TF : Frequency of a word within document

In [0]:
# Emit (movie_id, word) -> 1
word_counts_rdd = tokenized_rdd.flatMap(lambda x: [((x[0], word), 1) for word in x[1]])

In [0]:
# Reduce by key to get term frequency (TF)
tf_rdd = word_counts_rdd.reduceByKey(lambda a, b: a + b)

In [0]:
# word_counts_rdd.take(10)

In [0]:
# tf_rdd.take(10)

In [0]:
# tf_rdd.sortBy(lambda x : x[1]).take(10)

In [0]:
# tf_rdd.sortBy(lambda x: -x[1]).take(10)

## Document Frequency

In [0]:
# Extract (word, movie_id) pairs to count unique movies per word
doc_word_rdd = tf_rdd.map(lambda x: (x[0][1], x[0][0])).distinct()

In [0]:
# Count how many documents contain each word
df_rdd = doc_word_rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

In [0]:
# Get total number of documents
num_docs = data_rdd.count()

In [0]:
# doc_word_rdd.take(5)

In [0]:
# df_rdd.take(5)

In [0]:
# num_docs

In [0]:
# df_rdd.take(10)

## IDF Calculation

In [0]:
# Compute IDF using log(N / df)
idf_rdd = df_rdd.map(lambda x: (x[0], math.log(num_docs / (x[1] + 1))))

In [0]:
# idf_rdd.sortBy(lambda x: -x[1]).take(20)

In [0]:
# idf_rdd.take(10)

In [0]:
# Join TF and IDF
tfidf_rdd = tf_rdd.map(lambda x: (x[0][1], (x[0][0], x[1])))  # (word, (movie_id, tf))
tfidf_rdd = tfidf_rdd.join(idf_rdd)  # (word, ((movie_id, tf), idf))
tfidf_rdd = tfidf_rdd.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] * x[1][1]))  # ((movie_id, word), tf-idf)

In [0]:
# tfidf_rdd.take(10)

# Reading Search Queries

In [0]:
# Load query files
single_term_rdd = sc.textFile(SINGLE_TERM_PATH)
multi_term_rdd = sc.textFile(MULTI_TERM_PATH)

# Collect queries as lists
single_term_queries = single_term_rdd.collect()
multi_term_queries = multi_term_rdd.collect()

In [0]:
# Print queries to verify
print("Single-Term Queries:", single_term_queries)
print("Multi-Term Queries:", multi_term_queries)

Single-Term Queries: ['action', 'adventure', 'comedy', 'romance', 'thriller']
Multi-Term Queries: ['funny movie with action scenes', 'heartwarming drama with inspiring story', 'thrilling adventure with mystery and suspense', 'romantic comedy with clever dialogue and charm', 'suspenseful thriller with unexpected twist and depth']


In [0]:
def search_single_term(term):
    results = (
        tfidf_rdd.filter(lambda x: x[0][1] == term)
        .map(lambda x: (x[0][0], x[1]))
        .top(10, key=lambda x: x[1])
    )  # Sort by TF-IDF
    return results

In [0]:
# Run search for all single-term queries
single_term_results = {term: search_single_term(term) for term in single_term_queries}

In [0]:
# Print results
for term, results in single_term_results.items():
    print(f"\n🔍 Results for single-term query: {term}")
    for movie_id, score in results:
        print(f"Movie ID: {movie_id}, TF-IDF Score: {score}")


🔍 Results for single-term query: action
Movie ID: 28657324, TF-IDF Score: 41.445035499107064
Movie ID: 33035035, TF-IDF Score: 27.630023666071374
Movie ID: 31474926, TF-IDF Score: 20.722517749553532
Movie ID: 4608223, TF-IDF Score: 17.268764791294608
Movie ID: 10087485, TF-IDF Score: 17.268764791294608
Movie ID: 1376388, TF-IDF Score: 13.815011833035687
Movie ID: 17853465, TF-IDF Score: 13.815011833035687
Movie ID: 12596771, TF-IDF Score: 13.815011833035687
Movie ID: 23380332, TF-IDF Score: 10.361258874776766
Movie ID: 1244836, TF-IDF Score: 10.361258874776766

🔍 Results for single-term query: adventure
Movie ID: 1266489, TF-IDF Score: 32.073625296612235
Movie ID: 6115870, TF-IDF Score: 22.909732354723026
Movie ID: 18405563, TF-IDF Score: 13.745839412833817
Movie ID: 13365023, TF-IDF Score: 13.745839412833817
Movie ID: 36177096, TF-IDF Score: 13.745839412833817
Movie ID: 12759589, TF-IDF Score: 9.16389294188921
Movie ID: 28111565, TF-IDF Score: 9.16389294188921
Movie ID: 10328139, TF-

In [0]:
def search_multi_term(query):
    query_terms = query.lower().split()

    # Get query term TF-IDF values
    query_tfidf_rdd = tfidf_rdd.filter(lambda x: x[0][1] in query_terms)

    # Compute document norms
    doc_norms_rdd = (
        query_tfidf_rdd.map(lambda x: (x[0][0], x[1] ** 2))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda x: (x[0], math.sqrt(x[1])))
    )

    # Compute dot product for cosine similarity
    dot_product_rdd = query_tfidf_rdd.map(lambda x: (x[0][0], x[1])).reduceByKey(
        lambda a, b: a + b
    )

    # Normalize and compute final similarity
    cosine_sim_rdd = (
        dot_product_rdd.join(doc_norms_rdd)
        .map(lambda x: (x[0], x[1][0] / x[1][1]))
        .top(10, key=lambda x: x[1])
    )

    return cosine_sim_rdd

In [0]:
# Run search for all multi-term queries
multi_term_results = {query: search_multi_term(query) for query in multi_term_queries}

In [0]:
# Print results
for query, results in multi_term_results.items():
    print(f"\n🔍 Results for multi-term query: {query}")
    for movie_id, similarity in results:
        print(f"Movie ID: {movie_id}, Cosine Similarity: {similarity}")


🔍 Results for multi-term query: funny movie with action scenes
Movie ID: 4513177, Cosine Similarity: 1.7218655342148554
Movie ID: 1047827, Cosine Similarity: 1.7218655342148554
Movie ID: 1801788, Cosine Similarity: 1.7218655342148554
Movie ID: 1343165, Cosine Similarity: 1.7218655342148554
Movie ID: 5258501, Cosine Similarity: 1.7218655342148554
Movie ID: 19822985, Cosine Similarity: 1.717713774365684
Movie ID: 5778040, Cosine Similarity: 1.6965573909031133
Movie ID: 164395, Cosine Similarity: 1.6965573909031133
Movie ID: 8279491, Cosine Similarity: 1.6965573909031133
Movie ID: 1812196, Cosine Similarity: 1.6965573909031133

🔍 Results for multi-term query: heartwarming drama with inspiring story
Movie ID: 19923267, Cosine Similarity: 1.5703944432376282
Movie ID: 13516269, Cosine Similarity: 1.4137155106519212
Movie ID: 8714658, Cosine Similarity: 1.4102350468211504
Movie ID: 1374349, Cosine Similarity: 1.4102350468211504
Movie ID: 17955165, Cosine Similarity: 1.4102350468211504
Movie 

In [0]:
# Load movie metadata (movie_id -> movie_name)
metadata_rdd = (
    sc.textFile(METADATA_PATH)
    .map(lambda line: line.split("\t"))
    .map(lambda x: (int(x[0]), x[2]))
)  # (movie_id, movie_name)

In [0]:
# Function to get movie names
def get_movie_names(results):
    results_rdd = sc.parallelize(results)
    return results_rdd.join(metadata_rdd).map(lambda x: (x[1][1], x[1][0])).collect()

In [0]:
# Convert single-term results
for term, results in single_term_results.items():
    named_results = get_movie_names(results)
    print(f"\n📽 Movies for single-term '{term}':", named_results)


📽 Movies for single-term 'action': [('Kyun Tum Say Itna Pyar Hai', 13.815011833035687), ('Nanny McPhee and the Big Bang', 10.361258874776766), ('West Side Story', 17.268764791294608), ('Crayon Shin-chan: Action Kamen vs Leotard Devil', 41.445035499107064), ('Action Man: Robot Atak', 27.630023666071374), ('Rosencrantz & Guildenstern Are Dead', 17.268764791294608), ('Bombaat', 13.815011833035687), ('Crayon Shin-chan: The Storm Called The Jungle', 20.722517749553532), ('Jingle All the Way', 10.361258874776766), ('Smallpox', 13.815011833035687)]

📽 Movies for single-term 'adventure': [('Scavenger Hunt', 22.909732354723026), ('Hand in Hand', 9.16389294188921), ('The Goonies', 9.16389294188921), ('Wild Horse Hank', 13.745839412833817), ('Meu Tio Matou Um Cara', 13.745839412833817), ('Adventuress Wanted', 9.16389294188921), ('Blackie & Kanuto', 13.745839412833817), ('The Pagemaster', 32.073625296612235), ('Max Steel: Countdown', 9.16389294188921), ('The Brothers Bloom', 9.16389294188921)]

📽

In [0]:
# Convert multi-term results
for query, results in multi_term_results.items():
    named_results = get_movie_names(results)
    print(f"\n📽 Movies for multi-term '{query}':", named_results)


📽 Movies for multi-term 'funny movie with action scenes': [('Om Shanti Om', 1.6965573909031133), ('Dien Bien Phu', 1.7218655342148554), ('The Purple Monster Strikes', 1.6965573909031133), ('The Alamo', 1.7218655342148554), ("Beethoven's Big Break", 1.717713774365684), ('The Stunt Man', 1.6965573909031133), ('Manichitrathazhu', 1.6965573909031133), ('Benji the Hunted', 1.7218655342148554), ('The Game of Death', 1.7218655342148554), ('More American Graffiti', 1.7218655342148554)]

📽 Movies for multi-term 'heartwarming drama with inspiring story': [('The Player', 1.4102350468211504), ('Salomé', 1.4102350468211504), ('The Locket', 1.4029908719124458), ('Saving Milly', 1.5703944432376282), ('Manodu', 1.4029908719124458), ('Guide', 1.4102350468211504), ('The Raven', 1.4046721934613966), ('Shriman Prithviraj', 1.4137155106519212), ('To Live', 1.4102350468211504), ('Candy Rain', 1.4029908719124458)]

📽 Movies for multi-term 'thrilling adventure with mystery and suspense': [('Beautiful', 1.413