## 1. Инвертированный индекс (1 балл)
### 1.1. Индексация
Реализуйте построение инвертированного индекса в памяти для коллекции из домашней работы номер 3. В каждом постинглисте также сохраните значение term-frequency.

In [None]:
# Mount Google Drive at /content/drive; the article cache used below lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from tqdm import tqdm

from typing import Dict, List, Tuple
# Readable aliases for the plain-string types used in annotations.
ArticleName = str
Text = str
Term = str
# Placeholders only: both names are bound to None, so annotations that use
# them carry no real type information.
CollectionData = None
RankingParams = None

In [None]:
!pip install fake-useragent

Collecting fake-useragent
  Downloading fake-useragent-0.1.11.tar.gz (13 kB)
Building wheels for collected packages: fake-useragent
  Building wheel for fake-useragent (setup.py) ... [?25l[?25hdone
  Created wheel for fake-useragent: filename=fake_useragent-0.1.11-py3-none-any.whl size=13502 sha256=be2fbfd5e5a997b2b44ecfd8bd3081d99a948614dfd32f6b3eb8e4623270f0b0
  Stored in directory: /root/.cache/pip/wheels/ed/f7/62/50ab6c9a0b5567267ab76a9daa9d06315704209b2c5d032031
Successfully built fake-useragent
Installing collected packages: fake-useragent
Successfully installed fake-useragent-0.1.11


In [None]:
import gzip
import os
import pandas as pd
import pickle
import re
import requests

from bs4 import BeautifulSoup as bs
from fake_useragent import UserAgent
from hashlib import md5
from tqdm.notebook import tqdm

In [None]:
# Root folder (on the mounted Drive) under which downloaded article pages are cached.
directory = "/content/drive/MyDrive/wiki_"
# Site prefix; article URLs are built as pref + "wiki/" + <article name>.
pref = "https://simple.wikipedia.org/"

In [None]:
def hmd5(s):
    """Return the lowercase hexadecimal MD5 digest of ``s`` encoded as UTF-8."""
    raw = s.encode("utf-8")
    digest = md5(raw).hexdigest()
    return digest.lower()

In [None]:
def make_dirs(path):
    """Create the parent directory of the file path ``path``.

    Missing ancestors are created as well; directories that already exist are
    left untouched. The file itself is not created.
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty directory part: os.makedirs("") would
    # raise FileNotFoundError although there is nothing to create.
    if parent:
        os.makedirs(parent, exist_ok=True)

In [None]:
def normalize(parsed):
    """Extract the plain text of an article from its parsed HTML.

    The result consists of the text of every ``<p>`` element (each ``[N]``
    reference marker and each whitespace run replaced by one space, then
    stripped), followed by link texts taken from the first ``<ul>`` after each
    ``mw-headline`` span. All pieces are joined with single spaces.

    Args:
        parsed: a parsed page exposing the BeautifulSoup ``find_all`` API.

    Returns:
        The extracted text as one string.
    """
    parts = []
    for p in parsed.find_all("p"):
        # Raw string: "\[" and "\d" are invalid escapes in a plain literal.
        parts.append(re.sub(r"\[\d*\]|\s+", ' ', p.text).strip())
    lis = set()
    # attrs must be a dict; the original passed the set {"class", "mw-headline"},
    # which only worked through a fallback that also matches class="class".
    for p in parsed.find_all("span", {"class": "mw-headline"}):
        ul = p.find_next("ul")
        if not ul:
            continue
        for li in ul.find_all("li"):
            res = li.find("a", {"class": False})
            if not res:
                continue
            title = res.get("title")
            # Keep each link text once; skip service links and links without a title.
            if (res.text not in lis and "wiki" not in res.text
                    and title and "Special" not in title):
                parts.append(res.text)
                lis.add(res.text)
    return " ".join(parts)

In [None]:
def get_article_text(article_name: ArticleName) -> Text:
    """Return the normalized text of one article, downloading it if needed.

    The raw HTML is cached gzip-compressed under ``directory`` at a path
    derived from the MD5 hash of the article name. The page is requested from
    ``pref + "wiki/" + article_name`` only when no cached copy exists.

    Args:
        article_name: article title as used in the URL.

    Returns:
        The text produced by ``normalize``; an empty string if the server
        answers 404 (nothing is cached in that case).
    """
    import time  # local import: ``time`` is not imported at file level

    article_hash = hmd5(article_name)
    dump_path = directory + "/" + article_hash[:2] + "/" + article_hash[2:4] + "/" + article_hash[4:]
    if os.path.exists(dump_path):
        with gzip.open(dump_path, "rb") as f:
            return normalize(bs(f.read().decode("utf-8"), "html.parser"))

    # The original referenced the undefined names ``article`` and ``agent`` here.
    url = pref + "wiki/" + article_name
    headers = {'User-Agent': UserAgent().random}
    response = requests.get(url, headers=headers)
    if response.status_code == 404:
        # Previously ``parsed`` stayed unbound and the function crashed.
        return ""
    # NOTE(review): retries indefinitely while the status is not 200, as the
    # original did; consider bounding the number of attempts.
    while response.status_code != 200:
        time.sleep(0.1)
        response = requests.get(url, headers=headers)

    make_dirs(dump_path)
    with gzip.open(dump_path, "wb") as f:
        f.write(response.text.encode("utf-8"))
    return normalize(bs(response.text, 'html.parser'))
    
# Smoke test: print the extracted text of a few sample articles.
for article_name in tqdm(["Software_Development_Kit", "Gangrene", "COVID-19_pandemic_in_Belarus", "Guitar_Hero:_Aerosmith"]):
    print(f'Article: {article_name}\n{get_article_text(article_name)}\n\n')

  0%|          | 0/4 [00:00<?, ?it/s]

Article: Software_Development_Kit
A software development kit (SDK or "devkit") is usually a set of development tools that allows a software developer to create applications for a certain software package, software framework, hardware platform, computer system, video game console, operating system, or similar platform. SDKs vary greatly between a simple application programming interface to hardware used to simulate a system.


Article: Gangrene
Gangrene is a serious medical condition that causes the decay and death of body tissue, usually in the extremities such as the fingers, hands, toes, and feet. The two main types of gangrene are dry gangrene and wet gangrene. A third less common type is a form of wet gangrene known as gas gangrene. A very rare type which affects the internal organs is known as internal gangrene. Dry gangrene is usually caused by a loss of blood supply to the affected area, such as may happen following an injury which damages the blood vessels to the affected area.

In [None]:
from multiprocessing import Pool

def load_docs(selected_docs_fn: str, threads: int = 4) -> Dict[ArticleName, Text]:
    """Load the texts of all selected articles.

    If a pickled dictionary exists at the cache path it is returned as is.
    Otherwise article names are read, one per line, from ``selected_docs_fn``
    and fetched in parallel with ``get_article_text``.

    Args:
        selected_docs_fn: path to a file with one article name per line.
        threads: number of worker processes in the pool.

    Returns:
        Mapping from article name to article text.
    """
    cache_path = "/content/drive/MyDrive/wiki_/docs"
    if os.path.exists(cache_path):
        # NOTE: unpickling can execute arbitrary code; load trusted caches only.
        with open(cache_path, "rb") as f:
            return pickle.load(f)

    with open(selected_docs_fn) as names_file:
        article_names = [line.strip() for line in names_file]

    docs = {}
    # The pool is created only when there is work to do and is shut down on
    # exit; results are collected inside the block because leaving it
    # terminates the workers.
    with Pool(threads) as pool:
        tasks = [(article_name, pool.apply_async(get_article_text, (article_name, )))
                 for article_name in tqdm(article_names)]
        for article_name, task in tqdm(tasks):
            docs[article_name] = task.get(10**6)
    return docs
    
# Load the whole collection with 32 worker processes.
docs = load_docs("./selected_docs.tsv", 32)
print(f'{len(docs)} docs loaded')

15190 docs loaded


In [None]:
import nltk

In [None]:
# Download NLTK data packages; 'stopwords' backs the stopwords.words('english') lookup used for term filtering.
nltk.download('punkt')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
from collections import Counter, defaultdict
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer

In [None]:
def remove_stop_words(text):
    """Tokenize ``text``, drop stop words and one-character tokens, stem the rest.

    Tokens are the matches of ``\\w+|\\d+``. A token is dropped when it equals
    an entry of the English stop-word list (exact comparison on the raw
    token, before stemming) or is a single character.

    Returns:
        The Porter stems of the remaining tokens, in their original order.
    """
    tokenizer = RegexpTokenizer(r'\w+|\d+')
    ps = nltk.stem.PorterStemmer()
    # Build the stop-word set once: the original called stopwords.words() for
    # every token and scanned the resulting list linearly each time.
    stop_words = set(stopwords.words('english'))
    return [ps.stem(word)
            for word in tokenizer.tokenize(text)
            if word not in stop_words and len(word) > 1]

In [None]:
def make_terms(text: Text) -> List[Term]:
    """Turn raw text into index terms by delegating to ``remove_stop_words``."""
    terms = remove_stop_words(text)
    return terms

In [None]:
import numpy as np

In [None]:
# Type aliases for the index structures.
DocId = int
TermFreq = int
# NOTE(review): invert_index stores plain integer term frequencies in the
# postings, not Counter objects — TermFreq looks like the intended type here.
RelevInfo = Counter
# A posting list: (document id, relevance info) pairs.
Posting = List[Tuple[DocId, RelevInfo]]

In [None]:
def invert_index(docs: Dict[ArticleName, Text]) -> CollectionData:
    """Build an in-memory inverted index with term frequencies.

    Documents receive consecutive integer ids in the iteration order of
    ``docs``.

    Args:
        docs: mapping from article name to article text.

    Returns:
        A dict with the keys
        ``"term_frequency"`` (term -> list of ``(doc id, term frequency)``
        pairs in ascending doc-id order),
        ``"doc_id"`` (article name -> id), ``"id_doc"`` (id -> article name),
        ``"doc_len"`` (id -> number of distinct terms in the document) and
        ``"average_doc_len"`` (mean of the ``doc_len`` values, ``0.0`` for an
        empty collection).
    """
    postings = defaultdict(list)
    doc_id = {}
    id_doc = {}
    doc_len = {}
    docs_len = 0

    for idx, (name, text) in enumerate(tqdm(docs.items())):
        doc_id[name] = idx
        id_doc[idx] = name
        doc_terms = Counter(make_terms(text))
        # NOTE(review): the length is the number of DISTINCT terms, not the
        # token count usually used by BM25; kept as is so rankings do not change.
        doc_len[idx] = len(doc_terms)
        docs_len += len(doc_terms)
        for term, freq in doc_terms.items():
            # Ids only grow, so appending keeps every posting list sorted by
            # doc id and no separate sorting pass is needed.
            postings[term].append((idx, freq))

    # Guard against an empty collection (the original divided by zero).
    average_doc_len = docs_len / len(docs) if docs else 0.0

    return {"term_frequency" : dict(postings),
            "doc_id" : doc_id,
            "id_doc" : id_doc,
            "doc_len" : doc_len,
            "average_doc_len" : average_doc_len}

# Index the whole collection in memory.
collection_data = invert_index(docs)

0it [00:00, ?it/s]

In [None]:
# Separate the postings from the per-collection metadata so that the two can be stored in different files.
index = dict(collection_data["term_frequency"])
del collection_data["term_frequency"]

Сохраните полученный индекс на диске в бинарном формате. Формат должен позволять читать отсортированные по термам постинглисты, по одному за раз. Размер сохраненного индекса в байтах должен быть порядка 8*(сумму длин всех постинг листов). 

Отдельно сохраните на диск дополнительные данные о коллекции, которые пригодятся для поиска, например названия статей или среднюю длину документа. Размер дополнительных данных, должен быть пропорционален количеству документов коллекции.

In [None]:
def dump_index(index: Dict[Term, Posting], filename: str) -> None:
    """Write the index to ``filename`` as consecutive pickled records.

    One ``(term, posting)`` record is pickled per term, in ascending term
    order, so a reader can load the posting lists back one at a time.
    """
    with open(filename, "wb") as sink:
        ordered_terms = sorted(index)
        for term in ordered_terms:
            record = (term, index[term])
            pickle.dump(record, sink)


def dump_collectiondata(data: CollectionData, filename: str) -> None:
    """Pickle the collection metadata ``data`` into the file ``filename``."""
    with open(filename, "wb") as sink:
        pickle.dump(data, sink)

# Store the postings and the collection metadata in separate files and report their sizes in bytes.
dump_index(index, "index.inv")
print("Index file size:", os.path.getsize("index.inv"))

dump_collectiondata(collection_data, "index.data")
print("Collection data file size:", os.path.getsize("index.data"))

Index file size: 14922832
Collection data file size: 649739


### 1.2. Поиск
Для простоты реализации поиска, не требуется делать чтение постинглистов с диска по запросу - достаточно считать их с диска в память целиком. Также загрузите с диска дополнительные данные о коллекции.

In [None]:
def load_index(filename: str) -> Dict[Term, Posting]:
    """Read a file of consecutive pickled ``(term, posting)`` records into a dict.

    Records are loaded until the end of the file is reached.
    """
    loaded = {}
    with open(filename, "rb") as source:
        while True:
            try:
                record = pickle.load(source)
            except EOFError:
                # No more records in the file.
                return loaded
            loaded[record[0]] = record[1]

    
def load_collectiondata(filename: str) -> CollectionData:
    """Unpickle and return the single object stored in ``filename``."""
    with open(filename, "rb") as source:
        data = pickle.load(source)
    return data

# Read the stored postings and the collection metadata back into memory.
index = load_index("index.inv")
print("Number or terms in index:", len(index))
collection_data = load_collectiondata("index.data")

Number or terms in index: 92722


Реализуйте поиск документов с ранжированием BM25 на основе инвертированного индекса в парадигме document-at-a-time, то есть через [слияние](https://en.wikipedia.org/wiki/Merge_algorithm) постинглистов. Функция поиска должна принимать число — ограничение на количество документов, возвращаемых поиском. Используемое количество дополнительной памяти должно быть пропорционально этому ограничению и никак не должно зависеть от размера постинглистов или размера коллекции.
Результаты поиска должны быть аналогичные тем, что были в домашней работе номер 3. 

In [None]:
import heapq

In [None]:
def search_indexed(query: Text,
                   top_size: int,
                   index: Dict[Term, Posting],
                   collection_data: CollectionData,
                   ranking_params: RankingParams) -> List[Tuple[ArticleName, float]]:
    """Rank documents for ``query`` with BM25 by merging posting lists.

    The posting lists of the query terms are traversed document-at-a-time: a
    heap holds the current doc id of every list, so all contributions to one
    document are summed before the next document is touched. Only the best
    ``top_size`` documents are kept.

    Each term adds ``idf * (k1 + 1) * tf / (K + tf) * (k2 + 1) * qf / (k2 + qf)``
    to a document's score, where ``idf = log((N + 1) / df)``,
    ``K = k1 * ((1 - b) + b * doc_len / average_doc_len)``, ``tf`` is the
    frequency stored in the posting, ``qf`` the frequency of the term in the
    query, ``N`` the number of documents and ``df`` the posting-list length.

    Args:
        query: raw query text; converted to terms with ``make_terms``.
        top_size: maximum number of results; values <= 0 yield an empty list.
        index: term -> posting list sorted by doc id.
        collection_data: dict providing "id_doc", "doc_len" and
            "average_doc_len".
        ranking_params: dict providing "k1", "k2" and "b".

    Returns:
        Up to ``top_size`` ``(article name, score)`` pairs, best score first.
    """
    if top_size <= 0:
        # The original raised IndexError on the empty result heap.
        return []

    id_doc = collection_data["id_doc"]
    doc_len = collection_data["doc_len"]
    average_doc_len = collection_data["average_doc_len"]

    k1 = ranking_params["k1"]
    k2 = ranking_params["k2"]
    b = ranking_params["b"]

    f = Counter(make_terms(query))
    n_docs = len(doc_len)

    cursor = {}        # term -> position of its current posting
    idf = {}           # term -> inverse document frequency
    query_weight = {}  # term -> query-frequency factor
    pq = []            # heap of (current doc id, term)
    for term, query_freq in f.items():
        posting = index.get(term)
        if not posting:
            continue
        cursor[term] = 0
        # Both factors are constant per term; compute them once, not per posting.
        idf[term] = np.log((n_docs + 1) / len(posting))
        query_weight[term] = (k2 + 1) * query_freq / (k2 + query_freq)
        heapq.heappush(pq, (posting[0][0], term))

    res = []  # min-heap of (score, doc id) with at most top_size entries
    while pq:
        idx = pq[0][0]
        value = 0
        K = k1 * ((1 - b) + b * doc_len[idx] / average_doc_len)

        # Consume every list currently positioned on document ``idx``.
        while pq and pq[0][0] == idx:
            _, term = heapq.heappop(pq)
            posting = index[term]
            f_ij = posting[cursor[term]][1]
            value += idf[term] *\
                     ((k1 + 1) * f_ij / (K + f_ij)) *\
                     query_weight[term]

            cursor[term] += 1
            if cursor[term] < len(posting):
                heapq.heappush(pq, (posting[cursor[term]][0], term))

        if len(res) < top_size or res[0][0] < value:
            heapq.heappush(res, (value, idx))
        if len(res) > top_size:
            heapq.heappop(res)

    res = [(id_doc[idx], value) for value, idx in res]
    res.sort(key=lambda x: x[1], reverse=True)
    return res
# Demo: run a few sample queries with all BM25 parameters set to 1 and print the top 5 hits.
ranking_params = {"type": "BM25",
                  "k1": 1,
                  "k2": 1,
                  "b": 1}
for query in ["coronovirus in belarus",
              "who won junior eurovision in 2005",
              "science about full-text search",
             ]:
    result = search_indexed(query, 5, index, collection_data, ranking_params)[:5]
    print(f"[{query}]")
    for article_name, score in result:
        print(f"{score:7.2f}  {article_name}")
    print("\n")

[coronovirus in belarus]
  10.68  Time_in_Belarus
  10.57  COVID-19_pandemic_in_Belarus
  10.36  Daugava_River
   8.87  Bug_River
   8.58  Eurasian_Union


[who won junior eurovision in 2005]
  18.11  Junior_Eurovision_Song_Contest_2014
  17.82  Junior_Eurovision_Song_Contest_2015
  14.91  Junior_Eurovision_Song_Contest_2004
  14.70  Junior_Eurovision_Song_Contest_2019
  14.06  Katherine_Hansen


[science about full-text search]
  17.59  Information_retrieval
  12.71  Computer_vision
  12.57  Google_Search
  11.26  Binary_search
  11.24  The_Massacre_at_Paris




Сравните качество и скорость работы нового алгоритма поиска с предыдущим.

In [None]:
def load_queries(queries_fn: str) -> List[Tuple[Text, ArticleName]]:
    """Read ``(query, expected article)`` pairs from a tab-separated file.

    Each non-empty line holds a query, a tab and the article name; trailing
    whitespace is stripped and only the first tab separates the two fields.
    Blank lines are skipped.

    Raises:
        ValueError: if a non-empty line contains no tab.
    """
    queries = []
    # The context manager closes the file; the original left it open.
    with open(queries_fn, encoding="utf-8") as queries_file:
        for line in queries_file:
            line = line.rstrip()
            if not line:
                continue
            query, answer = line.split('\t', 1)
            queries.append((query, answer))
    return queries

queries = load_queries("./queries.tsv")
# Every expected answer must be an article of the loaded collection.
for query, answer in queries:
    assert answer in docs
    
print(f'{len(queries)} queries loaded')
# Show the first few (query, expected article) pairs.
for query, article_name in queries[:5]:
    print(f'{query} -> {article_name}')

200 queries loaded
animals that have shells and live in water -> Shell_(zoology)
how many different types of scorpions are there -> Scorpion
describe the structure of a scientific name for a species -> Binomial_nomenclature
what are the 3 types of plastids in plant cells -> Plastid
who named the cell and how did he come up with that name -> Cell_theory


In [None]:
def run(title,
        queries: List[Tuple[Text, ArticleName]],
        index: Dict[Term, Posting],
        collection_data: CollectionData,
        ranking_params: RankingParams) -> None:
    """Evaluate ``search_indexed`` on ``queries`` and print quality metrics.

    For every ``(query, expected article)`` pair the top 10 results are
    fetched. Reported values, each averaged over the processed queries:
    Accuracy (answer at rank 1), Accuracy10 (answer within the top 10) and
    RR (reciprocal rank of the answer, 0 when it is not found).
    """
    accuracy = accuracy10 = rr = 0.0
    processed = 0
    with tqdm(queries) as progress:
        for query, answer in progress:
            top = search_indexed(query, 10, index, collection_data, ranking_params)[:10]

            # 1-based position of the expected article, None if it is absent.
            rank = next((place
                         for place, (article_name, score) in enumerate(top, start=1)
                         if article_name == answer),
                        None)

            if rank is not None:
                if rank == 1:
                    accuracy += 1
                if rank <= 10:
                    accuracy10 += 1
                rr += 1.0 / rank

            processed += 1
            progress.set_description(f'Acc: {accuracy/processed:0.2f}, Acc10: {accuracy10/processed:0.2f}, RR: {rr/processed:0.2f}')
    print(f'{title}\n  Accuracy: {accuracy/processed:0.2f}\n  Accuracy10: {accuracy10/processed:0.2f}\n  RR: {rr/processed:0.2f}')
    
# Evaluate the full index on all loaded queries with these BM25 parameters.
ranking_params = {"type": "BM25",
          "b": 0.871875,
          "k1": 8.0,
          "k2": 0.5}
run("JustRun", queries, index, collection_data, ranking_params)

  0%|          | 0/200 [00:00<?, ?it/s]

JustRun
  Accuracy: 0.30
  Accuracy10: 0.56
  RR: 0.38


Реализуйте static pruning до 50 элементов для каждого постинглиста. Сравните качество и скорость работы нового индекса с предыдущим.

In [None]:
def search_pruned(query: Text,
                   top_size: int,
                   pruned_index: Dict[Term, Posting],
                   index: Dict[Term, Posting],
                   collection_data: CollectionData,
                   ranking_params: RankingParams) -> List[Tuple[ArticleName, float]]:
    """Rank documents with BM25 using pruned posting lists.

    Works like a document-at-a-time merge over the lists of ``pruned_index``;
    the full ``index`` is consulted only for the posting-list length used as
    document frequency in ``idf = log((N + 1) / df)``.

    Each term adds ``idf * (k1 + 1) * tf / (K + tf) * (k2 + 1) * qf / (k2 + qf)``
    to a document's score, with
    ``K = k1 * ((1 - b) + b * doc_len / average_doc_len)``, ``tf`` the
    frequency stored in the posting and ``qf`` the term's query frequency.

    Args:
        query: raw query text; converted to terms with ``make_terms``.
        top_size: maximum number of results; values <= 0 yield an empty list.
        pruned_index: term -> pruned posting list sorted by doc id.
        index: term -> full posting list (source of document frequencies).
        collection_data: dict providing "id_doc", "doc_len" and
            "average_doc_len".
        ranking_params: dict providing "k1", "k2" and "b".

    Returns:
        Up to ``top_size`` ``(article name, score)`` pairs, best score first.
    """
    if top_size <= 0:
        # The original raised IndexError on the empty result heap.
        return []

    id_doc = collection_data["id_doc"]
    doc_len = collection_data["doc_len"]
    average_doc_len = collection_data["average_doc_len"]

    k1 = ranking_params["k1"]
    k2 = ranking_params["k2"]
    b = ranking_params["b"]

    f = Counter(make_terms(query))
    n_docs = len(doc_len)

    cursor = {}        # term -> position of its current posting
    idf = {}           # term -> inverse document frequency
    query_weight = {}  # term -> query-frequency factor
    pq = []            # heap of (current doc id, term)
    for term, query_freq in f.items():
        posting = pruned_index.get(term)
        if not posting:
            continue
        cursor[term] = 0
        # Both factors are constant per term; compute them once, not per posting.
        idf[term] = np.log((n_docs + 1) / len(index[term]))
        query_weight[term] = (k2 + 1) * query_freq / (k2 + query_freq)
        heapq.heappush(pq, (posting[0][0], term))

    res = []  # min-heap of (score, doc id) with at most top_size entries
    while pq:
        idx = pq[0][0]
        value = 0
        K = k1 * ((1 - b) + b * doc_len[idx] / average_doc_len)

        # Consume every list currently positioned on document ``idx``.
        while pq and pq[0][0] == idx:
            _, term = heapq.heappop(pq)
            posting = pruned_index[term]
            f_ij = posting[cursor[term]][1]
            value += idf[term] *\
                     ((k1 + 1) * f_ij / (K + f_ij)) *\
                     query_weight[term]

            cursor[term] += 1
            if cursor[term] < len(posting):
                heapq.heappush(pq, (posting[cursor[term]][0], term))

        if len(res) < top_size or res[0][0] < value:
            heapq.heappush(res, (value, idx))
        if len(res) > top_size:
            heapq.heappop(res)

    res = [(id_doc[idx], value) for value, idx in res]
    res.sort(key=lambda x: x[1], reverse=True)
    return res

In [None]:
def run_pruned(title,
        queries: List[Tuple[Text, ArticleName]],
        pruned_index: Dict[Term, Posting],
        index: Dict[Term, Posting],
        collection_data: CollectionData,
        ranking_params: RankingParams) -> None:
    """Evaluate ``search_pruned`` on ``queries`` and print quality metrics.

    For every ``(query, expected article)`` pair the top 10 results are
    fetched. Reported values, each averaged over the processed queries:
    Accuracy (answer at rank 1), Accuracy10 (answer within the top 10) and
    RR (reciprocal rank of the answer, 0 when it is not found).
    """
    accuracy = accuracy10 = rr = 0.0
    processed = 0
    with tqdm(queries) as progress:
        for query, answer in progress:
            top = search_pruned(query, 10, pruned_index, index, collection_data, ranking_params)[:10]

            # 1-based position of the expected article, None if it is absent.
            rank = next((place
                         for place, (article_name, score) in enumerate(top, start=1)
                         if article_name == answer),
                        None)

            if rank is not None:
                if rank == 1:
                    accuracy += 1
                if rank <= 10:
                    accuracy10 += 1
                rr += 1.0 / rank

            processed += 1
            progress.set_description(f'Acc: {accuracy/processed:0.2f}, Acc10: {accuracy10/processed:0.2f}, RR: {rr/processed:0.2f}')
    print(f'{title}\n  Accuracy: {accuracy/processed:0.2f}\n  Accuracy10: {accuracy10/processed:0.2f}\n  RR: {rr/processed:0.2f}')

In [None]:
def prune(index: Dict[Term, Posting], top_size: int = 50) -> Dict[Term, Posting]:
    """Statically prune every posting list to its ``top_size`` strongest entries.

    For each term the postings with the highest second component are kept
    (earlier postings win ties) and the survivors are put back into ascending
    doc-id order.
    """
    pruned = {}
    for term in index:
        by_strength = sorted(index[term], key=lambda entry: entry[1], reverse=True)
        kept = by_strength[:top_size]
        kept.sort(key=lambda entry: entry[0])
        pruned[term] = kept
    return pruned

pruned_index = prune(index, 50)
# Sanity check: doc ids inside every pruned posting list must be strictly increasing.
for term, posting in pruned_index.items():
    prev_doc_id = -1
    for doc_id, freq in posting:
        assert doc_id > prev_doc_id
        prev_doc_id = doc_id

In [None]:
# Demo and evaluation of the pruned index with the same BM25 parameters as the full-index run.
ranking_params = {"type": "BM25",
          "b": 0.871875,
          "k1": 8.0,
          "k2": 0.5}
for query in ["coronovirus in belarus",
              "who won junior eurovision in 2005",
              "science about full-text search",
             ]:
    result = search_pruned(query, 5, pruned_index, index, collection_data, ranking_params)[:5]
    print(f"[{query}]")
    for article_name, score in result:
        print(f"{score:7.2f}  {article_name}")
    print("\n")
run_pruned("JustRun", queries, pruned_index, index, collection_data, ranking_params)

[coronovirus in belarus]
  29.24  Time_in_Belarus
  28.49  COVID-19_pandemic_in_Belarus
  24.22  Daugava_River
  13.24  Eurasian_Union
  12.99  Byelorussian_Soviet_Socialist_Republic


[who won junior eurovision in 2005]
  29.84  Eurovision:_Europe_Shine_a_Light
  26.72  List_of_ice_hockey_leagues
  26.00  Family_Four
  24.76  Junior_Eurovision_Song_Contest_2015
  24.65  Eurovision_Song_Contest_2011


[science about full-text search]
  29.91  Google_Search
  27.66  Warsaw_Uprising
  25.23  Záparo_numerals
  24.67  Binary_search
  22.22  Bible_version_debate




  0%|          | 0/200 [00:00<?, ?it/s]

JustRun
  Accuracy: 0.28
  Accuracy10: 0.49
  RR: 0.34


## 2.	Индексное слияние (1 балл)

Постройте и сохраните на диске пять индексов, каждый из которых содержит пятую часть документов коллекции.

In [None]:
# Split the collection into n_parts consecutive slices, index each slice
# separately and store every partial index together with its own metadata.
article_names = list(docs.keys())
index_part_filenames = []
data_part_filenames = []

n_parts = 5
n_articles = len(article_names)
# Ceiling division: every part except possibly the last holds this many documents.
n_docs_per_part = (n_articles + n_parts - 1) // n_parts

for part_i in range(n_parts):
    docs_part = {}
    for article in article_names[part_i*n_docs_per_part: (part_i + 1)*n_docs_per_part]:
        docs_part[article] = docs[article]
    # Doc ids inside a part start from 0 again.
    collection_data_part = invert_index(docs_part)
    index_part = dict(collection_data_part["term_frequency"])
    del collection_data_part["term_frequency"]
    index_part_filename = f"index_part{part_i:02d}.inv"
    data_part_filename = f"data_part{part_i:02d}.data"
    dump_index(index_part, index_part_filename)
    # Store this part's metadata; the original dumped the global
    # ``collection_data`` here, so all five data files were identical copies
    # of the full-collection metadata.
    dump_collectiondata(collection_data_part, data_part_filename)
    index_part_filenames.append(index_part_filename)
    data_part_filenames.append(data_part_filename)
    print("Index file part size:", os.path.getsize(index_part_filename))
    print("Collection data file size:", os.path.getsize(data_part_filename))    

0it [00:00, ?it/s]

Index file part size: 3166258
Collection data file size: 649739


0it [00:00, ?it/s]

Index file part size: 3184365
Collection data file size: 649739


0it [00:00, ?it/s]

Index file part size: 3218759
Collection data file size: 649739


0it [00:00, ?it/s]

Index file part size: 3074857
Collection data file size: 649739


0it [00:00, ?it/s]

Index file part size: 3138017
Collection data file size: 649739


Реализуйте алгоритм слияния индексов во внешней памяти.  То есть нельзя подгружать индекс целиком в память, требуется считывать и хранить в памяти не более одного постинг листа за раз из каждого из пяти индексов.

Итоговый файл индекса должен байт в байт совпадать с аналогичным построенным в пункте 1.1.

In [None]:
def get_next(file_input):
    """Unpickle the next record from ``file_input``; return None at end of file."""
    try:
        record = pickle.load(file_input)
    except EOFError:
        return None
    return record

In [None]:
def external_memory_index_merge(input_files: List[str], output_filename: str,
                                docs_per_part=None) -> None:
    """Merge partial index files into one index file using a k-way merge.

    Every input file is a sequence of pickled ``(term, posting)`` records in
    ascending term order, with doc ids local to the part. At most one record
    per input file is held in memory at a time. Postings of part ``i`` are
    shifted by ``docs_per_part * i``; for equal terms the parts are
    concatenated in the order of ``input_files``.

    Args:
        input_files: paths of the partial index files, in part order.
        output_filename: path of the merged index file to write.
        docs_per_part: number of documents per part, used as the doc-id
            offset step. Defaults to the module-level ``n_docs_per_part``.
    """
    if docs_per_part is None:
        # Backward-compatible default: the part size computed when the
        # partial indexes were built.
        docs_per_part = n_docs_per_part

    files = []
    try:
        for input_file in input_files:
            files.append(open(input_file, "rb"))
        with open(output_filename, "wb") as f:
            # Heap of (term, part number, posting): the smallest term comes
            # first and equal terms are ordered by part number.
            pls = []
            for i, part_file in enumerate(files):
                pl = get_next(part_file)
                if pl is not None:
                    heapq.heappush(pls, (pl[0], i, pl[1]))

            merged_pl = []
            while pls:
                term, i, posting = heapq.heappop(pls)
                offset = docs_per_part * i
                merged_pl.extend((doc + offset, tf) for doc, tf in posting)
                # Flush once no other part still holds the same term.
                if not pls or pls[0][0] != term:
                    pickle.dump((term, merged_pl), f)
                    merged_pl = []

                # Refill from the part that has just been consumed.
                pl = get_next(files[i])
                if pl is not None:
                    heapq.heappush(pls, (pl[0], i, pl[1]))
    finally:
        # Close the inputs even if reading or writing failed.
        for part_file in files:
            part_file.close()

In [None]:
def read_file_content(filename):
    """Return the entire content of ``filename`` as bytes."""
    with open(filename, "rb") as source:
        payload = source.read()
    return payload
    
# Merge the partial indexes and compare the result byte for byte with the index built in one pass.
external_memory_index_merge(index_part_filenames, "index_merged.inv")
if read_file_content("index.inv") == read_file_content("index_merged.inv"):
    print('Congrats!')
else:
    print('Merged index is different from original one :(')

Congrats!


## 3. Сжатие индекса (1 балл)
Реализуйте кодирование чисел алгоритмом VarInt.

In [None]:
import struct 
import io


class baseline_coder:
    """Fixed-width codec: each ``(doc_id, freq)`` pair is stored with format 'II'.

    Both methods are static, so they work on the class and on instances alike.
    """

    # Pre-compiled layout of one pair: two unsigned ints in native byte order.
    _PAIR = struct.Struct('II')

    @staticmethod
    def encode(output_stream, posting):
        """Write every ``(doc_id, freq)`` pair of ``posting`` to ``output_stream``."""
        pack = baseline_coder._PAIR.pack
        for doc_id, freq in posting:
            output_stream.write(pack(doc_id, freq))

    @staticmethod
    def decode(input_stream):
        """Read the rest of ``input_stream`` and return the pairs as a list of tuples.

        Raises:
            struct.error: if the data length is not a multiple of the pair size.
        """
        return list(baseline_coder._PAIR.iter_unpack(input_stream.read()))

In [None]:
class varint_coder:
    """VarInt codec: every number is stored as 7-bit groups, low group first.

    Each byte carries 7 payload bits; the high bit (0x80) is set on every byte
    except the last one of a number. ``doc_id`` and ``tf`` of each posting are
    written one after the other.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Write the VarInt encoding of ``posting`` to ``output_stream``.

        Raises:
            ValueError: if a number is negative (not representable).
        """
        # One byte per 7-bit group. The original packed every group with
        # format 'I' (4 bytes), which did not match the 'B' unpack in decode.
        encoded = bytearray()
        for doc_id, tf in posting:
            for num in (doc_id, tf):
                if num < 0:
                    raise ValueError(f"cannot VarInt-encode negative number {num}")
                while num >= 128:
                    encoded.append((num & 0x7F) | 0x80)
                    num >>= 7
                encoded.append(num)
        output_stream.write(bytes(encoded))

    @staticmethod
    def decode(input_stream):
        """Decode the rest of ``input_stream`` into a list of ``(doc_id, tf)`` tuples.

        Raises:
            ValueError: if the data ends inside a number or holds an odd
                count of numbers.
        """
        res = []
        val = 0
        shift = 0
        for b in input_stream.read():
            val |= (b & 0x7F) << shift
            if b & 0x80:
                # Continuation bit set: more groups of this number follow.
                shift += 7
            else:
                res.append(val)
                val = 0
                shift = 0
        if shift or len(res) % 2:
            raise ValueError("truncated VarInt posting data")
        return list(zip(res[0::2], res[1::2]))
        

# Round-trip check: encode a small posting list, show the bytes, decode them again.
output = io.BytesIO()
varint_coder.encode(output, list(enumerate([1, 2, 3, 300, 20000])))
print(output.getvalue())

posting = varint_coder.decode(io.BytesIO(output.getvalue()))
print(posting)
assert posting == list(enumerate([1, 2, 3, 300, 20000]))

b'\x00\x01\x01\x02\x02\x03\x03\xac\x02\x04\xa0\x9c\x01'
[(0, 1), (1, 2), (2, 3), (3, 300), (4, 20000)]


In [None]:
!pip install Brotli

Collecting Brotli
  Downloading Brotli-1.0.9-cp37-cp37m-manylinux1_x86_64.whl (357 kB)
[?25l[K     |█                               | 10 kB 16.8 MB/s eta 0:00:01[K     |█▉                              | 20 kB 12.2 MB/s eta 0:00:01[K     |██▊                             | 30 kB 9.3 MB/s eta 0:00:01[K     |███▊                            | 40 kB 8.0 MB/s eta 0:00:01[K     |████▋                           | 51 kB 4.1 MB/s eta 0:00:01[K     |█████▌                          | 61 kB 4.3 MB/s eta 0:00:01[K     |██████▍                         | 71 kB 4.3 MB/s eta 0:00:01[K     |███████▍                        | 81 kB 4.8 MB/s eta 0:00:01[K     |████████▎                       | 92 kB 5.0 MB/s eta 0:00:01[K     |█████████▏                      | 102 kB 4.0 MB/s eta 0:00:01[K     |██████████                      | 112 kB 4.0 MB/s eta 0:00:01[K     |███████████                     | 122 kB 4.0 MB/s eta 0:00:01[K     |████████████                    | 133 kB 4.0 MB/s et

In [None]:
import gzip
import brotli

In [None]:
class gzip_coder:
    """General-purpose compression: fixed-width 'II' pairs compressed with gzip.

    Both methods are static, so they work on the class and on instances alike.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Pack every ``(doc_id, tf)`` pair with 'II', gzip the result and write it."""
        raw = b"".join(struct.pack('II', doc_id, tf) for doc_id, tf in posting)
        output_stream.write(gzip.compress(raw))

    @staticmethod
    def decode(input_stream):
        """Decompress the rest of ``input_stream`` and return the pairs as tuples.

        Raises:
            struct.error: if the decompressed length is not a multiple of the
                pair size.
        """
        raw = gzip.decompress(input_stream.read())
        return list(struct.iter_unpack('II', raw))
        

# Round-trip check: encode a small posting list, show the bytes, decode them again.
output = io.BytesIO()
gzip_coder.encode(output, list(enumerate([1, 2, 3, 300, 20000])))
print(output.getvalue())

posting = gzip_coder.decode(io.BytesIO(output.getvalue()))
print(posting)
assert posting == list(enumerate([1, 2, 3, 300, 20000]))

b'\x1f\x8b\x08\x00\xa4Y\x86a\x02\xffc````\x84b&(f\x86b\x1d\xa0 \x0b\x90V\xf0c`\x00\x004k\xbeS(\x00\x00\x00'
[(0, 1), (1, 2), (2, 3), (3, 300), (4, 20000)]


In [None]:
class brotli_coder:
    """General-purpose compression: fixed-width 'II' pairs compressed with Brotli.

    Both methods are static, so they work on the class and on instances alike.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Pack every ``(doc_id, tf)`` pair with 'II', compress the result and write it."""
        raw = b"".join(struct.pack('II', doc_id, tf) for doc_id, tf in posting)
        output_stream.write(brotli.compress(raw))

    @staticmethod
    def decode(input_stream):
        """Decompress the rest of ``input_stream`` and return the pairs as tuples.

        Raises:
            struct.error: if the decompressed length is not a multiple of the
                pair size.
        """
        raw = brotli.decompress(input_stream.read())
        return list(struct.iter_unpack('II', raw))
        

# Round-trip check: encode a small posting list, show the bytes, decode them again.
output = io.BytesIO()
brotli_coder.encode(output, list(enumerate([1, 2, 3, 300, 20000])))
print(output.getvalue())

posting = brotli_coder.decode(io.BytesIO(output.getvalue()))
print(posting)
assert posting == list(enumerate([1, 2, 3, 300, 20000]))

b"\x1b'\x00\xf8\x87\xe8d\xf7H\xc1\xba\x94u\x03-\xc0 \x8a\x00@\x08E\xe7\xc2x\x1f"
[(0, 1), (1, 2), (2, 3), (3, 300), (4, 20000)]


In [None]:
class delta_gzip_coder:
    """Delta coding of doc ids followed by gzip compression.

    For every posting the value ``doc_id - previous_doc_id - 1`` is stored
    (the previous id starts at -1) together with the unchanged ``tf``, both
    packed with 'II'. Both methods are static.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Delta-encode ``posting``, gzip it and write it to ``output_stream``.

        Raises:
            struct.error: if doc ids are not strictly increasing (the stored
                gap would be negative).
        """
        data = bytearray()
        prev = -1
        for doc_id, tf in posting:
            data.extend(struct.pack('II', doc_id - prev - 1, tf))
            prev = doc_id
        output_stream.write(gzip.compress(data))

    @staticmethod
    def decode(input_stream):
        """Decompress the rest of ``input_stream`` and rebuild the absolute doc ids."""
        data = gzip.decompress(input_stream.read())
        res = []
        prev = -1
        for gap, tf in struct.iter_unpack('II', data):
            prev += gap + 1
            res.append((prev, tf))
        return res
        

# Round-trip check: encode a small posting list, show the bytes, decode them again.
output = io.BytesIO()
delta_gzip_coder.encode(output, list(enumerate([1, 2, 3, 300, 20000])))
print(output.getvalue())

posting = delta_gzip_coder.decode(io.BytesIO(output.getvalue()))
print(posting)
assert posting == list(enumerate([1, 2, 3, 300, 20000]))

b'\x1f\x8b\x08\x00\xfbY\x86a\x02\xffc````d\x80\x00&(\xcd\x0c\xa5u\xa0\x12\n~\x0c\x0c\x00\x99\x18\xdd\xd6(\x00\x00\x00'
[(0, 1), (1, 2), (2, 3), (3, 300), (4, 20000)]


In [None]:
class delta_brotli_coder:
    """Delta coding of doc ids followed by Brotli compression.

    For every posting the value ``doc_id - previous_doc_id - 1`` is stored
    (the previous id starts at -1) together with the unchanged ``tf``, both
    packed with 'II'. Both methods are static.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Delta-encode ``posting``, compress it and write it to ``output_stream``.

        Raises:
            struct.error: if doc ids are not strictly increasing (the stored
                gap would be negative).
        """
        data = bytearray()
        prev = -1
        for doc_id, tf in posting:
            data.extend(struct.pack('II', doc_id - prev - 1, tf))
            prev = doc_id
        output_stream.write(brotli.compress(data))

    @staticmethod
    def decode(input_stream):
        """Decompress the rest of ``input_stream`` and rebuild the absolute doc ids."""
        data = brotli.decompress(input_stream.read())
        res = []
        prev = -1
        for gap, tf in struct.iter_unpack('II', data):
            prev += gap + 1
            res.append((prev, tf))
        return res
        

# Round-trip check: encode a small posting list, show the bytes, decode them again.
output = io.BytesIO()
delta_brotli_coder.encode(output, list(enumerate([1, 2, 3, 300, 20000])))
print(output.getvalue())

posting = delta_brotli_coder.decode(io.BytesIO(output.getvalue()))
print(posting)
assert posting == list(enumerate([1, 2, 3, 300, 20000]))

b"\x1b'\x00\xf8\x8f\xd4cM\x98\\\t\xd9d\x07Rq\xddp*Q:\n\x14\x0e\x02(`\x80w\x00"
[(0, 1), (1, 2), (2, 3), (3, 300), (4, 20000)]


In [None]:
class delta_varint_coder:
    """Delta coding of doc ids combined with VarInt encoding.

    For every posting the value ``doc_id - previous_doc_id - 1`` (the previous
    id starts at -1) and the unchanged ``tf`` are written as VarInts: 7
    payload bits per byte, low group first, high bit (0x80) set on every byte
    except the last one of a number.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Write the delta + VarInt encoding of ``posting`` to ``output_stream``.

        Raises:
            ValueError: if a value to store is negative, e.g. because doc ids
                are not strictly increasing.
        """
        # One byte per 7-bit group. The original packed every group with
        # format 'I' (4 bytes), which did not match the 'B' unpack in decode.
        encoded = bytearray()
        prev = -1
        for doc_id, tf in posting:
            for num in (doc_id - prev - 1, tf):
                if num < 0:
                    raise ValueError(f"cannot VarInt-encode negative number {num}")
                while num >= 128:
                    encoded.append((num & 0x7F) | 0x80)
                    num >>= 7
                encoded.append(num)
            prev = doc_id
        output_stream.write(bytes(encoded))

    @staticmethod
    def decode(input_stream):
        """Decode the rest of ``input_stream`` into ``(doc_id, tf)`` tuples.

        Raises:
            ValueError: if the data ends inside a number or after a doc id
                without its ``tf``.
        """
        posting = []
        prev = -1
        pending_doc = None  # doc id decoded but still waiting for its tf
        val = 0
        shift = 0
        for b in input_stream.read():
            val |= (b & 0x7F) << shift
            if b & 0x80:
                # Continuation bit set: more groups of this number follow.
                shift += 7
                continue
            if pending_doc is None:
                # Numbers alternate: this one is a doc-id gap.
                prev += val + 1
                pending_doc = prev
            else:
                posting.append((pending_doc, val))
                pending_doc = None
            val = 0
            shift = 0
        if shift or pending_doc is not None:
            raise ValueError("truncated VarInt posting data")
        return posting
        

# Round-trip check: encode a small posting list, show the bytes, decode them again.
output = io.BytesIO()
delta_varint_coder.encode(output, list(enumerate([1, 2, 3, 300, 20000])))
print(output.getvalue())

posting = delta_varint_coder.decode(io.BytesIO(output.getvalue()))
print(posting)
assert posting == list(enumerate([1, 2, 3, 300, 20000]))

b'\x00\x01\x00\x02\x00\x03\x00\xac\x02\x00\xa0\x9c\x01'
[(0, 1), (1, 2), (2, 3), (3, 300), (4, 20000)]


Сравните эффективность разных вариантов кодирования постинглистов:
 - Базовый вариант (4 байта на число)
 - Какой-нибудь алгоритм сжатия общего назначения (lz4/zstd/brotli/gzip)
 - VarInt
 - Delta-кодирование + Какой-нибудь алгоритм сжатия общего назначения 
 - Delta-кодирование + VarInt

In [None]:
def test_encoded_size(coder, index):
    """Encode every posting list with ``coder`` and print the total size in MB.

    Each list is also decoded again and compared with the original, so a
    coder that does not round-trip fails with an AssertionError.
    """
    total_size = 0
    for term, posting in tqdm(index.items()):
        buffer = io.BytesIO()
        coder.encode(buffer, posting)
        encoded = buffer.getvalue()
        total_size = total_size + len(encoded)
        decoded_posting = coder.decode(io.BytesIO(encoded))
        assert decoded_posting == posting, f"{decoded_posting} != {posting}"
    print(f"{coder.__name__}: {total_size/1024/1024} MB")    
    
# Reference point: fixed-width encoding of every number.
test_encoded_size(baseline_coder, index)

  0%|          | 0/92722 [00:00<?, ?it/s]

baseline_coder: 10.707420349121094 MB


In [None]:
# General-purpose compression (gzip) of the fixed-width encoding.
test_encoded_size(gzip_coder, index)

  0%|          | 0/92722 [00:00<?, ?it/s]

gzip_coder: 6.027420997619629 MB


In [None]:
# General-purpose compression (Brotli) of the fixed-width encoding.
test_encoded_size(brotli_coder, index)

  0%|          | 0/92722 [00:00<?, ?it/s]

brotli_coder: 4.927611351013184 MB


In [None]:
# VarInt encoding without delta coding.
test_encoded_size(varint_coder, index)

  0%|          | 0/92722 [00:00<?, ?it/s]

varint_coder: 4.000444412231445 MB


In [None]:
# Delta coding of doc ids + gzip.
test_encoded_size(delta_gzip_coder, index)

  0%|          | 0/92722 [00:00<?, ?it/s]

delta_gzip_coder: 5.205864906311035 MB


In [None]:
# Delta coding of doc ids + Brotli.
test_encoded_size(delta_brotli_coder, index)

  0%|          | 0/92722 [00:00<?, ?it/s]

delta_brotli_coder: 3.676483154296875 MB


In [None]:
# Delta coding of doc ids + VarInt.
test_encoded_size(delta_varint_coder, index)

  0%|          | 0/92722 [00:00<?, ?it/s]

delta_varint_coder: 3.058574676513672 MB
