In [0]:
%pip install nltk

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
import re
import math
from pyspark.sql import SparkSession
from pyspark import SparkContext
import nltk
from nltk.corpus import stopwords

# Fetch the NLTK English stopword corpus so stopwords.words() below can read it.
nltk.download('stopwords')

# Obtain the Spark session (getOrCreate reuses an existing one when available)
# and its underlying SparkContext for the RDD API used throughout.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Input locations on DBFS.
plot_file = "dbfs:/FileStore/plot_summaries.txt"       # read below as tab-separated lines with two fields
meta_file = "dbfs:/FileStore/movie_metadata.tsv"          # tab-separated; fields 0 and 2 feed the ID -> name lookup
search_file = "dbfs:/FileStore/search_terms-1.txt"        # read line by line; each line is one search query

# Load stopwords and broadcast to workers
# A set gives constant-time membership checks during token filtering.
my_stopwords = set(stopwords.words('english'))
stopwords_broadcast = sc.broadcast(my_stopwords)


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [0]:
# Read the plot-summary file as an RDD of raw text lines.
raw_lines = sc.textFile(plot_file)

# Tab-split every line and keep only well-formed records (exactly two fields),
# turning each surviving two-element list into a (field0, field1) tuple.
plots_rdd = (
    raw_lines
    .map(lambda line: line.split("\t"))
    .filter(lambda fields: len(fields) == 2)
    .map(tuple)
)

print("Total movie plots:", plots_rdd.count())
print("One sample plot:", plots_rdd.take(1))

Total movie plots: 42306
One sample plot: [('23890098', "Shlykov, a hard-working taxi driver and Lyosha, a saxophonist, develop a bizarre love-hate relationship, and despite their prejudices, realize they aren't so different after all.")]


In [0]:
# Parse the metadata file into one list of tab-separated fields per line.
meta_rdd = sc.textFile(meta_file).map(lambda line: line.split("\t"))

# Keep rows that have at least three fields and project them down to
# (field 0, field 2) pairs, de-duplicated.
movie_meta_rdd = (
    meta_rdd
    .filter(lambda fields: len(fields) >= 3)
    .map(lambda fields: (fields[0], fields[2]))
    .distinct()
)

# Materialise the lookup table on the driver and wrap it in a broadcast variable.
movie_meta_dict = movie_meta_rdd.collectAsMap()
movie_meta_bc = sc.broadcast(movie_meta_dict)

print("Some movie metadata:", list(movie_meta_dict.items())[:5])


Some movie metadata: [('261236', 'A Woman in Flames'), ('18998739', "The Sorcerer's Apprentice"), ('10408933', "Alexander's Ragtime Band"), ('24229100', 'Lady Snowblood 2: Love Song of Vengeance'), ('6631279', 'Little city')]


In [0]:
def simple_tokenize(text):
    """Lower-case ``text`` and return its maximal runs of ASCII letters, in order.

    Anything that is not an ASCII letter (digits, punctuation, whitespace,
    apostrophes, ...) acts as a separator, so "aren't" yields ['aren', 't'].
    """
    lowered = text.lower()
    return [match.group(0) for match in re.finditer(r"[a-zA-Z]+", lowered)]

def process_text(movie_id, text):
    """Tokenize one plot and drop stopwords.

    Returns ``(movie_id, tokens)`` where ``tokens`` is the list of
    ``simple_tokenize(text)`` results that are not in the broadcast stopword
    set, in their original order (duplicates preserved).
    """
    stop_set = stopwords_broadcast.value
    kept = []
    for tok in simple_tokenize(text):
        if tok not in stop_set:
            kept.append(tok)
    return (movie_id, kept)

# Tokenize and stopword-filter every plot; each record becomes
# (movieID, [list of tokens]).
processed_plots = plots_rdd.map(lambda record: process_text(*record))
print("Processed a sample record:", processed_plots.take(1))


Processed a sample record: [('23890098', ['shlykov', 'hard', 'working', 'taxi', 'driver', 'lyosha', 'saxophonist', 'develop', 'bizarre', 'love', 'hate', 'relationship', 'despite', 'prejudices', 'realize', 'different'])]


In [0]:
# Term frequency: ((movie_id, token), number of occurrences of token in that plot).
tf_rdd = processed_plots.flatMap(lambda x: [((x[0], token), 1) for token in x[1]]).reduceByKey(lambda a, b: a + b)

# Document frequency: number of plots that contain each token.
# After the reduceByKey above, tf_rdd holds exactly one record per
# (movie_id, token) key, so the projected (token, movie_id) pairs are already
# unique and no distinct() (an extra shuffle) is needed before counting.
token_doc_pairs = tf_rdd.map(lambda x: (x[0][1], x[0][0]))
df_rdd = token_doc_pairs.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)


total_docs = processed_plots.count()
print("Total docs:", total_docs)


