# Building inverted index and answering queries

Here is presented a simple standard document processing pipeline with a simple search engine based on it: starting from crawling documents, then building an inverted index, answering queries using this index, and organizing it as a simple web server.

# 1. Preprocessing

First, here is a unified approach to documents preprocessing, and Preprocessor class is responsible for it.

In [1]:
import nltk

class Preprocessor:

    def __init__(self):
        self.stop_words = {'a', 'an', 'and', 'are', 'as', 'at', 'be', 'by',
                           'for', 'from', 'has', 'he', 'in', 'is', 'it', 'its',
                           'of', 'on', 'that', 'the', 'to', 'was', 'were',
                           'will', 'with'}
        self.ps = nltk.stem.PorterStemmer()

    def tokenize(self, text):
        return nltk.word_tokenize(text)

    def stem(self, word, stemmer):
        return stemmer.stem(word)

    def is_apt_word(self, word: str):
        return word not in self.stop_words and word.isalpha()

    def preprocess(self, text):
        """
        tokenize lowercased text and stem it, ignoring not appropriate words
        """
        text = text.lower()
        tokenized_text = self.tokenize(text)
        stemmed = [self.stem(word, self.ps) for word in tokenized_text]

        filtered_words = []
        for word in stemmed:
            if self.is_apt_word(word):
                filtered_words.append(word)

        return filtered_words


## 1.1. Tests ##

In [2]:
prep = Preprocessor()
text = 'To be, or not to be, that is the question'

assert prep.tokenize(text) == ['To', 'be', ',', 'or', 'not', 'to', 'be', ',', 'that', 'is', 'the', 'question']
assert prep.stem('retrieval', prep.ps) == 'retriev'
assert prep.is_apt_word('qwerty123') is False
assert prep.preprocess(text) == ['or', 'not', 'question']

# 2. Crawling and Indexing

## 2.1 Base classes

Here are some base classes that needed for writing indexer.

In [3]:
import requests
from urllib.parse import quote
from bs4 import BeautifulSoup
from bs4.element import Comment
import urllib.parse
import os


class Document:

    def __init__(self, url):
        self.url = url

    def download(self):

        def is_html_document(response: requests.Response):
            if response is not None \
                    and response.headers['content-type'] is not None \
                    and "html" in response.headers['content-type']:
                return True

        try:
            response = requests.get(self.url)
            self.content = []
            if response.status_code == 200 and is_html_document(response):
                self.content = response.content
                if self.content is not None and len(self.content) > 0:
                    # print("from internet")
                    return True  # check if page content is not empty.
            return False
        except:
            return False

    def get_file_name(self, path):
        return os.path.join(path, quote(self.url).replace('/', '_'))

    def load(self, path):
        try:
            with open(self.get_file_name(path), 'rb') as f:
                self.content = f.read()
                # print("from disk")
        except:
            return False
        return True

    def get(self, path):
        if not self.load(path):
            return self.download()
        return True

    def persist(self, path):
        with open(self.get_file_name(path), 'wb') as f:
            f.write(self.content)


class HtmlDocument(Document):

    def normalize(self, href):
        if href is not None and href[:4] != 'http':
            href = urllib.parse.urljoin(self.url, href)
        return href

    def parse(self):

        def is_text_inside_main_div(element):
            """
            Check if text inside following tag construction:
            <div class="StandardArticleBody_body"/><p>...</p></div>
            which is the main text container.
            :param element: Actually text element.
            """
            return element.parent.parent is not None \
                   and element.parent.parent.name == 'div' \
                   and 'class' in element.parent.parent.attrs \
                   and 'StandardArticleBody_body' in element.parent.parent.attrs['class']

        def tag_visible(element):
            if element.parent.name in ['style', 'script', 'head', 'title',
                                       'meta', '[document]']:
                return False
            if isinstance(element, Comment):
                return False
            return is_text_inside_main_div(element)

        def is_available_content_type(href):
            return not (href.endswith(".pdf") or href.endswith(".mp3")
                        or href.endswith(".avi") or href.endswith(".mp4")
                        or href.endswith(".txt"))

        model = BeautifulSoup(self.content)

        self.anchors = []
        a = model.find_all('a')
        
        for anchor in a:
            href = self.normalize(anchor.get('href'))
            if href is None:
                continue
            text = anchor.text

            if href.startswith('https://www.reuters.com/') \
                    and is_available_content_type(href):
                self.anchors.append((text, href))

        texts = model.findAll(text=True)
        visible_texts = filter(tag_visible, texts)
        self.text = u" ".join(t.strip() for t in visible_texts)

## 2.2 Main class

The main indexer logic is here. It organized as a crawler generator that adds certain visited pages to inverted index and saves them on disk. 

