# Notes

This is a simplified proof of concept to demonstrate how we could use an analytics-style schema for text indexing.

The indexing problem is similar, but not identical, to the current processing. This test indexes the full text dump of [English Wikipedia](https://dumps.wikimedia.org/other/cirrussearch/). This is the preprocessed dump file that is used for Wikipedia's own search, so it avoids most of the pain of parsing the markup and removing non-content information, at the cost of losing some semantic information (like paragraph boundaries). The text field is broken into sentences to simulate the frame workload we currently do.

The full Wikipedia dump is a good benchmark for our absolute largest dataset size - if we can handle this then we can handle 

## Schema

This schema is a subset of what we'd actually want - in particular this doesn't yet consider term frequency, term positions or multiple text fields. Adding these and the appropriate indexes to make them usable will add overhead to the insert.

The core table is positions - this has one row per term_id, document_id and frame_id - it records that term_id occurred in frame_id of document_id.

The actual term representation is in the table vocabulary - one row per unique term. Using this layer of indirection and representing terms by a unique term_id allows filtering terms in the vocabulary table before hitting the much larger positions table. Joins are fast because both tables are ordered by term_id, which maps directly onto SQLite's nested loop join for term searching.
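
To make the indirection concrete, here is a minimal sketch on a toy in-memory database. It uses the standard library `sqlite3` module rather than `apsw` (an assumption made so the example is self-contained), and the sample rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
create table vocabulary (
    term_id integer primary key,
    term text);
create index vocab_term_idx on vocabulary(term, term_id);

create table positions (
    term_id int,
    document_id int,
    frame_id int,
    primary key(term_id, document_id, frame_id))
without rowid;
""")

# Toy data, invented for illustration only.
conn.executemany('insert into vocabulary values (?, ?)',
                 [(1, 'cat'), (2, 'dog'), (3, 'fish')])
conn.executemany('insert into positions values (?, ?, ?)',
                 [(1, 1, 1), (1, 1, 2), (2, 1, 2), (3, 2, 3)])

term_lookup = """
select v.term, p.document_id, p.frame_id
from vocabulary v
inner join positions p
    on p.term_id = v.term_id
where v.term = ?
order by p.document_id, p.frame_id
"""

# The term is resolved in the small vocabulary table first; the resulting
# term_id is then a prefix lookup on the positions primary key.
cat_rows = conn.execute(term_lookup, ('cat',)).fetchall()

# The query plan shows which index each side of the join uses. The exact
# wording varies between SQLite versions, so it is printed, not asserted.
for row in conn.execute('explain query plan ' + term_lookup, ('cat',)):
    print(row)
```

On a real index the plan is worth checking whenever the schema changes, since the speed argument depends on both lookups being index searches rather than scans.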

### Searching
With this layout, primitive search operators can be mapped directly to SQL. Writing new styles of query is straightforward and doesn't require a Python round trip as in the current layout - see for example any_n_search. It should also be possible to push scoring to the SQL layer, which would avoid loading the entire frame result set as in the current scheme.
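
As a rough sketch of what pushing scoring into SQL could look like, the query below ranks frames by how many of the query terms they contain and returns only the top few. The score definition, the `top_frames` helper and the toy data are all hypothetical, and the example uses the standard library `sqlite3` module rather than `apsw`.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
create table vocabulary (term_id integer primary key, term text);
create index vocab_term_idx on vocabulary(term, term_id);
create table positions (
    term_id int, document_id int, frame_id int,
    primary key(term_id, document_id, frame_id))
without rowid;
""")

# Toy data, invented for illustration only.
conn.executemany('insert into vocabulary values (?, ?)',
                 [(1, 'cat'), (2, 'dog'), (3, 'fish')])
conn.executemany('insert into positions values (?, ?, ?)',
                 [(1, 1, 1), (1, 1, 2), (2, 1, 2), (3, 2, 3), (2, 2, 3)])

# positions holds one row per (term, document, frame), so count(*) per frame
# is the number of distinct query terms that frame contains.
scored_query = """
select p.frame_id, count(*) as score
from positions p
inner join vocabulary v
    on p.term_id = v.term_id
where v.term in ({})
group by p.frame_id
order by score desc, p.frame_id
limit ?
"""

def top_frames(connection, terms, limit=10):
    """Return the best (frame_id, score) rows; terms must be distinct."""
    placeholders = ', '.join(['?'] * len(terms))
    query = scored_query.format(placeholders)
    return connection.execute(query, list(terms) + [limit]).fetchall()

best = top_frames(conn, ['cat', 'dog'], limit=2)
print(best)
```

Only `limit` rows cross back into Python, however many frames match, which is the point of doing the ranking in the database.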


## Dataset notes
After processing, the text index is a 35GB database (documents and frames are not stored in this test, so this is representative of the index size alone).

Summary statistics:

- Number of documents: 5,281,362
- Number of frames: 191,172,996
- Total vocabulary size: 28,206,884
- Total positions rows: 1,613,349,907
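
A few derived ratios help put these counts in context. The sketch below only does arithmetic on the figures above; treating "35GB" as decimal gigabytes is an assumption, and the bytes-per-row figure includes the vocabulary table and its index, so it is an upper bound on the cost of a positions row.

```python
documents = 5281362
frames = 191172996
vocabulary = 28206884
positions = 1613349907
index_bytes = 35e9  # "35GB" read as decimal gigabytes (an assumption)

# Average number of sentence frames per document.
frames_per_document = frames / float(documents)
# Average number of unique, non-stopword terms per frame.
terms_per_frame = positions / float(frames)
# Whole-database size spread over the positions rows.
bytes_per_position = index_bytes / positions

print('frames per document: {:.1f}'.format(frames_per_document))
print('unique terms per frame: {:.1f}'.format(terms_per_frame))
print('bytes per positions row: {:.1f}'.format(bytes_per_position))
```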

The vocabulary size is the number of unique terms - only the basic stopword filter is used. The number of unique terms is particularly large because it includes names, hyphenated forms, misspellings and misformattings, numbers, dates, URLs and case variants (capitalised sentence starts, all caps, etc.). This has implications for our downstream analysis and modelling - we need to be prepared to deal with messy data.
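
One way to quantify a single source of this messiness is to count how many vocabulary entries collapse together under case folding. The sketch below runs on an invented five-term vocabulary using the standard library `sqlite3` module; note that SQLite's built-in `lower()` only folds ASCII characters by default, so this would undercount on real Wikipedia text.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('create table vocabulary (term_id integer primary key, term text)')

# Toy vocabulary, invented for illustration only.
conn.executemany('insert into vocabulary(term) values (?)',
                 [('June',), ('june',), ('JUNE',), ('apple',), ('1999',)])

# Total entries versus entries that remain distinct after lowercasing.
total, folded = conn.execute(
    'select count(*), count(distinct lower(term)) from vocabulary').fetchone()

# Terms that exist in more than one case variant.
variants = conn.execute("""
select lower(term), count(*)
from vocabulary
group by lower(term)
having count(*) > 1
order by count(*) desc
""").fetchall()

print('{} terms, {} after case folding'.format(total, folded))
print(variants)
```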


## Performance

Total ingestion time for the full dump was ~14 hours. This is promising given the total lack of optimisation to achieve it, and the simple and straightforward schema that results.

For small datasets the bottleneck is currently tokenisation - this could be improved by combining parallelisation (documents tokenised in parallel) with an improved tokeniser implementation (the document preprocessor approach). The timings here underestimate the real tokenisation cost, as this test doesn't include paragraph tokenisation or the consecutive sentence rule.
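
The split between analysis and writing can be estimated from the batch timings logged by the ingestion run in this notebook. The sketch below assumes that the "processing batch took" figure covers analysis plus writing, which the cumulative times in the log suggest; the truncated final log entry is left out.

```python
# (total seconds per batch, seconds spent writing), copied from the
# ingestion log. Each batch is 100,000 documents.
batch_timings = [
    (658.1, 216.3),
    (678.7, 238.5),
    (649.4, 241.5),
    (536.7, 205.9),
    (532.2, 210.6),
    (612.8, 240.2),
    (629.0, 261.2),
]

# Share of each batch spent in the insert step.
write_fractions = [write / total for total, write in batch_timings]

for (total, write), fraction in zip(batch_timings, write_fractions):
    print('total {:.1f} s, write {:.1f} s, write share {:.1%}'.format(
        total, write, fraction))
```

Under that reading, writing accounts for roughly a third of the first batch and a somewhat larger share of the later ones.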

For larger datasets the insert time dominates. Improving this requires:
1. Optimised schema/SQLite operations:
    - Write the minimum data necessary - just the necessary indexes
    - Ensure data locality on writes - (write in primary key order) - might require materialised views rather than covering indexes
    - Update to newer versions of SQLite (at a minimum because of improvements in [WAL mode](https://www.sqlite.org/wal.html) from 3.11)
    - Tuning the page size?
2. Python side optimisation - the insert is a tight loop that might be a candidate for an optimised implementation.
3. Write parallelisation through multiple segments and merging.
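
The third option can be sketched as follows: each segment is a standalone SQLite file with its own vocabulary and term_ids, and a merge step remaps term_ids through the term text while inserting in primary key order. This is a minimal illustration, not a tested design. It assumes document_id and frame_id are allocated up front so they are unique across segments, it writes the segments sequentially where a real version would use separate processes, and the helper names and toy rows are hypothetical. It uses the standard library `sqlite3` module rather than `apsw`.

```python
import os
import sqlite3
import tempfile

SEGMENT_SCHEMA = """
create table vocabulary (
    term_id integer primary key,
    term text);
create index vocab_term_idx on vocabulary(term, term_id);
create table positions (
    term_id int,
    document_id int,
    frame_id int,
    primary key(term_id, document_id, frame_id))
without rowid;
"""

def write_segment(path, rows):
    """Write unique (term, document_id, frame_id) rows to a segment file."""
    seg = sqlite3.connect(path)
    seg.executescript(SEGMENT_SCHEMA)
    terms = sorted(set(term for term, _, _ in rows))
    seg.executemany('insert into vocabulary(term) values (?)',
                    [(term,) for term in terms])
    seg.executemany(
        'insert into positions '
        'select term_id, ?, ? from vocabulary where term = ?',
        [(doc, frame, term) for term, doc, frame in rows])
    seg.commit()
    seg.close()

def merge_segment(main, path):
    """Merge one segment into the main index, remapping its term_ids."""
    main.execute('attach database ? as seg', (path,))
    # Add terms the main vocabulary has not seen yet.
    main.execute("""
        insert into vocabulary(term)
        select s.term from seg.vocabulary s
        where not exists (
            select 1 from main.vocabulary v where v.term = s.term)
        order by s.term""")
    # Translate segment term_ids to main term_ids via the term text,
    # writing in primary key order for locality.
    main.execute("""
        insert into positions
        select v.term_id, p.document_id, p.frame_id
        from seg.positions p
        inner join seg.vocabulary s on s.term_id = p.term_id
        inner join main.vocabulary v on v.term = s.term
        order by v.term_id, p.document_id, p.frame_id""")
    main.commit()
    main.execute('detach database seg')

# Toy usage with invented rows.
tmp_dir = tempfile.mkdtemp()
seg_a = os.path.join(tmp_dir, 'seg_a.db')
seg_b = os.path.join(tmp_dir, 'seg_b.db')
write_segment(seg_a, [('cat', 1, 1), ('dog', 1, 1)])
write_segment(seg_b, [('dog', 2, 2), ('fish', 2, 2)])

main = sqlite3.connect(':memory:')
main.executescript(SEGMENT_SCHEMA)
for segment_path in (seg_a, seg_b):
    merge_segment(main, segment_path)

merged = main.execute("""
    select v.term, p.document_id, p.frame_id
    from positions p
    inner join vocabulary v on v.term_id = p.term_id
    order by v.term, p.document_id""").fetchall()
print(merged)
```

The merge itself is still a single writer, so whether this helps depends on how much of the insert cost is sorting and vocabulary resolution (which segments can do in parallel) rather than the final write.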



In [11]:
from caterpillar.processing.analysis import stopwords
from caterpillar.processing.analysis.analyse import Analyser
from caterpillar.processing.analysis.filter import OuterPunctuationFilter, StopFilter, PositionalLowercaseWordFilter, \
    BiGramFilter
from caterpillar.processing.analysis.filter import PossessiveContractionFilter
from caterpillar.processing.analysis.tokenize import SimpleWordTokenizer


class TestAnalyser(Analyser):
    _tokenizer = SimpleWordTokenizer(detect_compound_names=False)

    def __init__(self, stopword_list=None):
        super(TestAnalyser, self).__init__()
        if stopword_list is None:
            stopword_list = stopwords.ENGLISH_TEST

        self._filters = [
            OuterPunctuationFilter(leading_allow=['@', '#']),
            PossessiveContractionFilter(),
            StopFilter(stopword_list, minsize=stopwords.MIN_WORD_SIZE),
            PositionalLowercaseWordFilter(0),
        ]

    def get_tokenizer(self):
        return self._tokenizer

    def get_filters(self):
        return self._filters

In [26]:
from caterpillar.processing.analysis.analyse import DefaultAnalyser, Analyser
from caterpillar.processing.analysis.tokenize import SimpleWordTokenizer
import nltk
import apsw
import ujson as json
from time import time
import gzip
import math

sentence_tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')

analyser = TestAnalyser()

# Analyser: article_body
def analyse(document, stopword_filter=True, shingle=False, paragraph_frames=False):
    frame_data = []
    if paragraph_frames:
        frames = document.split('\n\n')
        for frame in frames:
            tokens = [t.strip() for t in frame.split()]
            frame_data.append((frame.strip(), set(tokens)))
        
    else:
        frames = sentence_tokenizer.tokenize(document)
        
        for frame in frames:
            if stopword_filter:
                stream = [t.value for t in analyser.analyse(frame) if not t.stopped]
            else:
                stream = frame.split() #[t.value for t in analyser.analyse(sentence)]

            if shingle:
                shingles = [' '.join(stream[i:i + 2]) for i in range(len(stream) - 1)]
                tokens = set(stream + shingles)
            else:
                tokens = set(stream)

            frame_data.append((frame, tokens))
    return document, frame_data


In [13]:
# Initialise the test database with the appropriate schema
# Test frequency and fields later.
schema = """
drop table if exists positions; 
drop table if exists documents; 
drop table if exists frames;
drop table if exists vocabulary;
drop table if exists temp_positions;

-- page_size must be set before entering WAL mode; it cannot be changed afterwards.
pragma page_size = 4096; 
pragma journal_mode = WAL;
pragma temp_store = MEMORY;
pragma cache_size = 100000;

create table positions (
    term_id int,
    document_id int,
    frame_id int,
    primary key(term_id, document_id, frame_id))
without rowid;

create table documents (
    document_id integer primary key,
    body text);

create table frames (
    frame_id integer primary key,
    body text);
    
create table vocabulary (
    term_id integer primary key,
    term text);

create index vocab_term_idx on vocabulary(term, term_id);
    
create temporary table temp_positions (
    term text,    
    document_id int, 
    frame_id int);
    --,primary key(term, document_id, frame_id);
"""

vocab_update = """
create index insert_positions on temp_positions(term, document_id, frame_id);
insert into vocabulary(term) 
select distinct term
from temp_positions
where not exists (select 1 from vocabulary v where v.term=temp_positions.term);
"""

# Change this to insert the entire positions stream in a single hit
positions_insert = """
insert into positions 
select term_id, document_id, frame_id
from temp_positions
inner join vocabulary
    on vocabulary.term = temp_positions.term
order by term_id;
drop index insert_positions
"""

test_db = apsw.Connection('wikipedia.db')
cursor = test_db.cursor()
cursor.execute(schema).fetchall()


[(u'wal',)]

In [14]:
# Use sentence or paragraph tokenisation as a test case.

def generate_positions(documents, max_documents, max_frames):
    frame_id = max_frames + 1
    for i, (_, frames) in enumerate(documents):
        for _, positions in frames:
            for term in positions:
                yield (term, max_documents + 1 + i, frame_id)
            frame_id += 1

def insert_documents(analysed_docs, db_cursor):
    try:
        db_cursor.execute('begin;')
        
        # Use the cursor passed in rather than the module-level one.
        max_docs = db_cursor.execute('select max(document_id) from documents').fetchone()[0]
        max_docs = max_docs if max_docs is not None else 0
        
        max_frames = db_cursor.execute('select max(frame_id) from frames').fetchone()[0]
        max_frames = max_frames if max_frames is not None else 0
        
        positions = generate_positions(analysed_docs, max_docs, max_frames)
        frames = (frame for _, frames in analysed_docs for frame, _ in frames)
        numbered_frames = ((i + 1 + max_frames, 'not stored') for i, frame in enumerate(frames))
        docs = ((i + 1 + max_docs, 'not stored') for i, (doc, _) in enumerate(analysed_docs))
        
        # Pipeline the insertion.
        db_cursor.executemany('insert into temp_positions values (?, ?, ?);', positions)
        db_cursor.executemany('insert into documents values (?, ?)', docs)
        db_cursor.executemany('insert into frames values (?, ?)', numbered_frames)

        # Now insert new terms into the vocabulary
        db_cursor.execute(vocab_update)
        # Finally update the positions table
        db_cursor.execute(positions_insert)
    except Exception:
        db_cursor.execute('rollback;')
        print 'rolling back'
        # A bare raise preserves the original traceback.
        raise
    else:
        db_cursor.execute('delete from temp_positions;')
        db_cursor.execute('commit;')
    

In [15]:
wiki_lines = 10562730

def add_wikipedia(n_documents=1600, batch_size=200, stopword_filter=True, shingle=False):
    # Reset the database by clearing all tables (no vacuum is run here).
    cursor.execute('delete from documents; delete from positions; delete from vocabulary; delete from frames;')
    times = [time()]
    last_time = times[0]
    with gzip.open('enwiki-20161107-cirrussearch-content.json.gz', 'rb') as documents:
        print 'starting'
        batch = []
        batch_docs = 0
        for i, line in enumerate(documents):
            if i == n_documents:
                break

            # Batch up the documents, add them all together.
            try:
                doc = json.loads(line.decode('utf-8'))['text']
                batch.append(analyse(doc, stopword_filter=stopword_filter, shingle=shingle, paragraph_frames=False))
                batch_docs += 1
            except KeyError:
                continue

            if batch_docs >= batch_size:
                print 'Writing after document {} ({:.2%})'.format(i, i/float(wiki_lines))
                times.append(time())
                insert_documents(batch, cursor)
                batch = []
                batch_docs = 0
                times.append(time())
                print 'done writing, processing batch took {:.1f} s, writing took {:.1f} s'.format(times[-1]-times[-3], times[-1]-times[-2])
                print 'cumulative time {:.1f} s'.format(times[-1] - times[0])
        # Make sure the final batch is written, regardless of size. This runs
        # whether the loop finished normally or stopped early via break.
        if batch:
            insert_documents(batch, cursor)
            batch = []
            times.append(time())
            
    times.append(time())
    return times


In [16]:
times = add_wikipedia(None, batch_size=100000, stopword_filter=True, shingle=False)
print times[-1] - times[0]

starting
Writing after document 200001 (1.89%)
done writing, processing batch took 658.1 s, writing took 216.3 s
cumulative time 658.1 s
Writing after document 400001 (3.79%)
done writing, processing batch took 678.7 s, writing took 238.5 s
cumulative time 1336.8 s
Writing after document 600005 (5.68%)
done writing, processing batch took 649.4 s, writing took 241.5 s
cumulative time 1986.1 s
Writing after document 800005 (7.57%)
done writing, processing batch took 536.7 s, writing took 205.9 s
cumulative time 2522.8 s
Writing after document 1000005 (9.47%)
done writing, processing batch took 532.2 s, writing took 210.6 s
cumulative time 3055.0 s
Writing after document 1200005 (11.36%)
done writing, processing batch took 612.8 s, writing took 240.2 s
cumulative time 3667.8 s
Writing after document 1400005 (13.25%)
done writing, processing batch took 629.0 s, writing took 261.2 s
cumulative time 4296.9 s
Writing after document 1600005 (15.15%)
done writing, processing batch took 532.0 s,

In [23]:
# Report some statistics from whole index:
print 'Position rows: {}'.format(cursor.execute('select count(*) from positions;').fetchone()[0])
print 'Total vocabulary: {}'.format(cursor.execute('select count(*) from vocabulary;').fetchone()[0])
print 'Number of frames: {}'.format(cursor.execute('select count(*) from frames;').fetchone()[0])
print 'Number of documents: {}'.format(cursor.execute('select count(*) from documents;').fetchone()[0])

Position rows: 1613349907
Total vocabulary: 28206884
Number of frames: 191172996
Number of documents: 5281362


In [20]:
# Match any query primitive.
search_frame_query = """
select v.term, frame_id 
from positions p
inner join vocabulary v
on p.term_id = v.term_id
where v.term in ({})
"""
def search_frames(terms):
    variable_terms = ', '.join(['?']*len(terms))
    this_search = search_frame_query.format(variable_terms)
    return list(cursor.execute(this_search, terms))

search_document_query = """
select v.term, document_id, count(*)
from positions p
inner join vocabulary v
on p.term_id = v.term_id
where v.term in ({})
group by v.term, document_id;
"""

def search_documents(terms):
    variable_terms = ', '.join(['?']*len(terms))
    this_search = search_document_query.format(variable_terms)
    return list(cursor.execute(this_search, terms))


In [22]:
%timeit search_frames(['fish', 'dog', 'cat', 'potato'])

10 loops, best of 3: 113 ms per loop


In [18]:
# A simple and a more complex query
and_query = """
select frame_id 
from positions p
inner join vocabulary v
on p.term_id = v.term_id
where v.term = ?
"""

# Match a frame that contains at least n of the specified terms.
any_n_query = """
with any_terms as (
select v.term, frame_id 
from positions p
inner join vocabulary v
on p.term_id = v.term_id
where v.term in ({}))

select any_terms.term, any_terms.frame_id
from any_terms
inner join (
    select frame_id, count(term) as overlap
    from any_terms
    group by frame_id) t_counts
on t_counts.frame_id = any_terms.frame_id
    and overlap >= ?
"""

def and_search(terms):
    query = ' intersect '.join([and_query]*len(terms))
    return list(cursor.execute(query, terms))

def any_n_search(terms, n):
    variable_terms = ', '.join(['?']*len(terms))
    this_search = any_n_query.format(variable_terms)
    return list(cursor.execute(this_search, terms + [n]))

In [25]:
# Performance isn't too bad when hitting terms that don't return too many rows. 
# (At least when there's enough page cache...)
%timeit and_search(['apple', 'banana', 'pear', 'carrot'])
%timeit any_n_search(['apple', 'banana', 'pear', 'carrot'], 2)

# Hits three very common terms - not great performance.
%timeit any_n_search(['June', 'July', 'August'], 2)

10 loops, best of 3: 26.1 ms per loop
10 loops, best of 3: 85.9 ms per loop
1 loop, best of 3: 13 s per loop