# Inverse document frequency: log base 10 of (total documents / document frequency).
idf_rdd = df_rdd.map(lambda x: (x[0], math.log(float(total_docs) / float(x[1]), 10)))


# Re-key term frequencies by token so they can be joined with the IDF table.
tf_by_token = tf_rdd.map(lambda x: (x[0][1], (x[0][0], x[1])))

tfidf_join = tf_by_token.join(idf_rdd)

# (movie_id, token, tf * idf). Persisted because every multi-term query runs
# Spark actions against this RDD; without cache() each query would re-run the
# lineage leading up to it.
tfidf_rdd = tfidf_join.map(lambda x: (x[1][0][0], x[0], x[1][0][1] * x[1][1])).cache()
print("Some TF-IDF scores:", tfidf_rdd.take(5))


# Inverted index: token -> [(movie_id, tfidf), ...]. Persisted for the same
# reason: every single-term query filters this RDD.
inv_index_rdd = tfidf_rdd.map(lambda x: (x[1], (x[0], x[2]))) \
                         .groupByKey() \
                         .mapValues(list) \
                         .cache()

Total docs: 42306
Some TF-IDF scores: [('31186339', 'nation', 2.1118542124004), ('474750', 'nation', 2.1118542124004), ('15340311', 'nation', 2.1118542124004), ('32943405', 'nation', 2.1118542124004), ('33148591', 'nation', 2.1118542124004)]


In [0]:
# Bring every line of the search-terms file back to the driver as a list of
# query strings.
queries = (
    sc.textFile(search_file)
      .collect()
)
print("Search queries found:", queries)

def process_query(query):
    """Turn a raw query string into its list of non-stopword tokens.

    Uses the same tokenizer and broadcast stopword set as the plot
    preprocessing, so query terms line up with indexed terms.
    """
    stop_set = stopwords_broadcast.value
    kept = []
    for tok in simple_tokenize(query):
        if tok not in stop_set:
            kept.append(tok)
    return kept

def top_docs_single(term, top_n=10):
    """Return up to ``top_n`` (movie_id, tfidf) postings for ``term``, highest score first.

    Looks the term up in the inverted index and flattens its posting list;
    an unknown term produces an empty list.
    """
    postings = (
        inv_index_rdd
        .filter(lambda entry: entry[0] == term)
        .flatMap(lambda entry: entry[1])
    )
    # Negating the score makes takeOrdered (ascending) return the largest first.
    return postings.takeOrdered(top_n, key=lambda posting: -posting[1])

def print_single_query(term):
    """Print the top-ranked movies for a one-term query, one line per movie.

    Movie IDs missing from the metadata lookup are shown as "Unknown Movie".
    """
    ranked = top_docs_single(term)
    titles = movie_meta_bc.value
    print(f"Results for query (single term) '{term}':")
    for movie_id, score in ranked:
        name = titles.get(movie_id, "Unknown Movie")
        print(f"  ID: {movie_id}, Name: {name}, Score: {score:.4f}")

Search queries found: ['king', 'action', 'love story', 'batman', 'hulk', 'horror', 'fiction', 'funny movie with action scenes', 'action movie with funny scenes', 'self help movie with happy ending', 'parody of superhero movie', 'movie about car racing']


In [0]:
from collections import Counter
# Collect the token -> IDF table onto the driver as a plain dict and expose it
# through a broadcast variable for query-vector construction.
idf_dict = idf_rdd.collectAsMap()
idf_bc = sc.broadcast(idf_dict)

def build_query_vector(tokens):
    """Build the query's TF-IDF vector as a ``{term: weight}`` dict.

    Each weight is (occurrences of the term in ``tokens``) * IDF(term); terms
    absent from the IDF table get weight 0.0.
    """
    idf_lookup = idf_bc.value
    query_vec = {}
    for term, freq_val in Counter(tokens).items():
        query_vec[term] = freq_val * idf_lookup.get(term, 0.0)
    return query_vec

def cosine_similarity(vec1, vec2):
    """Cosine of the angle between two sparse vectors given as ``{term: weight}`` dicts.

    Terms missing from either dict count as weight 0. Returns 0.0 when either
    vector has zero magnitude (including the empty dict) instead of dividing
    by zero.
    """
    def _magnitude(vec):
        # Euclidean (L2) length of a sparse vector.
        return math.sqrt(sum(weight ** 2 for weight in vec.values()))

    norm1 = _magnitude(vec1)
    norm2 = _magnitude(vec2)
    if norm1 == 0 or norm2 == 0:
        return 0.0

    # Only terms present in vec1 can contribute a non-zero product.
    dot_product = sum(vec1.get(term, 0.0) * vec2.get(term, 0.0) for term in vec1)
    return dot_product / (norm1 * norm2)