- `crawl_generator_for_index` method crawles the given website doing BFS, starting from `source` within given `depth`. Considers only inner pages (of a form https://www.reuters.com/...) for visiting. To speed up, doesn't consider for visiting pages with content type other than html: '.pdf', '.mp3', '.avi', '.mp4', '.txt'. If encounters an article page (of a form https://www.reuters.com/article/...), saves its content in a file in `collection_path` folder and populates the inverted index calling `index_doc` method. When done, saves on disk three resulting dictionaries:
    - `doc_urls`, `doc_id:url`
    - `index`, `term:[collection_frequency, (doc_id_1, doc_freq_1), (doc_id_2, doc_freq_2), ...]`
    - `doc_lengths`, `doc_id:doc_length` 

    `limit` parameter is given for testing - if not `None`, the loop is broken when number of saved articles exceeds the `limit` and return without writing dictionaries to disk.
    
    
- `index_doc` method parses and preprocesses the content of a `doc` and adds it to the inverted index. Also keeps track of document lengths in a `doc_lengths` dictionary.

In [4]:
from queue import Queue
import pickle
from pathlib import Path


class Indexer:

    def __init__(self):
        # dictionaries to populate
        self.doc_id = 0  # also consider it as a saved document counter.
        self.doc_urls = {}
        self.index = {}
        self.doc_lengths = {}
        # preprocessor
        self.prep = Preprocessor()

    def crawl_generator_for_index(self, source, depth,
                                  collection_path="collection", limit=None):

        def is_article(url: str):
            return url.startswith("https://www.reuters.com/article/")

        def process_article(article: HtmlDocument):
            """
            save article's content and populates the inverted index.
            """
            self.doc_id += 1
            article.persist(collection_path)
            self.index_doc(article, self.doc_id)

        def save_dictionaries_on_disk():
            with open('inverted_index.p', 'wb') as f:
                pickle.dump(self.index, f)
            with open('doc_lengths.p', 'wb') as f:
                pickle.dump(self.doc_lengths, f)
            with open('doc_urls.p', 'wb') as f:
                pickle.dump(self.doc_urls, f)

        # create a path if it does not exist.
        Path(collection_path).mkdir(parents=True, exist_ok=True)

        q = Queue()
        q.put((source, 0))
        visited = set()
        while not q.empty():
            url, url_depth = q.get()
            if url not in visited:
                visited.add(url)
                try:
                    doc = HtmlDocument(url)
                    if doc.get(collection_path):
                        doc.parse()
                    else:
                        continue
                    for a in doc.anchors:
                        if url_depth + 1 < depth:
                            q.put((a[1], url_depth + 1))
                    yield doc

                    if is_article(doc.url):
                        process_article(doc)
                        if limit is not None and self.doc_id >= limit:
                            # if limit is reached - end the cycle
                            break  # stop finding articles.
                except FileNotFoundError as e:
                    print("Analyzing", url, "led to FileNotFoundError")

        if q.empty():
            save_dictionaries_on_disk()

    def index_doc(self, doc, doc_id):

        def create_doc_vocabulary(preprocessed_text):
            freq_vocabulary = {}  # term: term_frequency
            for term in preprocessed_text:
                if term not in freq_vocabulary:
                    freq_vocabulary[term] = 1
                else:
                    freq_vocabulary[term] += 1
            return freq_vocabulary

        def fill_inverted_index_dict(preprocessed_text):
            freq_vocabulary = create_doc_vocabulary(preprocessed_text)
            for term, term_freq in freq_vocabulary.items():
                if term not in self.index:
                    self.index[term] = [term_freq, (doc_id, term_freq)]
                else:
                    self.index[term].append((doc_id, term_freq))
                    self.index[term][0] += term_freq

        preprocessed_text = self.prep.preprocess(doc.text)
        self.doc_urls[doc_id] = doc.url
        fill_inverted_index_dict(preprocessed_text)
        self.doc_lengths[doc_id] = len(preprocessed_text)


## 2.3. Tests ##

In [5]:
indexer = Indexer()
k = 1
for c in indexer.crawl_generator_for_index("https://www.reuters.com/news/us", 2, "test_collection", 5):
    print(k, c.url)
    k+=1

assert type(indexer.index) is dict
assert type(indexer.index['reuter']) is list
assert type(indexer.index['reuter'][0]) is int
assert type(indexer.index['reuter'][1]) is tuple

1 https://www.reuters.com/news/us
2 https://www.reuters.com/
3 https://www.reuters.com/finance
4 https://www.reuters.com/finance/markets
5 https://www.reuters.com/news/world
6 https://www.reuters.com/politics
7 https://www.reuters.com/video
8 https://www.reuters.com/news/archive/healthNews
9 https://www.reuters.com/article/us-health-coronavirus-exclusive/exclusive-emails-reveal-breakdowns-in-u-s-drive-through-virus-testing-idUSKCN21W2KP
10 https://www.reuters.com/news/archive/politicsNews
11 https://www.reuters.com/article/us-health-coronavirus-usa-new-york/new-york-hospitalizations-fall-for-first-time-in-coronavirus-pandemic-governor-idUSKCN21W2DH
12 https://www.reuters.com/article/us-health-coronavirus-usa/trump-speaks-of-mutiny-as-cuomo-dismisses-premature-coronavirus-reopening-idUSKCN21W20G
13 https://www.reuters.com/article/us-health-coronavirus-usa-congress/republicans-warn-coronavirus-aid-program-running-out-of-cash-idUSKCN21W20O
14 https://www.reuters.com/article/us-health-coro

## 2.4 Building index

In [6]:
indexer = Indexer()
k = 1
for c in indexer.crawl_generator_for_index("https://www.reuters.com/", 3, "docs_collection"):
    print(k, c.url)
    k+=1

1 https://www.reuters.com/
2 https://www.reuters.com/home
3 https://www.reuters.com/finance
4 https://www.reuters.com/legal
5 https://www.reuters.com/finance/deals
6 https://www.reuters.com/subjects/aerospace-and-defense
7 https://www.reuters.com/subjects/banks
8 https://www.reuters.com/subjects/autos
9 https://www.reuters.com/finance/summits
10 https://www.reuters.com/subjects/sustainable-business
11 https://www.reuters.com/the-world-at-work
12 https://www.reuters.com/finance/markets
13 https://www.reuters.com/finance/markets/us
14 https://www.reuters.com/finance/markets/europe
15 https://www.reuters.com/finance/markets/asia
16 https://www.reuters.com/finance/global-market-data
17 https://www.reuters.com/markets/stocks
18 https://www.reuters.com/markets/bonds
19 https://www.reuters.com/markets/currencies
20 https://www.reuters.com/markets/commodities
21 https://www.reuters.com/finance/funds
22 https://www.reuters.com/finance/EarningsUS
23 https://www.reuters.com/finance/markets/divide

## 2.5 Index statistics

In [7]:
# load index, doc_lengths and doc_urls
with open('inverted_index.p', 'rb') as fp:
    index = pickle.load(fp)
with open('doc_lengths.p', 'rb') as fp:
    doc_lengths = pickle.load(fp)
with open('doc_urls.p', 'rb') as fp:
    doc_urls = pickle.load(fp)

In [8]:
print('Total index length', len(index))
print('\nTop terms by number of documents they apperared in:')
sorted_by_n_docs = sorted(index.items(), key=lambda kv: (len(kv[1]), kv[0]), reverse=True)
print([(sorted_by_n_docs[i][0], len(sorted_by_n_docs[i][1])) for i in range(20)])
print('\nTop terms by overall frequency:')
sorted_by_freq = sorted(index.items(), key=lambda kv: (kv[1][0], kv[0]), reverse=True)
print([(sorted_by_freq[i][0], sorted_by_freq[i][1][0]) for i in range(20)])

Total index length 11565

Top terms by number of documents they apperared in:
[('reuter', 683), ('said', 640), ('s', 610), ('have', 513), ('coronaviru', 507), ('ha', 501), ('wa', 497), ('but', 426), ('thi', 425), ('not', 419), ('which', 418), ('more', 411), ('than', 370), ('their', 365), ('new', 363), ('after', 360), ('been', 357), ('would', 356), ('had', 349), ('also', 348)]

Top terms by overall frequency:
[('said', 3174), ('s', 2903), ('have', 1607), ('wa', 1467), ('ha', 1400), ('coronaviru', 1092), ('not', 1089), ('but', 1037), ('reuter', 969), ('we', 945), ('thi', 944), ('more', 911), ('would', 870), ('they', 848), ('their', 836), ('had', 824), ('state', 789), ('which', 783), ('new', 770), ('year', 756)]


# 3. Answering query

Now, given already built inverted index, it's time to utilize it for answering user queries. In this class there are two implemented methods:
- `boolean_retrieval`, the simplest form of document retrieval which returns a set of documents such that each one contains all query terms. Returns a set of document ids. Refer to *ch.1* of the book for details;
- `okapi_scoring`, Okapi BM25 ranking function - assigns scores to documents in the collection that are relevant to the user query. Returns a dictionary of scores, `doc_id:score`. Read about it in [Wikipedia](https://en.wikipedia.org/wiki/Okapi_BM25#The_ranking_function) and implement accordingly.

Both methods accept `query` parameter in a form of a dictionary, `term:frequency`

In [9]:
from collections import Counter
import math


class QueryProcessing:

    @staticmethod
    def prepare_query(raw_query):
        prep = Preprocessor()
        # pre-process query the same way as documents
        query = prep.preprocess(raw_query)
        # count frequency
        return Counter(query)

    @staticmethod
    def boolean_retrieval(query, index):
        """ retrieve a set of documents containing all query terms """ 
        terms_docs_intersection = None

        for query_term in query:
            term_docs = set()
            for index_element in index[query_term]:
                if isinstance(index_element, tuple):
                    term_docs.add(index_element[0])

            if terms_docs_intersection is None:
                terms_docs_intersection = term_docs
            else:
                terms_docs_intersection = terms_docs_intersection.intersection(term_docs)

        return terms_docs_intersection

    @staticmethod
    def okapi_scoring(query, doc_lengths, index, k1=1.2, b=0.75):
        relevant_docs_scores = {}

        def calculate_avg_num_words_in_docs():
            avg_d = 0
            for _, doc_length in doc_lengths.items():
                avg_d += doc_length
            return avg_d / len(doc_lengths)

        # idf_sum = 0
        N = len(doc_lengths)
        avg_d = calculate_avg_num_words_in_docs()

        for query_term in query:
            if query_term not in index:
                continue

            n_q = len(index[query_term]) - 1  # num of docs containing term q.
            idf_q = math.log((N - n_q + 0.5) / (n_q + 0.5))

            for index_element in index[query_term]:
                if isinstance(index_element, tuple):
                    # number of times term q appears in doc D:
                    f_q = index_element[1]
                    doc_id = index_element[0]
                    D = doc_lengths[doc_id]  # number of words in doc d

                    # BM25 score for document d:
                    score_d = idf_q * f_q * (k1 + 1) / (f_q + k1 * (1 - b + b * D / avg_d))

                    if doc_id not in relevant_docs_scores:
                        relevant_docs_scores[doc_id] = score_d
                    else:
                        relevant_docs_scores[doc_id] += score_d

        return relevant_docs_scores


## 3.1 Tests 

In [10]:
test_doc_lengths = {1: 20, 2: 15, 3: 10, 4:20, 5:30}
test_index = {'x': [2, (1, 1), (2, 1)], 'y': [2, (1, 1), (3, 1)], 'z': [3, (2, 1), (4,2)]}


test_query1 = QueryProcessing.prepare_query('x z')
test_query2 = QueryProcessing.prepare_query('x y')


assert QueryProcessing.boolean_retrieval(test_query1, test_index) == {2}
assert QueryProcessing.boolean_retrieval(test_query2, test_index) == {1}
okapi_res = QueryProcessing.okapi_scoring(test_query2, test_doc_lengths, test_index)
assert all(k in okapi_res for k in (1,2,3))
assert not any(k in okapi_res for k in (4,5))
assert okapi_res[1] > okapi_res[3] > okapi_res[2]

# 4. Setting up a server

The resulting search engine is organized as a web-service that gets a query from get-parameters and returns urls with scores as a `json` dictionary. It is work in a browser of with curl, and look smth like this:
 
`> curl localhost:8080/?q=some_query_text
{ "url1" : 0.9, "url2": 0.8 }`

In [None]:
import socketserver
import json
from http.server import SimpleHTTPRequestHandler
from urllib import parse

PORT_NUMBER = 8080


class MyHandler(SimpleHTTPRequestHandler):

    def do_GET(self):
        
        def create_urls_scores(okapi_scores):
            sorted_scores = {k: v for k, v in sorted(okapi_scores.items(), key=lambda item: item[1], reverse=True)}

            urls_scores = {}
            for doc_id, okapi_score in sorted_scores.items():
                doc_url = indexer.doc_urls[doc_id]
                urls_scores[doc_url] = okapi_score
            
            return json.dumps(urls_scores).encode()
        
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

        raw_query = parse.parse_qsl(parse.urlsplit(self.path).query)[0][1]
#         print(raw_query)
        query = QueryProcessing.prepare_query(raw_query)
        okapi_scores = QueryProcessing.okapi_scoring(query, indexer.doc_lengths, indexer.index)
        self.wfile.write(create_urls_scores(okapi_scores))

with socketserver.TCPServer(("", PORT_NUMBER), MyHandler) as httpd:
    print("serving at port", PORT_NUMBER)
    httpd.serve_forever()

serving at port 8080


127.0.0.1 - - [14/Apr/2020 11:49:26] "GET /?q=Trump HTTP/1.1" 200 -
