<a href="https://colab.research.google.com/github/CandaceCooley/Assignment-3/blob/main/CTF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
import re
import shutil
from math import log10, sqrt

from pyspark import SparkContext, SparkConf

In [3]:
# Configure a local-mode Spark application and attach to a SparkContext,
# reusing one if it already exists in this kernel.
conf = (
    SparkConf()
    .setMaster("local")
    .setAppName("CosineNormalizedTF")
)
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
# Text file read line by line via sc.textFile.
input_file = "/content/Assignment3_Data.txt"
# Directory that is removed and then rewritten by saveAsTextFile when the index is saved.
output_path = "/content/CTF_index"

In [5]:
def clean_and_split(line):
    """Lower-case ``line`` and return its purely alphabetic tokens.

    A token is a maximal run of ``a``-``z`` delimited by word boundaries, so
    punctuation splits words (``don't`` -> ``don``, ``t``) and any run that
    touches a digit or underscore (``abc123``) is skipped entirely.

    Args:
        line: Raw text to tokenize.

    Returns:
        List of lower-case word strings in order of appearance.
    """
    return re.findall(r'\b[a-z]+\b', line.lower())

In [6]:
def extract_word_doc_pairs(line):
    """Turn one ``doc_id:text`` line into ``((word, doc_id), 1)`` pairs.

    The line is split on its first ``:`` only, so later colons stay in the
    text. The document id is used exactly as it appears (no stripping).

    Args:
        line: One raw input line.

    Returns:
        A list with one ``((word, doc_id), 1)`` entry per token produced by
        ``clean_and_split``; an empty list when the line contains no ``:``.
    """
    doc_id, separator, text = line.partition(":")
    if not separator:
        # No "doc_id:" prefix, so the line contributes nothing.
        return []
    return [((token, doc_id), 1) for token in clean_and_split(text)]

In [7]:
# Load the input file as an RDD of text lines.
fileRDD = sc.textFile(input_file)

In [8]:
# Raw term frequency: emit ((word, doc_id), 1) for every token, then sum the
# ones per (word, doc_id) key.
word_doc_freq = (
    fileRDD
    .flatMap(extract_word_doc_pairs)
    .reduceByKey(lambda left, right: left + right)
)

In [9]:
# Log-weighted term frequency, 1 + log10(tf), rounded to 4 decimal places.
log_weighted_tf = word_doc_freq.mapValues(
    lambda count: round(log10(count) + 1, 4)
)

In [10]:
# Re-key each weight by its document id and square it.
doc_tf_squared = log_weighted_tf.map(
    lambda pair: (pair[0][1], pair[1] ** 2)
)

# Per-document Euclidean length: square root of the summed squares, rounded
# to 4 decimal places.
doc_norms = (
    doc_tf_squared
    .reduceByKey(lambda running, squared: running + squared)
    .mapValues(lambda total: round(sqrt(total), 4))
)

In [11]:
# Pull the per-document norms to the driver as a plain dict and broadcast it
# so every task can look norms up locally (assumed small enough to broadcast).
doc_norms_dict = doc_norms.collectAsMap()
broadcast_norms = sc.broadcast(doc_norms_dict)


def _normalize_posting(record):
    """Map ((word, doc_id), weight) to (word, (doc_id, weight / doc norm))."""
    (word, doc_id), weight = record
    doc_norm = broadcast_norms.value[doc_id]
    return (word, (doc_id, round(weight / doc_norm, 4)))


normalized_tf = log_weighted_tf.map(_normalize_posting)

In [12]:
# Collect every (doc_id, score) posting under its word. groupByKey gives no
# ordering guarantee for the values within a key, so each posting list is
# sorted (by doc id, then score) to keep the written index reproducible.
grouped_index = normalized_tf.groupByKey().mapValues(sorted)

In [13]:
def format_index(entry):
    """Render one index entry as ``word@doc#score+doc#score+...``.

    Args:
        entry: A ``(word, postings)`` pair where ``postings`` is an iterable
            of ``(doc, score)`` pairs.

    Returns:
        The word, an ``@``, then the postings written as ``doc#score`` and
        joined with ``+``. An empty postings iterable yields ``word@``.
    """
    term, doc_scores = entry
    rendered = "+".join("{}#{}".format(doc, score) for doc, score in doc_scores)
    return "{}@{}".format(term, rendered)

# One output text line per word, built by format_index.
formatted_index = grouped_index.map(format_index)

In [14]:
# Clear any output left by a previous run before writing; ignore_errors=True
# keeps a missing directory from raising.
# NOTE(review): presumably needed because saveAsTextFile refuses to write into
# an existing path — confirm.
shutil.rmtree(output_path, ignore_errors=True)
formatted_index.saveAsTextFile(output_path)

# Shut the SparkContext down once the index has been written.
sc.stop()