# Memo of {tfidf_rdd.id(): persisted RDD of (movie_id, L2 norm)} used by
# _full_doc_norms so the corpus-wide aggregation runs once, not once per query.
_doc_norm_cache = {}


def _full_doc_norms():
    """Return an RDD of (movie_id, L2 norm of the document's full TF-IDF vector)."""
    key = tfidf_rdd.id()
    if key not in _doc_norm_cache:
        # tfidf_rdd was (re)built since the last call: release norms that
        # belong to a superseded RDD before computing fresh ones.
        for stale in _doc_norm_cache.values():
            stale.unpersist()
        _doc_norm_cache.clear()
        _doc_norm_cache[key] = (tfidf_rdd.map(lambda x: (x[0], x[2] ** 2))
                                         .reduceByKey(lambda a, b: a + b)
                                         .mapValues(math.sqrt)
                                         .cache())
    return _doc_norm_cache[key]


def top_docs_multi(tokens, top_n=10):
    """Rank documents by cosine similarity to a multi-term query.

    Args:
        tokens: Pre-processed query tokens (see ``process_query``).
        top_n: Maximum number of results to return.

    Returns:
        A list of up to ``top_n`` ``(movie_id, similarity)`` tuples, highest
        similarity first. Only documents containing at least one query term
        are candidates; the list is empty when no document matches.

    The similarity is ``(q . d) / (||q|| * ||d||)`` where ``||d||`` is taken
    over the document's *entire* TF-IDF vector. Normalising by only the
    query-term subset of the document (as a vector restricted to the query
    terms would) makes every document that contains a single query term score
    identically and ignores document length altogether.
    """
    query_vec = build_query_vector(tokens)
    query_norm = math.sqrt(sum(weight ** 2 for weight in query_vec.values()))

    # Only terms that occur in the query can contribute to the dot product,
    # so it is safe (and cheap) to restrict this part to the query terms.
    dot_products = (tfidf_rdd.filter(lambda x: x[1] in query_vec)
                             .map(lambda x: (x[0], query_vec[x[1]] * x[2]))
                             .reduceByKey(lambda a, b: a + b))

    def _cosine(dot_and_norm):
        dot_product, doc_norm = dot_and_norm
        # Zero-length vectors have no direction; report no similarity.
        if query_norm == 0 or doc_norm == 0:
            return 0.0
        return dot_product / (query_norm * doc_norm)

    sim_scores = dot_products.join(_full_doc_norms()).mapValues(_cosine)
    # Negating the score makes takeOrdered (ascending) return the largest first.
    return sim_scores.takeOrdered(top_n, key=lambda tup: -tup[1])

def print_multi_query(tokens):
    """Print the top-ranked movies for a multi-term query, one line per movie.

    Movie IDs missing from the metadata lookup are shown as "Unknown Movie".
    """
    ranked = top_docs_multi(tokens)
    titles = movie_meta_bc.value
    print(f"Results for multi-term query '{' '.join(tokens)}':")
    for movie_id, sim in ranked:
        name = titles.get(movie_id, "Unknown Movie")
        print(f"  ID: {movie_id}, Name: {name}, Similarity: {sim:.4f}")


# Run every search query: several surviving tokens -> cosine-similarity
# ranking, exactly one -> single-term TF-IDF ranking, none -> report it.
for q in queries:
    print("========================================")
    print("Query:", q)
    query_tokens = process_query(q)
    if len(query_tokens) > 1:
        print_multi_query(query_tokens)
    elif query_tokens:
        print_single_query(query_tokens[0])
    else:
        print("  No valid tokens found after stopword removal.")
    print("========================================\n")

Query: king
Results for query (single term) 'king':
  ID: 16702881, Name: King, Score: 60.4686
  ID: 27181651, Name: Esther, Score: 54.7097
  ID: 4015886, Name: The King and I, Score: 47.5110
  ID: 2216322, Name: Ready to Rumble, Score: 44.6316
  ID: 20461830, Name: A Frozen Flower, Score: 40.3124
  ID: 24051028, Name: No Time for Sergeants, Score: 35.9932
  ID: 16904693, Name: Shishkabugs, Score: 34.5535
  ID: 36180452, Name: One Night with the King, Score: 31.6740
  ID: 6967795, Name: Esther... The Girl Who Became Queen, Score: 31.6740
  ID: 3917891, Name: The King and the Clown, Score: 30.2343

Query: action
Results for query (single term) 'action':
  ID: 28657324, Name: Crayon Shin-chan: Action Kamen vs Leotard Devil, Score: 17.9955
  ID: 33035035, Name: Action Man: Robot Atak, Score: 11.9970
  ID: 31474926, Name: Crayon Shin-chan: The Storm Called The Jungle, Score: 8.9977
  ID: 4608223, Name: West Side Story, Score: 7.4981
  ID: 10087485, Name: Rosencrantz & Guildenstern Are Dead