## Distributed Information Retrieval

In this exercise we implement a core process of Fagin's algorithm, the parallel scanning of the posting lists. Assume an aggregation function that returns the sum of the tf-idf scores given the terms in a document.

We assume that the posting lists for each term of a retrieval system are running on a different node.

We first create a dictionary $h$, where each entry of the dictionary corresponds to a posting list for term, assumed to be hosted in a different node. Explore the structure of $h$, to understand it.

In [1]:
from nltk.stem import PorterStemmer, WordNetLemmatizer
import nltk
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel
import string
from nltk.corpus import stopwords
import math
from collections import Counter
nltk.download('stopwords')
import numpy as np

stemmer = PorterStemmer()

# Tokenize, stem a document
def tokenize(text):
    text = "".join([ch for ch in text if ch not in string.punctuation])
    tokens = nltk.word_tokenize(text)
    return " ".join([stemmer.stem(word.lower()) for word in tokens])

# Read a list of documents from a file. Each line in a file is a document
with open("bread.txt") as f:
# with open("epfldocs.txt") as f:
    content = f.readlines()
original_documents = [x.strip() for x in content] 
documents = [tokenize(d).split() for d in original_documents]

# create the vocabulary
vocabulary = set([item for sublist in documents for item in sublist])
vocabulary = [word for word in vocabulary if word not in stopwords.words('english')]
vocabulary.sort()

# compute IDF, storing idf values in a dictionary
def idf_values(vocabulary, documents):
    idf = {}
    num_documents = len(documents)
    for i, term in enumerate(vocabulary):
        idf[term] = math.log(num_documents/sum(term in document for document in documents), math.e)
    return idf

# Function to generate the vector for a document (with normalisation)
def vectorize(document, vocabulary, idf):
    vector = [0]*len(vocabulary)
    counts = Counter(document)
    max_count = counts.most_common(1)[0][1]
    for i,term in enumerate(vocabulary):
        vector[i] = idf[term] * counts[term]/max_count
    return vector

# Function to compute cosine similarity
def cosine_similarity(v1,v2):
    sumxx, sumxy, sumyy = 0, 0, 0
    for i in range(len(v1)):
        x = v1[i]; y = v2[i]
        sumxx += x*x
        sumyy += y*y
        sumxy += x*y
    if sumxy == 0:
            result = 0
    else:
            result = sumxy/math.sqrt(sumxx*sumyy)
    return result

def vectorize_query(query, vocabulary, idf):
    q = query.split()
    q = [stemmer.stem(w) for w in q]
    query_vector = vectorize(q, vocabulary, idf)
    return query_vector
    
def search_vec(query, k):
    query_vector = vectorize_query(query, vocabulary, idf)
    scores = [[cosine_similarity(query_vector, document_vectors[d]), d] for d in range(len(documents))]
    scores.sort(key=lambda x: -x[0])
    ans = []
    indices = []
    for i in range(min(k,len(original_documents))):
        ans.append(original_documents[scores[i][1]])
        indices.append(scores[i][1])
    return ans, indices, query_vector

# Compute IDF values and vectors
idf = idf_values(vocabulary, documents)
document_vectors = [vectorize(s, vocabulary, idf) for s in documents]

[nltk_data] Downloading package stopwords to /Users/duong/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


## Question

In this exercise we implement a core process of Fagin's algorithm, the parallel scanning of the posting lists. Assume an aggregation function that returns the sum of the tf-idf scores given the terms in a document.

We assume that the posting lists for each term of a retrieval system are running on a different node.

We first create a dictionary $h$, where each entry of the dictionary corresponds to a posting list for a term, assumed to be hosted in a different node. Explore the structure of $h$, to understand it. We suggest to first use the simple collection breads.txt.

In [2]:
import numpy as np
import operator

doc_vecs = np.transpose(np.array(document_vectors))
h = {}
for i, term in enumerate(vocabulary):
    ha = {}
    for docj in range(len(original_documents)):
        tfidf = doc_vecs[i][docj]
        ha[docj] = tfidf
    sorted_ha = sorted(ha.items(), key=operator.itemgetter(1), reverse=True)
    h[term] = sorted_ha

### Complete the following function that implements the Fagin algorithm.

In [3]:
def fagin_algorithm(query, h, k, vocabulary):
    
    # Split and stem the query
    q = query.split()
    q = set([stemmer.stem(w) for w in q])
    query_term_cnt = len(q)
    
    # select the posting lists for the query terms
    posting_lists = {}
    for term in q:
        if term in h:
            posting_lists[term] = h[term]
    
    max_len = len(documents)
    
    # Traverse the selected posting lists until we found k documents that appear in ALL posting lists
    # This corresponds to phase 1 of Fagin's algorithm.
    # As a result you produce a dictionary documents_occurrences, with the document identifiers as keys, 
    # and the number of documents found as value.
    # We stop traversing the posting lists until we have found k documents that appear in ALL posting lists 
    # of the query terms

    documents_occurrences = {}
    l = 0
    found_documents = 0
    while l < max_len:
        for term in q:
            d  = posting_lists[term][l][0]
            if d in documents_occurrences.keys():
                documents_occurrences[d] = documents_occurrences[d]+1
            else:
                documents_occurrences[d] = 1
            if documents_occurrences[d] == query_term_cnt:
                found_documents = found_documents + 1
        if found_documents == k:
            l = max_len + 1
            break
        else:
            l = l+1
                
    print(documents_occurrences)
        
    # Retrieve for the found documents the tfidf values and add them up.
    # For simplicity, we do not distinguish the cases whether the values have already been retrieved in the 
    # previous phase, or whether we use random access to obtain those values
    
    tfidf = {}
    for d in documents_occurrences.keys():
        t = 0
        for term in q:
            t = t + dict(posting_lists[term])[d]
        tfidf[d] = t
        
    # Finally we compute the top-k documents and return the answer
    
    ans = sorted(tfidf.items(), key=lambda x:x[1], reverse = True)[:k]
    return ans


ans = fagin_algorithm("bread recipe", h, 2, vocabulary)
print(ans)
for document_id in ans:
    print(original_documents[document_id[0]])

{3: 2, 2: 1, 0: 2, 1: 1}
[(3, 1.1394342831883648), (0, 0.5697171415941824)]
Breads, Pastries, Pies, and Cakes: Quantity Baking Recipes
How to Bake Breads Without Baking Recipes


Produce your own datasets to explore the behavior of the algorithm. Create a dataset such that for retrieving a 2 term query a total of 6 documents needs to be retrieved in the first phase of the algorithm (as in the example in the lecture).