In [2]:
!pip install pyspark
!pip install -U -q PyDrive2

!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u442-b06~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 20 not upgraded.


In [None]:
# Mount Google Drive so the corpus and stopword files under /content/drive are readable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession
from pyspark.rdd import RDD
from nltk.stem import PorterStemmer
import re
import math
import numpy as np


In [None]:
# Start (or reuse) the Spark session for this notebook and expose its
# SparkContext, which the RDD-based cells below rely on.
spark = (
    SparkSession.builder
    .appName("VSM Retrieval")
    .getOrCreate()
)
sc = spark.sparkContext

In [6]:
# Directory holding one text file per document (change this to your own directory).
input_dir = "/content/drive/MyDrive/EECS4415_BigDataSystem/A2/CISI-IndividualDocuments/"

# wholeTextFiles produces one (filename, content) element per file in the directory.
text_rdd = sc.wholeTextFiles(input_dir)
print("Total number of documents:", text_rdd.count(), sep="\n")

Total number of documents:
1460


In [21]:
# See the top 5 elements of text_rdd
# Each element is a (file path, file content) pair, as produced by wholeTextFiles.
text_rdd.take(5)


[('file:/content/drive/MyDrive/EECS4415_BigDataSystem/A2/CISI-IndividualDocuments/941.txt',
  'ISBD Standard or Secret?\n  The controversial ISBD will mean radical changes in descriptive cataloging practice if put into operation, as planned, by the Library of Congress. Users of LC catalog cards will require retraining: all reference librarians  will experience an immediate and continuing demand for explanation of the  new catalog cards to users; those large public and research libraries with  computer-based systems will require costly modifications of computer programs. Yet the ISBD (International Standard Bibliographic Description) is destined  to be implemented by the Library of Congress with the sanction of ALA but  without even being considered by the recognized standards associations (ISO,  ANSI), to say nothing of the other professional and information industry  organizations.\n'),
 ('file:/content/drive/MyDrive/EECS4415_BigDataSystem/A2/CISI-IndividualDocuments/1014.txt',
  'The

Text Preprocessing

**Read a list of stopwords from a file named stop_words.lst (which is a text file with one stopword per line)**

In [None]:
# Read the stopwords from stop_words.txt (one stopword per line) into the set stop_words.
# Path to the stopword list on my Google Drive.
stopwords_path = "/content/drive/MyDrive/EECS4415_BigDataSystem/A2/stop_words.txt"

# An explicit encoding keeps the result independent of the runtime's locale default.
with open(stopwords_path, "r", encoding="utf-8") as stopword_file:
    # Strip the line break and surrounding whitespace from every line; blank lines
    # are skipped so the empty string never ends up in the set (and in the count).
    stop_words = {line.strip() for line in stopword_file if line.strip()}

print(f"# of stopword load: {len(stop_words)}")


# of stopword load: 319


**Define a function to get the filename prefix without the full path name and the file extension. The prefix will be used as the document ID.**

In [None]:
def get_filename_prefix(filepath):
  """Return the file name without its directories and extension (the document ID).

  For example, 'file:/content/docs/941.txt' gives '941'. Only the last
  extension is removed, so 'a/b.tar.gz' gives 'b.tar'.

  Args:
    filepath: A path-like value (str, bytes or os.PathLike).

  Returns:
    The bare file name, or None when filepath is not path-like
    (for example None or an int).
  """
  try:
    filename = os.path.basename(filepath)  # Drop the directory part
  except TypeError:
    # os.path.basename raises TypeError for values that are not path-like.
    # Returning None keeps the original best-effort contract, but unrelated
    # errors (and KeyboardInterrupt) are no longer swallowed by a bare except.
    return None
  return os.path.splitext(filename)[0]  # Drop the extension


In [None]:
# Use the Porter Stemmer from NLTK
# One module-level instance is shared by preprocess_text and process_query.
stemmer = PorterStemmer()

# Function to preprocess text
def preprocess_text(document):
  """Turn one (file path, raw text) pair into (doc_id, list of stemmed terms).

  The text is lowercased, split into runs of the letters a-z that sit between
  word boundaries, stripped of stopwords (module-level stop_words) and stemmed
  with the module-level Porter stemmer. The doc_id is produced by
  get_filename_prefix from the file path.
  """
  path, raw_text = document

  stemmed_terms = []
  for token in re.findall(r"\b[a-z]+\b", raw_text.lower()):
    # Stopwords are checked on the raw token, i.e. before stemming.
    if token in stop_words:
      continue
    stemmed_terms.append(stemmer.stem(token))

  # Result shape: (doc_id, [term1, term2, term3, ...])
  return (get_filename_prefix(path), stemmed_terms)

# Convert text_rdd into another RDD (doc_words_rdd) whose element is (doc_id, word_list)
# word_list holds the document's stemmed terms with stopwords removed (see preprocess_text).
doc_words_rdd = text_rdd.map(preprocess_text)

Build Inverted Index

**Compute term frequency (tf_value) of each word in each document, and put the results in a RDD (tf_rdd) where each element is (word, (doc_id, tf_value)). The relative term frequency value should be used, that is, the tf_value of a word $w$ in a document $d$ should be the number of occurrences of $w$ in $d$ divided by the total number of words in $d$.**

In [None]:
# Compute Term Frequency (TF)
def compute_tf(doc_tuple):
    """Compute relative term frequencies for one document.

    Args:
        doc_tuple: (doc_id, words), where words is the document's list of terms.

    Returns:
        A list with one (word, (doc_id, tf_value)) entry per distinct word, in
        order of first appearance. tf_value is the word's occurrence count
        divided by the total number of words in the document. An empty
        document gives an empty list.
    """
    doc_id, words = doc_tuple
    doc_length = len(words)

    # Tally occurrences; dict insertion order keeps words in first-seen order.
    occurrences = {}
    for term in words:
        occurrences[term] = occurrences.get(term, 0) + 1

    return [
        (term, (doc_id, n_hits / doc_length))
        for term, n_hits in occurrences.items()
    ]

# Convert doc_words_rdd into tf_rdd whose element is (word, (doc_id, tf_value))
# flatMap flattens the per-document lists returned by compute_tf into one RDD of entries.
tf_rdd = doc_words_rdd.flatMap(compute_tf)

#debug
#tf_rdd.take(10)

**From tf_rdd, compute the document frequency of each word and put the results in a RDD (df_rdd), where each element is (word, doc_count)**

In [None]:
# Compute Document Frequency (DF) per word
# Step 1: keep only which document each word occurs in:
#   (word, (doc_id, tf_value)) -> (word, doc_id)
word_doc_rdd = tf_rdd.map(lambda entry: (entry[0], entry[1][0]))

# Step 2: drop repeated (word, doc_id) pairs so a document counts at most once per word.
# compute_tf already emits one entry per distinct word of a document, so this is
# only a safeguard for the case where two inputs end up with the same doc_id.
word_doc_distinct_rdd = word_doc_rdd.distinct()

# Step 3: count the remaining pairs per word:
#   (word, doc_id) -> (word, 1) -> reduceByKey sum -> (word, doc_count)
df_rdd = (
    word_doc_distinct_rdd
    .map(lambda pair: (pair[0], 1))
    .reduceByKey(lambda left, right: left + right)
)

#debug
#df_rdd.take(20)


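The idf value of a word is computed as $\log\frac{N}{df+1}$, where $N$ is the total number of documents and $df$ is the number of documents that contain the word.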

In [None]:
# Compute Inverse Document Frequency (IDF)
# N is the total number of documents, counted from doc_words_rdd.
N = doc_words_rdd.count()

# idf(word) = log(N / (df + 1)), using the natural logarithm (math.log).
idf_rdd = df_rdd.mapValues(lambda doc_count: math.log(N / (doc_count + 1)))

#debug
#idf_rdd.take(10)


**From tf_rdd and idf_rdd, compute the tf-idf value of a word for each document and put the results in another RDD tfidf_rdd whose element is (word, (doc_id, tf-idf))**

In [None]:
# Compute TF-IDF
# Joining on the word pairs every (doc_id, tf_value) entry with that word's idf:
#   (word, ((doc_id, tf_value), idf_value))
joined_rdd = tf_rdd.join(idf_rdd)

# Multiply tf by idf and flatten to the required shape: (word, (doc_id, tfidf_value)).
tfidf_rdd = joined_rdd.map(
    lambda record: (record[0], (record[1][0][0], record[1][0][1] * record[1][1]))
)

#debug
#tfidf_rdd.take(10)

**Convert tfidf_rdd to an inverted index RDD whose element is (word, [(doc_id, tf-idf), (doc_id, tf-idf), ...])**

In [None]:
# Convert to Inverted Index Format: (word, [(doc_id, tfidf), (doc_id, tfidf), ...])
# This RDD feeds several later actions (collectAsMap below, take, count and the
# document-norm computation), so it is cached instead of being recomputed from
# its lineage for each of them.
inverted_index_rdd = tfidf_rdd.groupByKey().mapValues(list).cache()

# Also collect the index into a plain Python dictionary on the driver.
# The original plan was to use RDD.lookup() at query time, but that raised an
# error in Colab. With the dictionary, retrieving a word's postings is a local
# dict access rather than a Spark job.
inverted_index_dict = inverted_index_rdd.collectAsMap()
print("Inverted index collected as dictionary. Size =", len(inverted_index_dict))

Inverted index collected as dictionary. Size = 5545


In [None]:
# Print a sample of the TF-IDF based inverted index
# Each element is (word, [(doc_id, tf-idf), ...]); take(5) returns five of them to the driver.
for word, postings in inverted_index_rdd.take(5):
    print(f"{word}: {postings}")
# --- Everything above is the required code; everything below is optional debugging ---
# The debug snippets below run extra Spark jobs and slow the notebook down, so they
# are commented out; uncomment them as needed.
# Debug 1: the eClass A2 requirement page shows a sample inverted-index entry.
# Look up a single word to compare my output with that sample.
#target_word="intensifi" #other word for testing: adher, simplifi, intensifi
#results=inverted_index_rdd.filter(lambda x:x[0]==target_word).collect()
#if results:
#  word,postings=results[0]
#  print(f"{word}: {postings}")
#else:
#  print(f"No results found for word: {target_word}")

# Debug 2: the requirement page says:
# "The ID of a document should be the prefix of its file name. For example, the ID of document 3.txt is '3'."
# The order does not matter here; I only want to confirm that the doc IDs have this form.
#sample_doc_ids = doc_words_rdd.map(lambda x: x[0]).take(10)
#print("Sample doc IDs:", sample_doc_ids)

standard: [('941', 0.1311354067954801), ('1014', 0.041754557387615555), ('999', 0.19982538178358872), ('1003', 0.11418593244776498), ('1000', 0.059522454148303025), ('937', 0.07560960391811465), ('946', 0.028546483111941245), ('1001', 0.1613974237482832), ('993', 0.04823371284431452), ('940', 0.17484720906064014), ('1004', 0.17484720906064014), ('919', 0.06879234454844857), ('996', 0.09483238457526244), ('903', 0.030408210271415676), ('887', 0.09646742568862904), ('962', 0.04512186040274584), ('913', 0.06741097216795765), ('873', 0.06993888362425606), ('888', 0.1149680278754894), ('877', 0.09024372080549169), ('860', 0.06993888362425606), ('841', 0.0717321883325703), ('842', 0.04701773689025617), ('824', 0.0822810395579483), ('815', 0.08742360453032007), ('748', 0.06823305719439615), ('755', 0.035412092974306864), ('724', 0.037804801959057326), ('657', 0.04823371284431452), ('603', 0.06582483164635863), ('608', 0.029761227074151513), ('604', 0.028258134797679215), ('655', 0.02391072944

In [43]:
# See how many words in the inverted index
# (the RDD has one element per distinct word, so count() is the number of indexed words)
inverted_index_rdd.count()

5545

Vector Space Model



**Query processing: define a function to convert a query into its tf-idf representation**

In [None]:
# Put the word IDF values in a dictionary for fast lookup during query processing
# (collectAsMap brings the (word, idf) pairs of idf_rdd to the driver as a Python dict).
idf_lookup = idf_rdd.collectAsMap()

# Function to process a query into a sparse TF-IDF dictionary
def process_query(query):
    """Convert a free-text query into a sparse TF-IDF vector.

    The query goes through the same pipeline as the documents in
    preprocess_text: lowercase, tokenize with the same regular expression,
    drop stopwords, then Porter-stem.

    Args:
        query: The query string.

    Returns:
        A dict mapping each distinct stemmed query term to tf * idf, where tf
        is the term's relative frequency within the query and idf comes from
        idf_lookup. A term that never occurs in the corpus has df = 0, so its
        idf is log(N / (0 + 1)) = log(N). A query with no usable terms gives
        an empty dict.
    """
    # Tokenize with the same pattern as preprocess_text so that query terms and
    # index terms are produced by identical rules.
    tokens = re.findall(r"\b[a-z]+\b", query.lower())

    # Remove stopwords, then stem (same order as for the documents).
    stemmed_tokens = [stemmer.stem(word) for word in tokens if word not in stop_words]
    if not stemmed_tokens:
        return {}

    # Count how often each stemmed term occurs in the query.
    word_counts = {}
    for w in stemmed_tokens:
        word_counts[w] = word_counts.get(w, 0) + 1

    # N is the module-level document count computed once in the IDF cell.
    # Calling doc_words_rdd.count() here would launch a Spark job on every query.
    unseen_idf = math.log(N)
    total_terms = len(stemmed_tokens)

    # TF (count / total) times IDF for every distinct query term.
    return {
        w: (count / total_terms) * idf_lookup.get(w, unseen_idf)
        for w, count in word_counts.items()
    }

**Precompute Document Norms:**
The L2 norm of each document is computed once to speed up document score computation. This reduces redundant calculations at query time. This computation should be done based on the inverted index.



In [None]:
# Precompute document norms from the words' tf-idf values and store them in a
# Python dictionary: key = doc_id, value = the document's L2 norm.
# The inverted index elements look like:
#   (word, [(doc_id1, tf_idf1), (doc_id2, tf_idf2), ...])

# Flatten every posting list (element [1] of each index entry) into individual
# (doc_id, tf_idf) pairs.
doc_tf_idf_rdd = inverted_index_rdd.flatMap(lambda entry: entry[1])

# Sum the squared tf-idf values per doc_id.
doc_tf_idf_sumsq_rdd = (
    doc_tf_idf_rdd
    .map(lambda posting: (posting[0], posting[1] ** 2))
    .reduceByKey(lambda left, right: left + right)
)

# The square root of the sum of squares is the L2 norm of each document.
doc_norms_rdd = doc_tf_idf_sumsq_rdd.mapValues(math.sqrt)

# Collect the result into a Python dictionary on the driver.
doc_norms = doc_norms_rdd.collectAsMap()

# Debug: show the norms of five documents.
for d_id in list(doc_norms)[:5]:
    print(f"doc_id: {d_id}, norm: {doc_norms[d_id]}")


doc_id: 999, norm: 0.8315303035221504
doc_id: 937, norm: 0.544077901108632
doc_id: 946, norm: 0.45342040779059023
doc_id: 996, norm: 0.6080724270867532
doc_id: 913, norm: 0.5379992948346767


In [None]:
# Function to compute document scores given a query using the vector space model
def rank_documents(query, top_k):
    """Rank documents against a query with the vector space model.

    A document's score is (q . d) / ||d||: the dot product of the query and
    document tf-idf vectors divided by the document's precomputed L2 norm.
    The query norm is not applied.

    Args:
        query: Free-text query string; vectorized with process_query.
        top_k: Maximum number of results to return.

    Returns:
        Up to top_k (doc_id, score) tuples, highest score first. Only
        documents that share at least one term with the query are scored.
    """
    query_vector = process_query(query)

    # Step 1: accumulate q . d per document. Walking the postings of each query
    # term in the driver-side index dictionary only touches documents that
    # contain that term.
    dot_products = {}
    for term, query_weight in query_vector.items():
        # A term missing from the index has no postings and contributes nothing.
        for doc_id, doc_weight in inverted_index_dict.get(term, []):
            dot_products[doc_id] = dot_products.get(doc_id, 0.0) + query_weight * doc_weight

    # Step 2: divide by ||d||, skipping documents without a non-zero norm.
    scored = [
        (doc_id, dot_product / doc_norms[doc_id])
        for doc_id, dot_product in dot_products.items()
        if doc_id in doc_norms and doc_norms[doc_id] != 0
    ]

    # Step 3: highest score first, then keep the first top_k entries.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


**Given a query, call the rank_documents function and print the top-10 documents**

In [48]:
# Example Query 1
query = "Specific advantages of computerized index systems."
# Retrieve the 10 highest-scoring documents as (doc_id, score) pairs.
top_docs = rank_documents(query, 10)

# Print top 10 relevant documents
print("\nTop 10 Relevant Documents:")
print("\nDoc_ID: Document Score")
for doc, score in top_docs:
    print(f"{doc}: {score}")


Top 10 Relevant Documents:

Doc_ID: Document Score
377: 0.34316571310187943
869: 0.30415914371393965
824: 0.23371892909299502
1144: 0.22246147894220142
856: 0.2212078128278848
1419: 0.21683483722162045
715: 0.2066610196302842
812: 0.20652041350135483
489: 0.19080310866870978
194: 0.18652349192687342


In [49]:
# Example Query 2
query = "How can actually pertinent data, as opposed to references or entire articles themselves, be retrieved automatically in response to information requests?"
# Retrieve the 10 highest-scoring documents as (doc_id, score) pairs.
top_docs = rank_documents(query, 10)

# Print top 10 relevant documents
print("\nTop 10 Relevant Documents:")
print("\nDoc_ID: Document Score")
for doc, score in top_docs:
    print(f"{doc}: {score}")


Top 10 Relevant Documents:

Doc_ID: Document Score
1138: 0.2698882732022883
532: 0.26532484401971396
1155: 0.1958742695858231
562: 0.15561758487265828
309: 0.15488168234340965
790: 0.1425356353084291
1327: 0.14038899967739984
451: 0.13858007618172363
315: 0.13833282930878682
58: 0.13745453463399604
