# Index construction

This week's exercise looks at the construction of an inverted index in more
detail.

In particular, for the programming exercise, we will look at blocked
sort-based indexing (BSBI).  This indexing technique can be employed when
the full index, or some intermediate data structures, do not fit in the main
memory of the computer performing the indexing.


The basic algorithm is outlined in the book in Figure 4.2.  We will provide
you with some code that produces a block and reads and writes blocks from and
to disk, and leave the implementation of `bsbi_invert()` and the merging part
of `merge_blocks()` to you.

The block you get from `parse_next_block()` is a potentially unordered list
of `(termid, docid)` pairs.  The global data structure `termid_map` can be
used to look up term ids by term, and the global data structure `docid_map`
can be used to look up documents by id.

Our provided implementation of `parse_corpus()` and `parse_next_block()`
produces the same document and term ids across executions.

**Please note:** Here in the exercise we use an "optimized" version of BSBI. With BSBI as you have seen it in the lecture, the intermediate files are lists of raw termID-docID pairs, not postings lists. The postings lists are only built at the very end when merging all the intermediate files. Here, in contrast, each block is already inverted into postings lists before it is written to disk. It is SPIMI that uses already built partial postings lists as its intermediate representation. If this difference is not fully clear to you, we strongly advise you to review the slides or the book.
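
To make the difference concrete, the following minimal sketch shows the same toy data in both intermediate representations. The ids are made up for illustration, and the `PostingsList` tuple is redefined here only so that the snippet is self-contained.

```python
from collections import namedtuple

PostingsList = namedtuple('PostingsList', ['termid', 'postings'])

# Textbook BSBI: a block on disk is a sorted list of raw (termid, docid) pairs.
raw_pairs = [(1, 1), (1, 3), (2, 1), (2, 2), (2, 3)]

# This exercise: a block on disk already holds one postings list per term.
partial_index = [
    PostingsList(termid=1, postings=[1, 3]),
    PostingsList(termid=2, postings=[1, 2, 3]),
]

# Both forms carry the same information: expanding the postings lists
# gives back exactly the raw pairs.
expanded = [(pl.termid, docid) for pl in partial_index for docid in pl.postings]
print(expanded == raw_pairs)
```

The postings-list form stores each term id once per block instead of once per pair, which is why the intermediate files in this exercise are smaller than in the textbook variant.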

First we define some variables. Note the use of Python's `namedtuple` factory function, which is used here to create simple tuple subclasses that allow us to access tuple fields by name.

In [None]:
import sys
sys.path.append("../../")

In [None]:
import os, shutil

from collections import namedtuple
from textutils import tokenize_document

Document = namedtuple('Document', ['path', 'id'])
PostingsList = namedtuple('PostingsList', ['termid', 'postings'])

docid_map = {}
documentid_counter = 1
termid_map = {}
termid_counter = 1
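
As a quick illustration of what the named tuples defined above offer, here is a small sketch. The path and id are hypothetical values chosen for the example only.

```python
from collections import namedtuple

Document = namedtuple('Document', ['path', 'id'])

# Hypothetical document, not part of the real corpus.
doc = Document(path='corpus/example.txt', id=7)

print(doc.path)      # access a field by name
print(doc[1])        # positional access still works, as for any tuple
path, docid = doc    # and the tuple can be unpacked as usual
print(path, docid)
```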

The following function parses the corpus.

In [None]:
import glob

def parse_corpus(corpus='solution-corpus.txt'):
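    '''
    Collect the corpus files, populate the document-id map and return the
    list of documents as Document() tuples.  Note that the `corpus` argument
    is currently unused: the files are always read from the shared corpus
    directory below.
    '''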
    global docid_map
    documents = []
    documentid_counter = 1
    for path in sorted(glob.glob('../../shared/corpus/*.txt')):
        documents.append(Document(path=path, id=documentid_counter))
        docid_map[documentid_counter] = path
        documentid_counter += 1

    return documents

documents = parse_corpus()

Let's preview the first few documents.

In [None]:
documents[:5]

The following creates a directory for the block storage, the index and other auxiliary files. It deletes data from a previous run if it exists, so you can try again.

In [None]:
OUTPUT_DIR = 'output'
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
os.mkdir(OUTPUT_DIR)

#### Task 1:

Implement `bsbi_invert()`.

In [None]:
def bsbi_invert(block):
    '''
    Compute inverted index for block.
    Return inverted index as (sorted by termid) list of PostingsList() tuples.
    '''
    postings = []
    # Assignment: implement construction of inverted index for list of tuples
    # given in block
    # 1. Sort block contents by term id.
    # Hint: Python's sort optionally takes a key function, see e.g.
    # https://developers.google.com/edu/python/sorting#custom-sorting-with-key
    # 2. Construct inverted index from termid-docid pairs
    ### TODO: assignment
    return postings
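
The hint about key functions can be illustrated on toy data that is unrelated to the block format. This is only a sketch of how `key` works, not a solution to the task.

```python
from operator import itemgetter

# Toy (label, number) records, made up for this example.
records = [('c', 3), ('a', 1), ('b', 2), ('d', 1)]

# sorted() returns a new list ordered by whatever the key function returns.
by_number = sorted(records, key=lambda r: r[1])
print(by_number)

# list.sort() sorts in place; itemgetter(0) is equivalent to lambda r: r[0].
records.sort(key=itemgetter(0))
print(records)
```

Python's sort is stable, so records with equal keys keep their original relative order (`('a', 1)` stays in front of `('d', 1)` in the first result).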

#### Task 2:
Complete the implementation of `merge_blocks()`. For this, you first need to implement the helper function `select_next_termid()`, which returns the termid of the term whose postings list is merged next. In the next step, you then have to implement the merging itself. To do this, make use of the provided helper function `merge()`.

In [None]:
def select_next_termid(readbufs):
    '''
    Select next termid for which to compute complete postings list

    readbufs is the list of input read buffers. This list may contain None for
    input files which we have already processed completely.
    '''
    # Assignment: implement an algorithm to select the next termid for which
    # we will merge postings lists
    ### TODO: assignment
    return None


MERGE_READ_WINDOW = 500
def refill_read_buffer(fh):
    '''
    Read a new buffer for read window in external merge from file at fh.
    '''
    b = []
    for _ in range(MERGE_READ_WINDOW):
        p = read_postings_list(fh)
        if not p:
            # Hit end-of-file, return what we've read so far.
            break
        b.append(p)
    return b

def merge(postings1, postings2):
    '''
    A helper function that merges the two postings lists postings1 and
    postings2.
    '''
    return sorted(set(postings1) | set(postings2))


def merge_blocks(blockfiles, outfile):
    '''
    Merge all the blocks stored in the files listed in blockfiles.
    Write the merged index to outfile.
    '''

    # open all input files
    inputs = []
    readbufs = []
    for f in blockfiles:
        print("Opening file", f, "for input")
        inputs.append(open(os.path.join(OUTPUT_DIR, f)))
    # open output file
    outf = open(outfile, 'w')

    # fill read buffers
    for fh in inputs:
        readbufs.append(refill_read_buffer(fh))

    # Iterate over all the read buffers, create merged postings lists and
    # write those to output file.

    # While there is input data left
    while sum(map(len, filter(None, readbufs))) > 0:
        # Exercise: implement select_next_termid()
        next_termid = select_next_termid(readbufs)
        #print("Merging postings lists for termid=%d" % next_termid)
        postings = []

        # Assignment: Go over all readbufs and merge postings lists for selected termid
        for readbuf in readbufs:
            # Hint: to make the surrounding code work, you should remove any items
            # which you process from the read buffers.
            # You can do this using the following statement: `del readbuf[0]`
            # Make use of already implemented merge()
            ### TODO: assignment
            pass

        # Write merged postings list to output file
        write_postings_list(outf, PostingsList(termid=next_termid, postings=postings))

        # If necessary, refill read buffers, this code assumes that the
        # postings list merging removes processed elements from the read
        # buffers using e.g. del readbuf[0].
        # Additionally, this will replace the readbuf list for an input block
        # with None, if we processed all the contents of that file.
        for n,r in enumerate(readbufs):
            if r is not None and len(r) == 0:
                # print("Refilling read buffer for block", n+1)
                next_buf = refill_read_buffer(inputs[n])
                if len(next_buf) == 0:
                    print("> Finished processing input block", n+1)
                    readbufs[n] = None
                else:
                    readbufs[n] = next_buf

    # Close input files
    for fh in inputs:
        fh.close()
    # Close output file
    outf.flush()
    outf.close()
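
To see what the provided `merge()` helper does with two postings lists, consider this small sketch. The function body is copied from the cell above, and the docids are made up.

```python
def merge(postings1, postings2):
    # Same body as the provided helper: set union, then sort.
    return sorted(set(postings1) | set(postings2))

a = [1, 3, 5]
b = [2, 3, 8]

merged = merge(a, b)
print(merged)

# Merging with an empty list leaves the postings unchanged, so an empty
# list is a convenient starting value when combining several lists.
print(merge([], merged))
```

Docid 3 occurs in both inputs but only once in the result, and the result is always sorted regardless of the input order.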


Below you find the function `blocked_sort_based_index()`, which calls the functions you implemented above as well as some provided helper functions. You don't have to change this code block.

In [None]:
token_gen = None
def token_generator(documents):
    '''
    This method uses the global variable `token_gen` to create a single
    instance of the generator iterator defined in _token_generator().
    This allows us to iterate through all the tokens in all the documents
    without having to carry a lot of information between calls to
    `parse_next_block()` (see below).

    The generator defined in _token_generator() returns (term, docid) tuples
    for all documents but produces only one tuple for a term per document.

    A generator will raise a StopIteration exception when there are no more
    results to be produced.
    '''
    global token_gen
    def _token_generator(documents):
        for doc in documents:
            doctokens = set()
            for t in tokenize_document(doc.path):
                if t in doctokens:
                    continue
                doctokens.add(t)
                yield (t, doc.id)
    if token_gen is None:
        token_gen = _token_generator(documents)
    return token_gen

BLOCKSIZE = 10000
def parse_next_block(documents):
    '''
    Produce termid-documentid tuples in batches of BLOCKSIZE.

    This uses the token_generator() method to create blocks of (termid, docid)
    tuples of fixed size.  See the docstring of token_generator() for more
    details on how the token tuple generation works internally.  This method
    also maps terms to term ids and stores that mapping in `termid_map`.

    Note that the last block produced by this method may contain fewer than
    BLOCKSIZE tuples.
    '''
    global termid_map, termid_counter
    block = []
    while len(block) < BLOCKSIZE:
        try:
            t, docid = next(token_generator(documents))
        except StopIteration:
            break
        if t not in termid_map:
            termid_map[t] = termid_counter
            termid_counter += 1
        tid = termid_map[t]
        block.append((tid, docid))
    return block

def write_postings_list(fh, p):
    '''
    Write a PostingsList() to file at fh.
    Format:  'termid:docid,docid,...,docid'
    '''
    # Custom format, for easier lazy reads
    fh.write("%d:%s\n" % (p.termid, ','.join(map(str, p.postings))))

def read_postings_list(fh):
    '''
    Read postings list from file at fh
    Expected format:  'termid:docid,docid,...,docid'
    '''
    line = fh.readline().strip()
    try:
        termid, postingstr = line.split(':')
        postings = list(map(int, postingstr.split(',')))
        return PostingsList(termid=int(termid), postings=postings)
    except ValueError:
        if line != '':
            # This is an actual parse error, empty line just signals EOF.
            print("Unable to parse line '%s' as postings list" % line)
        return None


def blocked_sort_based_index(documents, fullidx):
    '''
    Implement blocked sort based index as outlined in figure 4.2 of the book.
    '''
    n = 0
    blockfiles = []
    # We use this infinite loop to express a do-while loop
    while True:
        n += 1
        block = parse_next_block(documents)
        # We exit our do-while loop when we get a zero-length block from
        # parse_next_block().
        if len(block) == 0:
            break
        print("Indexing block", n)
        block = bsbi_invert(block)
        fname = "block%02d.txt" % n
        blockfiles.append(fname)
        print("Storing block", n, "in", fname)
        with open(os.path.join(OUTPUT_DIR, fname), 'w') as fn:
            for p in block:
                write_postings_list(fn, p)

    print("Merge blocks and store result in", fullidx)
    merge_blocks(blockfiles, fullidx)
    print("Store termid and documentid maps")
    with open(os.path.join(OUTPUT_DIR, 'termid_map.txt'), 'w') as fh:
        for term in termid_map.keys():
            fh.write("%s:%d\n" % (term, termid_map[term]))
    with open(os.path.join(OUTPUT_DIR, 'docid_map.txt'), 'w') as fh:
        for docid in docid_map.keys():
            fh.write("%d:%s\n" % (docid, docid_map[docid]))
    print("Finished")
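
The interplay between the shared generator and the batching loop in `parse_next_block()` can be seen in isolation in the following sketch. The names `numbers`, `gen` and `next_batch` are hypothetical stand-ins and not part of the handout code.

```python
def numbers():
    # Stand-in for _token_generator(): yields five items, then is exhausted.
    for i in range(5):
        yield i

gen = numbers()  # one shared generator instance, like `token_gen`

def next_batch(size):
    # Mirrors the loop in parse_next_block(): keep pulling items from the
    # shared generator until the batch is full or the generator is exhausted.
    batch = []
    while len(batch) < size:
        try:
            batch.append(next(gen))
        except StopIteration:
            break
    return batch

batches = [next_batch(2), next_batch(2), next_batch(2), next_batch(2)]
print(batches)
```

Because the generator keeps its position between calls, each batch continues where the previous one stopped. The last non-empty batch is shorter than requested, and every later call yields an empty batch, which is the condition `blocked_sort_based_index()` uses to leave its loop.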

Finally, we call `blocked_sort_based_index()` to build the index.

In [None]:
blocked_sort_based_index(documents, os.path.join(OUTPUT_DIR, 'index.txt'))

Now you should have a complete inverted index using term ids and document ids
in `index.txt`.  Open [index.txt](./output/index.txt),
[termid_map.txt](./output/termid_map.txt) and
[docid_map.txt](./output/docid_map.txt) by clicking on the links or on the
files in Jupyter to see how such an index might look when stored on disk.

A real index would probably use a more sophisticated file-format, such as a
B-tree variant, which lends itself to looking up elements of the index more
easily than our format which is just a textual representation of the inverted
index, where we have one term id and its postings list on a line.
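
The line format can be tried out without touching the real output files. The sketch below uses an in-memory buffer and two hypothetical helpers, `format_line` and `parse_line`, that follow the same `termid:docid,docid,...,docid` layout as the handout functions. The ids are made up.

```python
import io

def format_line(termid, postings):
    # Same layout as write_postings_list(): 'termid:docid,docid,...,docid'
    return "%d:%s\n" % (termid, ','.join(map(str, postings)))

def parse_line(line):
    # Inverse of format_line(); assumes a well-formed, non-empty line.
    termid, postingstr = line.strip().split(':')
    return int(termid), [int(d) for d in postingstr.split(',')]

buf = io.StringIO()  # in-memory stand-in for a file on disk
buf.write(format_line(3, [1, 4, 9]))
buf.write(format_line(7, [2]))
print(buf.getvalue())

buf.seek(0)
entries = [parse_line(line) for line in buf]
print(entries)
```

Since every line is self-contained, a reader can process the file one postings list at a time, which is what makes the lazy reads during merging possible.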

A few sanity checks, assuming that you have not changed the way the handout
code assigns document and term ids.  Note that from here on out, we will drop
the pretense of our index not fitting into memory for simplicity.

In [None]:
full_index = {}
with open(os.path.join(OUTPUT_DIR, 'index.txt')) as idx:
    while True:
        pl = read_postings_list(idx)
        if not pl:
            break
        full_index[pl.termid] = pl.postings
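
With the index in memory, answering "which documents contain this term?" is a chain of three dictionary lookups. The sketch below uses small made-up maps with a `toy_` prefix so that it does not overwrite the real `termid_map`, `docid_map` and `full_index`.

```python
# Toy stand-ins for termid_map, docid_map and full_index (made-up content).
toy_termid_map = {'brutus': 1, 'caesar': 2}
toy_docid_map = {1: 'a.txt', 2: 'b.txt', 3: 'c.txt'}
toy_full_index = {1: [1, 3], 2: [2, 3]}

def documents_for(term):
    # term -> term id -> postings list -> document paths
    termid = toy_termid_map.get(term)
    if termid is None:
        return []  # unknown term: no documents
    return [toy_docid_map[docid] for docid in toy_full_index.get(termid, [])]

print(documents_for('brutus'))
print(documents_for('caesar'))
print(documents_for('cleopatra'))
```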

First some quick checks to ensure that the term ids match our expectations:

In [None]:
assert(termid_map['OF'] == 4005)
assert(termid_map['eastern'] == 2977)
assert(termid_map['heavnly'] == 10527)
assert(termid_map['conflict'] == 15670)
assert(termid_map['Brutus'] == 22919)
assert(termid_map['Calpurnia'] == 32684)

In [None]:
print(termid_map['OF'])
print(termid_map['eastern'])
print(termid_map['heavnly'])
print(termid_map['conflict'])
print(termid_map['Brutus'])
print(termid_map['Calpurnia'])

Now we do some spot checks to see if the terms from above have the correct
postings lists:

In [None]:
# term id 4005: OF
assert(full_index[4005] ==
        [3, 6, 7, 8, 9, 10, 13, 14, 15, 16, 17, 18, 19, 20, 21, 24, 25, 26, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40])
# term id 2977: eastern
assert(full_index[2977] == [2, 8, 17, 24, 38])
# term id 10527: heavnly
assert(full_index[10527] == [6,41])
# term id 15670: conflict
assert(full_index[15670] == [10,12,19,26,30,35,36,39,44])
# term id 22919: Brutus
assert(full_index[22919] == [18,20,24,26,31,32,33,34,39])
# term id 32684: Calpurnia
assert(full_index[32684] == [34])

In [None]:
# term id 4005: OF
print(full_index[4005])
# term id 2977: eastern
print(full_index[2977])
# term id 10527: heavnly
print(full_index[10527])
# term id 15670: conflict
print(full_index[15670])
# term id 22919: Brutus
print(full_index[22919])
# term id 32684: Calpurnia
print(full_index[32684])

As a final test of the disk-based index we provide our implementation of the
standard boolean query engine -- the one you had to implement in exercise 1 --
adapted to construct its internal in-memory index from the on-disk index
created by the blocked sort-based index construction algorithm.

You should be able to query your disk-based index using the syntax below.

In [None]:
from queryengine import InvertedIndex
ii = InvertedIndex(indexfile=os.path.join(OUTPUT_DIR, 'index.txt'),
                   termmap=os.path.join(OUTPUT_DIR, 'termid_map.txt'),
                   docmap=os.path.join(OUTPUT_DIR, 'docid_map.txt'))

# Expected: 34 -> ../../shared/corpus/the_tragedy_of_julius_caesar.txt
ii.execute_and_print("Brutus AND Calpurnia")
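
For reference, an `AND` query boils down to intersecting two sorted postings lists. The following is a minimal sketch of the standard linear-time intersection. It is not the code of the provided `queryengine` module, and `intersect` is a name chosen for this example. The two postings lists are the ones from the spot checks above.

```python
def intersect(p1, p2):
    # Walk both sorted lists in parallel and keep the docids found in both.
    result = []
    i = j = 0
    while i < len(p1) and j < len(p2):
        if p1[i] == p2[j]:
            result.append(p1[i])
            i += 1
            j += 1
        elif p1[i] < p2[j]:
            i += 1
        else:
            j += 1
    return result

brutus = [18, 20, 24, 26, 31, 32, 33, 34, 39]
calpurnia = [34]
print(intersect(brutus, calpurnia))
```

The only docid in both lists is 34, which matches the expected result of the query above.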