### 1. Индексация
Реализуйте построение инвертированного индекса в памяти для коллекции из домашней работы номер 3. В каждом постинглисте также сохраните значение term-frequency.

In [28]:
from typing import Dict, List, Optional, Tuple

from tqdm import tqdm

# Type aliases used in annotations throughout the notebook.
ArticleName = str
Text = str
Term = str
# NOTE(review): the collection data built later also stores a plain float
# under AVERAGE_KEY and dicts keyed by int doc ids, so this alias is looser
# than the real layout.
CollectionData = Dict[str, Dict[str,  float]]
# NOTE(review): this is an empty dict instance used as an annotation, not a
# type; the search functions read the keys 'b', 'k1' and 'k2' from the value.
RankingParams = {}

# Prefix prepended to an article name to build a key of url_path_map_value.
PREFIX = '/wiki/'
# Keys of the collection-data dictionary that is pickled next to the index.
AVERAGE_KEY = 'avg_len'
ID_TO_TITLE_KEY = 'id2title'
DOC_TO_LEN_KEY = 'doc2len'
POSTINGS_LEN_KEY = 'postings_len'

DocId = int
# Not referenced elsewhere in the visible part of the file.
TermFreq = int

RelevInfo = float

# A posting list: (doc id, relevance info) pairs. The index built by
# invert_index stores the integer term frequency in the second slot.
Posting = List[Tuple[DocId, RelevInfo]]

In [2]:
import pickle
import gzip
import html2text

# Maps '/wiki/<article name>' keys to the path of the stored page;
# get_article_text treats an empty-string path as "no page available".
url_path_map_value = {}

# NOTE: unpickling can execute arbitrary code; only load a trusted file here.
with open('url_path_map.p', 'rb') as f:
    url_path_map_value = pickle.load(f)

In [3]:
from bs4 import BeautifulSoup
import re 
import itertools  
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')

def html2text(html):
    """Extract plain text from an article page.

    The texts of all ``<p>`` elements and of all ``mw-headline`` spans are
    collected separately, interleaved pairwise (the shorter list is padded
    with a single space) and joined with spaces.  Bracketed fragments such as
    ``[1]`` are removed, newlines are dropped and the last 11 characters of
    the result are cut off.
    """
    soup = BeautifulSoup(html)
    paragraphs = [str(node.text) for node in soup.find_all('p')]
    headlines = [str(node.text) for node in soup.find_all('span', attrs={'mw-headline'})]

    pieces = []
    for paragraph, headline in itertools.zip_longest(paragraphs, headlines, fillvalue=' '):
        pieces.append(paragraph)
        pieces.append(headline)

    cleaned = re.sub(r"\[.*?\]+", '', ' '.join(pieces))
    # NOTE(review): the fixed 11-character cut presumably drops a trailing
    # boilerplate fragment of the page - confirm against real pages.
    return cleaned.replace('\n', '')[:-11]


def get_article_text(article_name: ArticleName) -> Text:
    """Return "<title> <page text>" for an article.

    An empty string is returned when the article has no entry in
    ``url_path_map_value`` or when its stored path is empty.
    """
    stored_path = url_path_map_value.get(PREFIX + article_name, '')
    if stored_path == '':
        return ""

    with gzip.open(stored_path, 'rb') as archive:
        # gzip.open already decompresses once; the payload is decompressed a
        # second time before being decoded as UTF-8.
        page = gzip.decompress(archive.read()).decode('utf-8')

    readable_title = article_name.replace('_', ' ')
    return f'{readable_title} {html2text(page)}'

import string

def make_terms(text: Text) -> List[Term]:
    """Split *text* into lower-cased terms with English stopwords removed.

    ASCII punctuation (``string.punctuation``) is deleted before the text is
    split on whitespace, so ``full-text`` becomes the single term ``fulltext``.

    Returns:
        The remaining terms in their original order (duplicates are kept).
    """
    # Evaluate the stopword list once per call and keep it in a set: the
    # original re-evaluated stopwords.words('english') for every token and
    # tested membership against a list.
    stop_words = set(stopwords.words('english'))
    words = text.lower().translate(str.maketrans('', '', string.punctuation)).split()
    return [word for word in words if word not in stop_words]

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/danielmuraveyko/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [4]:
# NOTE: loading with a multiprocessing Pool fails in this notebook with: AttributeError: Can't get attribute 'get_article_text' on <module '__main__' (built-in)>

from multiprocessing import Pool

def load_docs(selected_docs_fn: ArticleName, threads: int = 4) -> Dict[ArticleName, Text]:
    """Load the text of every article listed in *selected_docs_fn*.

    Args:
        selected_docs_fn: path to a file with one article name per line.
        threads: currently unused; loading is sequential.

    Returns:
        Mapping from article name to the text produced by get_article_text.
    """
    docs = {}
    # Use a context manager so the list file is closed once it has been read.
    with open(selected_docs_fn) as names_file:
        for line in tqdm(names_file):
            article_name = line.strip()
            docs[article_name] = get_article_text(article_name)
    return docs
    
docs = load_docs("./selected_docs.tsv", 32)
print(f'{len(docs)} docs loaded')

15229it [09:47, 25.93it/s]

15229 docs loaded





In [5]:
from collections import Counter

# Tokenised documents: article name -> list of terms (duplicates kept).
bags_of_words = {k:make_terms(v) for k, v in docs.items()}

def invert_index(
    docs: Dict[ArticleName, Text],
    terms_by_article: Optional[Dict[ArticleName, List[Term]]] = None,
) -> Tuple[Dict[Term, Posting], Dict[ArticleName, DocId]]:
    """Build an in-memory inverted index with term frequencies.

    Args:
        docs: mapping from article name to article text.
        terms_by_article: optional pre-tokenised documents (article name ->
            list of terms) covering every key of *docs*.  When omitted the
            texts are tokenised here with make_terms.  The original version
            silently read the global ``bags_of_words`` instead of using its
            argument.

    Returns:
        ``(index, article2doc_id)`` where ``index`` maps a term to a list of
        ``(doc_id, term_frequency)`` pairs in ascending doc id order and
        ``article2doc_id`` assigns doc ids following the iteration order of
        *docs*.
    """
    if terms_by_article is None:
        terms_by_article = {title: make_terms(text) for title, text in docs.items()}

    article2doc_id = {title: doc_id for doc_id, title in enumerate(docs)}

    index = {}
    # Documents are visited in ascending doc id order, which keeps every
    # posting list sorted by doc id.
    for title, doc_id in article2doc_id.items():
        for term, tf in Counter(terms_by_article[title]).items():
            index.setdefault(term, []).append((doc_id, tf))

    return index, article2doc_id



# Reuse the tokenisation computed above instead of tokenising twice.
index, article_to_doc_id = invert_index(docs, bags_of_words)

In [6]:
doc_id2article = {v:k for k, v in article_to_doc_id.items()}

In [7]:
sum([len(posting) for _, posting in index.items()]) * 8

9972248

Сохраните полученный индекс на диске в бинарном формате. Формат должен позволять читать отсортированные по термам постинглисты по одному за раз. Размер сохранённого индекса в байтах должен быть порядка 8 * (сумма длин всех постинглистов).

Отдельно сохраните на диск дополнительные данные о коллекции, которые пригодятся для поиска, например названия статей или среднюю длину документа. Размер дополнительных данных, должен быть пропорционален количеству документов коллекции.

In [29]:
import struct
import os
import pickle


def dump_index(index: Dict[Term, Posting], filename: str) -> None:
    """Write the inverted index to *filename* in a compact binary format.

    Layout (every integer uses struct's native ``i`` format):
        number of terms, then for each term in ascending term order:
        byte length of the encoded term, the term bytes, posting length ``n``,
        ``n`` doc ids, ``n`` term frequencies.

    Terms are written in sorted order so that posting lists can be read back
    one at a time sorted by term; the original wrote them in dict order.
    """
    with open(filename, 'wb') as out_file:
        out_file.write(struct.pack("i", len(index)))
        for term in sorted(index):
            posting = index[term]
            bytes_term = term.encode()
            p_len = len(posting)

            out_file.write(struct.pack("i", len(bytes_term)))
            out_file.write(bytes_term)
            out_file.write(struct.pack("i", p_len))
            # Doc ids and term frequencies are stored as two parallel arrays.
            out_file.write(struct.pack(f"{p_len}i", *(doc_id for doc_id, _ in posting)))
            out_file.write(struct.pack(f"{p_len}i", *(tf for _, tf in posting)))



def dump_collectiondata(data: CollectionData, filename: str) -> None:
    """Pickle the auxiliary collection data into *filename*."""
    with open(filename, 'wb') as sink:
        pickle.Pickler(sink).dump(data)
    
dump_index(index, "index.inv")
print("Index file size:", os.path.getsize("index.inv"))

# Document length in terms (after tokenisation), keyed by doc id.
docs_len = {article_to_doc_id[title]:len(body) for title, body in bags_of_words.items()}
bodies = docs_len.values()
average_len = sum(bodies) / len(bodies)
# Length of each posting list, i.e. the number of documents containing the term.
# NOTE(review): this map has one entry per term, so it grows with the
# vocabulary rather than with the number of documents.
postings_len = {k:len(v) for k, v in index.items()}

# Everything the search functions need besides the posting lists themselves.
collection_data = {
    AVERAGE_KEY:average_len,
    ID_TO_TITLE_KEY:doc_id2article,
    DOC_TO_LEN_KEY:docs_len,
    POSTINGS_LEN_KEY:postings_len
}

dump_collectiondata(collection_data, "index.data")
print("Collection data file size:", os.path.getsize("index.data"))

Index file size: 11980370
Collection data file size: 2062692


### 2. Поиск

Для простоты реализации поиска, не требуется делать чтение постинглистов с диска по запросу - достаточно считать их с диска в память целиком. Также загрузите с диска дополнительные данные о коллекции.

In [11]:
def load_index(filename: str) -> Dict[Term, Posting]:
    """Read an index written by dump_index back into memory.

    Returns:
        Mapping from term to a list of ``(doc_id, term_frequency)`` pairs.
    """
    int_bytes = 4

    def read_int(stream):
        # Every count/length in the file is a single native "i" integer.
        return struct.unpack("i", stream.read(int_bytes))[0]

    loaded = {}
    with open(filename, "rb") as stream:
        for _ in range(read_int(stream)):
            term_size = read_int(stream)
            (raw_term,) = struct.unpack(f"{term_size}s", stream.read(term_size))

            count = read_int(stream)
            array_format = f"{count}i"
            # Doc ids and term frequencies are stored as two parallel arrays.
            ids = struct.unpack(array_format, stream.read(count * int_bytes))
            freqs = struct.unpack(array_format, stream.read(count * int_bytes))
            loaded[raw_term.decode()] = list(zip(ids, freqs))

    return loaded
        

def load_collectiondata(filename: str) -> CollectionData:
    """Unpickle and return the collection data stored in *filename*."""
    with open(filename, 'rb') as source:
        return pickle.load(source)


index0 = load_index("index.inv")
# Report the size of the index that was just read back from disk (index0),
# not of the in-memory original it is compared against afterwards.
print("Number or terms in index:", len(index0))
collection_data = load_collectiondata("index.data")

Number or terms in index: 124173


In [12]:
assert index0 == index

In [406]:
def load_queries(queries_fn: ArticleName) -> List[Tuple[Text, ArticleName]]:
    """Read ``query<TAB>expected article`` pairs from *queries_fn*.

    Each line is split on the first tab only.  Blank lines are skipped (they
    used to raise ``ValueError`` during tuple unpacking) and the file is
    closed once it has been read.

    Raises:
        ValueError: if a non-blank line contains no tab separator.
    """
    queries = []
    with open(queries_fn) as queries_file:
        for line in queries_file:
            line = line.rstrip()
            if not line:
                continue
            query, answer = line.split('\t', 1)
            queries.append((query, answer))
    return queries

queries = load_queries("./queries.tsv")
# Sanity check: every expected answer must be one of the loaded documents.
for query, answer in queries:
    assert answer in docs
    
print(f'{len(queries)} queries loaded')
# Show the first few "query -> expected article" pairs.
for query, article_name in queries[:5]:
    print(f'{query} -> {article_name}')
    
def run(title, search, queries: List[Tuple[Text, ArticleName]], index: Dict[Term, Posting], collection_data: CollectionData, ranking_params: RankingParams) -> None:
    """Evaluate *search* on *queries* and print the quality metrics.

    For every ``(query, answer)`` pair the top-10 results are requested and
    the 1-based position of ``answer`` is looked up.  Reported metrics:
    Accuracy (answer at rank 1), Accuracy10 (answer within the top 10) and
    RR (mean reciprocal rank; a missing answer contributes 0).

    Args:
        title: label printed above the final metrics.
        search: callable ``search(query, top_size, index, collection_data,
            ranking_params)`` returning ``(article_name, score)`` pairs.
    """
    accuracy = 0.0
    accuracy10 = 0.0
    rr = 0.0
    processed = 0
    with tqdm(queries) as progress:
        for query, answer in progress:
            result = search(query, 10, index, collection_data, ranking_params)

            rank = None
            for position, (article_name, score) in enumerate(result, start=1):
                if article_name == answer:
                    rank = position
                    break

            if rank is not None:
                accuracy += (rank == 1)
                accuracy10 += (rank <= 10)
                rr += 1.0 / rank

            processed += 1
            progress.set_description(f'Acc: {accuracy/processed:0.2f}, Acc10: {accuracy10/processed:0.2f}, RR: {rr/processed:0.2f}')
    # With no queries all sums are 0; divide by 1 instead of raising
    # ZeroDivisionError so that the summary is still printed.
    total = max(processed, 1)
    print(f'{title}\n  Accuracy: {accuracy/total:0.2f}\n  Accuracy10: {accuracy10/total:0.2f}\n  RR: {rr/total:0.2f}')
    


def demo_search(index, search):
    """Print the top-5 results of *search* for three sample queries.

    Uses fixed ranking parameters and the module-level ``collection_data``.
    """
    ranking_params = {'b': 0.5, 'k1': 4, 'k2': 0}
    sample_queries = (
        "coronovirus in belarus",
        "who won junior eurovision in 2005",
        "science about full-text search",
    )

    for sample in sample_queries:
        top_hits = search(sample, 5, index, collection_data, ranking_params)[:5]
        print(f"[{sample}]")
        for name, relevance in top_hits:
            print(f"{relevance:7.2f}  {name}")
        print("\n")

200 queries loaded
animals that have shells and live in water -> Shell_(zoology)
how many different types of scorpions are there -> Scorpion
describe the structure of a scientific name for a species -> Binomial_nomenclature
what are the 3 types of plastids in plant cells -> Plastid
who named the cell and how did he come up with that name -> Cell_theory


In [411]:
import math

def score_BM25(N, average_len, n, f, qf, dl, b, k1, k2):
    """Return the BM25 contribution of one query term to one document.

    The score is the product of three factors: an IDF-like term
    ``log((N - n + 0.5) / (n + 0.5))``, a document factor built from the
    in-document frequency ``f`` and the length normalisation ``K``, and a
    query factor built from the in-query frequency ``qf`` and ``k2``.
    """
    length_norm = compute_K(k1, b, dl, average_len)
    idf = math.log((N - n + 0.5) / (n + 0.5))
    doc_factor = ((k1 + 1) * f) / (length_norm + f)
    query_factor = ((k2 + 1) * qf) / (k2 + qf)
    return idf * doc_factor * query_factor


def compute_K(k1, b, dl, avdl):
    """Return the length normalisation ``k1 * ((1 - b) + b * dl / avdl)``."""
    scaled_length = b * dl / avdl
    return k1 * ((1 - b) + scaled_length)

def search_indexed(query: Text, top_size: int, index: Dict[Term, Posting], collection_data: CollectionData, ranking_params: RankingParams) -> List[Tuple[ArticleName, float]]:
    """Term-at-a-time BM25 search over the inverted index.

    Every distinct query term is processed once; its in-query frequency is
    passed to BM25 as ``qf``.  The original looped over the raw term list, so
    a repeated query term was scored once per repetition on top of ``qf``,
    which disagreed with search_indexed_heaps.  A leftover debug print of the
    query terms has been removed.

    Args:
        query: raw query text, tokenised with make_terms.
        top_size: maximum number of results to return.
        index: term -> posting list of ``(doc_id, term_frequency)``.
        collection_data: document lengths, average length, titles and
            posting-list lengths under the ``*_KEY`` constants.
        ranking_params: BM25 parameters ``b``, ``k1``, ``k2`` (default 1 each).

    Returns:
        Up to *top_size* ``(article_name, score)`` pairs, best score first.
    """
    query_tf = Counter(make_terms(query))

    b = ranking_params.get('b', 1)
    k1 = ranking_params.get('k1', 1)
    k2 = ranking_params.get('k2', 1)

    lens = collection_data.get(DOC_TO_LEN_KEY, {})
    average_len = collection_data.get(AVERAGE_KEY, 1)
    doc_id2article = collection_data.get(ID_TO_TITLE_KEY, {})
    postings_len = collection_data.get(POSTINGS_LEN_KEY, {})

    # NOTE(review): len(index) is the number of distinct terms, while BM25's N
    # is normally the number of documents. Kept as is so that the scores stay
    # identical to the ones recorded in this notebook - confirm and fix together
    # with search_indexed_heaps.
    collection_size = len(index)

    scores = {}
    for term, qf in query_tf.items():
        n = postings_len.get(term, 1)
        for doc_id, freq in index.get(term, []):
            score = score_BM25(N=collection_size, average_len=average_len, n=n,
                               f=freq, qf=qf,
                               dl=lens.get(doc_id, 1),
                               b=b, k1=k1, k2=k2)
            if doc_id in scores:
                scores[doc_id] += score
            else:
                scores[doc_id] = score

    ranked = sorted(((doc_id2article.get(doc_id, ""), rank) for doc_id, rank in scores.items()), key=lambda x: -x[1])
    return ranked[:top_size]

In [412]:
demo_search(index0, search_indexed)

['coronovirus', 'belarus']
[coronovirus in belarus]
  25.27  COVID-19_pandemic_in_Belarus
  17.68  Daugava_River
  12.16  Bug_River
   9.94  Jagiellon_dynasty
   9.68  Byelorussian_Soviet_Socialist_Republic


['junior', 'eurovision', '2005']
[who won junior eurovision in 2005]
  43.55  Junior_Eurovision_Song_Contest_2019
  34.41  Junior_Eurovision_Song_Contest_2004
  34.21  Junior_Eurovision_Song_Contest_2014
  34.01  Junior_Eurovision_Song_Contest_2015
  30.98  List_of_ice_hockey_leagues


['science', 'fulltext', 'search']
[science about full-text search]
  28.45  Information_retrieval
  26.80  Google_Search
  22.92  Popular_science
  22.74  Citizen_science
  21.99  Philosophy_of_science




Реализуйте поиск документов с ранжированием BM25 на основе инвертированного индекса в парадигме document-at-time, то есть через [слияние](https://en.wikipedia.org/wiki/Merge_algorithm) постинглистов. Функция поиска должна принимать число - ограничение на количество документов, возвращаемое поиском. Используемое количество дополнительной памяти должно быть пропорционально этому ограничению и никак не должно зависеть от размера постинглистов или размера коллекции.
Результаты поиска должны быть аналогичные тем, что были в домашней работе номер 3. 

In [409]:
from heapq import heapify, heappush, heappushpop, heappop
   
def search_indexed_heaps(query: Text, top_size: int, index: Dict[Term, Posting], collection_data: CollectionData, ranking_params: RankingParams) -> List[Tuple[ArticleName, float]]:
    """Document-at-a-time BM25 search that merges posting lists with a heap.

    One cursor per distinct query term is kept in a min-heap ordered by the
    cursor's current doc id, so documents are visited in ascending doc id
    order (posting lists must be sorted by doc id).  The best results are kept
    in a second heap that holds at most *top_size* entries.

    Fixes over the original: a query with no indexed term returns ``[]``
    instead of raising ``IndexError``; a repeated query term no longer pushes
    a duplicate cursor (which could evict another term's cursor through
    ``heappushpop``); terms absent from a document are skipped instead of
    being scored with ``f=0``.

    Returns:
        Up to *top_size* ``(article_name, score)`` pairs, best score first.
    """
    query_tf = Counter(make_terms(query))

    b = ranking_params.get('b', 1)
    k1 = ranking_params.get('k1', 1)
    k2 = ranking_params.get('k2', 1)

    lens = collection_data.get(DOC_TO_LEN_KEY, {})
    average_len = collection_data.get(AVERAGE_KEY, 1)
    doc_id2article = collection_data.get(ID_TO_TITLE_KEY, {})
    postings_len = collection_data.get(POSTINGS_LEN_KEY, {})

    # NOTE(review): len(index) is the number of distinct terms, while BM25's N
    # is normally the number of documents. Kept as is so that the scores stay
    # identical to the ones recorded in this notebook - confirm and fix together
    # with search_indexed.
    collection_size = len(index)

    # Cursor layout: (current doc id, (term, tf in that doc), index of the
    # next posting of this term).
    cursors = []
    for term in query_tf:
        posting = index.get(term)
        if posting:
            first_doc_id, first_tf = posting[0]
            heappush(cursors, (first_doc_id, (term, first_tf), 1))

    result = []
    while cursors:
        doc_id = cursors[0][0]

        # Collect the frequencies of all query terms present in this document
        # and advance the corresponding cursors.
        tfs = {}
        while cursors and cursors[0][0] == doc_id:
            _, (term, tf), next_idx = heappop(cursors)
            tfs[term] = tf

            posting = index[term]
            if next_idx < len(posting):
                next_doc_id, next_tf = posting[next_idx]
                heappush(cursors, (next_doc_id, (term, next_tf), next_idx + 1))

        bm25 = sum(
            score_BM25(
                N=collection_size, average_len=average_len, n=postings_len.get(term, 1),
                f=tfs[term], qf=qf,
                dl=lens.get(doc_id, 1),
                b=b, k1=k1, k2=k2
            )
            for term, qf in query_tf.items()
            if term in tfs
        )

        # Min-heap of the best results seen so far, capped at top_size entries.
        elem = (bm25, doc_id)
        if len(result) < top_size:
            heappush(result, elem)
        else:
            heappushpop(result, elem)

    return sorted([(doc_id2article[doc_id], score) for score, doc_id in result], key=lambda x: -x[1])


In [410]:
demo_search(index0, search_indexed_heaps)

[coronovirus in belarus]
  25.27  COVID-19_pandemic_in_Belarus
  17.68  Daugava_River
  12.16  Bug_River
   9.94  Jagiellon_dynasty
   9.68  Byelorussian_Soviet_Socialist_Republic


[who won junior eurovision in 2005]
  43.55  Junior_Eurovision_Song_Contest_2019
  34.41  Junior_Eurovision_Song_Contest_2004
  34.21  Junior_Eurovision_Song_Contest_2014
  34.01  Junior_Eurovision_Song_Contest_2015
  30.98  List_of_ice_hockey_leagues


[science about full-text search]
  28.45  Information_retrieval
  26.80  Google_Search
  22.92  Popular_science
  22.74  Citizen_science
  21.99  Philosophy_of_science




Реализуйте static pruning до 50 элементов для каждого постинглиста.

Для каждого терма оставляем 50 документов с наибольшей частотой терма

In [92]:
def prune(index: Dict[Term, Posting], top_size: int = 50) -> Dict[Term, Posting]:
    """Statically prune every posting list to its *top_size* strongest entries.

    For each term the postings with the highest term frequency are kept (ties
    keep their original relative order) and the survivors are returned sorted
    by ascending doc id.
    """
    pruned = {}
    for term, posting in index.items():
        by_frequency = sorted(posting, key=lambda entry: entry[1], reverse=True)
        survivors = by_frequency[:top_size]
        survivors.sort(key=lambda entry: entry[0])
        pruned[term] = survivors
    return pruned


pruned_index = prune(index, 50)
# Sanity check: every pruned posting list must still be strictly increasing
# in doc id, which the document-at-a-time merge relies on.
for term, posting in pruned_index.items():
    prev_doc_id = -1
    for doc_id, freq in posting:
        assert doc_id > prev_doc_id
        prev_doc_id = doc_id

In [93]:
demo_search(pruned_index, search_indexed)

[coronovirus in belarus]
  25.27  COVID-19_pandemic_in_Belarus
  17.68  Daugava_River
  12.16  Bug_River
   9.94  Jagiellon_dynasty
   9.68  Byelorussian_Soviet_Socialist_Republic


[who won junior eurovision in 2005]
  43.55  Junior_Eurovision_Song_Contest_2019
  34.41  Junior_Eurovision_Song_Contest_2004
  34.21  Junior_Eurovision_Song_Contest_2014
  34.01  Junior_Eurovision_Song_Contest_2015
  30.98  List_of_ice_hockey_leagues


[science about full-text search]
  26.80  Google_Search
  23.00  Information_retrieval
  22.92  Popular_science
  21.28  British_Science_Association
  20.53  Science_fiction_movie




Сравните качество и скорость работы нового алгоритма поиска с предыдущим.

In [94]:
import time

start_time = time.time()
# NOTE(review): ranking_params is first assigned at module level in a later
# cell of this file, so a top-to-bottom run raises NameError here - confirm
# the intended execution order.
run("BM25", search_indexed, queries, index, collection_data, ranking_params)
print("--- %s seconds ---" % (time.time() - start_time))

Acc: 0.23, Acc10: 0.48, RR: 0.31: 100%|██████████| 200/200 [00:02<00:00, 73.05it/s]

BM25
  Accuracy: 0.23
  Accuracy10: 0.48
  RR: 0.31
--- 2.7399349212646484 seconds ---





In [225]:
import time

start_time = time.time()
ranking_params = {'b': 0.5, 'k1': 4, 'k2': 0}
run("BM25 doc-at-time", search_indexed_heaps, queries, index, collection_data, ranking_params)
print("--- %s seconds ---" % (time.time() - start_time))

Acc: 0.23, Acc10: 0.50, RR: 0.31: 100%|██████████| 200/200 [00:06<00:00, 29.65it/s]

BM25 doc-at-time
  Accuracy: 0.23
  Accuracy10: 0.50
  RR: 0.31
--- 6.747586250305176 seconds ---





Сравните качество и скорость работы нового индекса с предыдущим.

In [137]:
start_time = time.time()
run("Pruned BM25 doc-at-time", search_indexed_heaps, queries, pruned_index, collection_data, ranking_params)
print("--- %s seconds ---" % (time.time() - start_time))

Acc: 0.19, Acc10: 0.39, RR: 0.26: 100%|██████████| 200/200 [00:01<00:00, 146.46it/s]

Pruned BM25 doc-at-time
  Accuracy: 0.19
  Accuracy10: 0.39
  RR: 0.26
--- 1.3672301769256592 seconds ---





## ~Дополнительно~
### Сжатие индекса (+1 балл)
Реализуйте кодирование чисел алгоритмом VarInt.

In [358]:
import struct 
import io


class baseline_coder:
    """Fixed-width codec: each ``(doc_id, freq)`` pair is two native unsigned ints.

    The methods are static so they work both on the class (as the rest of the
    file calls them) and on an instance; the original definitions without
    ``@staticmethod`` failed when called on an instance.
    """

    # struct format of one posting entry.
    _PAIR = struct.Struct('II')

    @staticmethod
    def encode(output_stream, posting):
        """Write every ``(doc_id, freq)`` pair of *posting* to *output_stream*."""
        pack = baseline_coder._PAIR.pack
        for doc_id, freq in posting:
            output_stream.write(pack(doc_id, freq))

    @staticmethod
    def decode(input_stream):
        """Read pairs until the end of *input_stream*; return a list of tuples.

        Raises:
            struct.error: if the remaining data is not a whole number of pairs.
        """
        return list(baseline_coder._PAIR.iter_unpack(input_stream.read()))


class varint_coder:
    """VarInt codec: 7 payload bits per byte, least significant group first.

    The high bit of a byte is set when more bytes of the same number follow.
    Only non-negative integers can be encoded.
    """

    @staticmethod
    def encode_num(number):
        """Return the VarInt encoding of a non-negative integer as ``bytes``.

        Raises:
            ValueError: if *number* is negative (the shift loop would never
                terminate for a negative value).
        """
        if number < 0:
            raise ValueError(f"VarInt cannot encode a negative number: {number}")
        buf = bytearray()
        while True:
            towrite = number & 0x7f
            number >>= 7
            if number:
                buf.append(towrite | 0x80)
            else:
                buf.append(towrite)
                break
        return bytes(buf)

    @staticmethod
    def decode_num(input_stream):
        """Read one VarInt from *input_stream*.

        Returns:
            ``(bytes_consumed, value)``.

        Raises:
            EOFError: if the stream ends before the number is complete.
        """
        shift = 0
        result = 0
        while True:
            chunk = input_stream.read(1)
            if not chunk:
                raise EOFError("stream ended in the middle of a VarInt")
            byte = chunk[0]
            result |= (byte & 0x7f) << shift
            shift += 7
            if not (byte & 0x80):
                break

        return shift // 7, result

    @staticmethod
    def decode(input_stream):
        """Decode every VarInt from the current position to the end of the stream.

        Raises:
            EOFError: if the data ends in the middle of a number.
        """
        res = []
        value = 0
        shift = 0
        # Read the remaining bytes once instead of issuing one read per byte.
        for byte in input_stream.read():
            value |= (byte & 0x7f) << shift
            if byte & 0x80:
                shift += 7
            else:
                res.append(value)
                value = 0
                shift = 0
        if shift:
            raise EOFError("stream ended in the middle of a VarInt")
        return res

    @staticmethod
    def encode(output_stream, posting):
        """Write the VarInt encoding of every number in *posting*."""
        output_stream.write(b"".join(varint_coder.encode_num(num) for num in posting))
            

    

# Round-trip check of the VarInt codec on a small list of numbers.
output = io.BytesIO()
varint_coder.encode(output, [1, 2, 3, 300, 20000])
print(output.getvalue())
posting = varint_coder.decode(io.BytesIO(output.getvalue()))

assert posting == [1, 2, 3, 300, 20000]

b'\x01\x02\x03\xac\x02\xa0\x9c\x01'


Сравните эффективность разных вариантов кодирования постинглистов:
 - Базовый вариант (4 байта на число)
 - Какой-нибудь алгоритм сжатия общего назначения (lz4/zstd/brotli/gzip)
 - VarInt
 - Delta-кодирование + Какой-нибудь алгоритм сжатия общего назначения 
 - Delta-кодирование + VarInt

In [419]:
import lz4.frame

class lz4_baseline_coder:
    """baseline_coder output wrapped in a single LZ4 frame.

    The methods are static so they can be called on the class or on an
    instance; the original definitions failed when called on an instance.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Encode *posting* with baseline_coder and write it LZ4-compressed."""
        raw = io.BytesIO()
        baseline_coder.encode(raw, posting)
        output_stream.write(lz4.frame.compress(raw.getvalue()))

    @staticmethod
    def decode(input_stream):
        """Decompress the whole buffer of *input_stream* and decode the pairs.

        *input_stream* must provide ``getvalue()`` (e.g. ``io.BytesIO``).
        """
        decompressed = lz4.frame.decompress(input_stream.getvalue())
        return baseline_coder.decode(io.BytesIO(decompressed))

In [424]:
class varint_posting_coder:
    """VarInt codec for posting lists of ``(doc_id, freq)`` pairs.

    All doc ids are written first, followed by all frequencies, so the decoder
    splits the decoded numbers in half.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Write the doc ids of *posting*, then its frequencies, as VarInts.

        An empty posting writes nothing (the original raised ``IndexError``).
        """
        if not posting:
            return
        doc_ids, freqs = zip(*posting)
        varint_coder.encode(output_stream, doc_ids)
        varint_coder.encode(output_stream, freqs)

    @staticmethod
    def decode(input_stream):
        """Decode all numbers and pair the first half with the second half."""
        decoded = varint_coder.decode(input_stream)
        half = len(decoded) // 2
        return list(zip(decoded[:half], decoded[half:]))

In [446]:
class delta_coder:
    """Delta-encoded doc ids stored through lz4_baseline_coder.

    Each doc id is replaced by ``doc_id - previous_doc_id - 1``; the first one
    is stored unchanged.  A later cell of this file redefines the class on top
    of the posting2delta / delta2posting helpers.

    The methods are static so they can be called on the class or on an
    instance; the original definitions failed when called on an instance.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Delta-encode the doc ids of *posting* and write the compressed result."""
        delta_posting = []
        last = -1
        for doc_id, f in posting:
            # With last == -1 the gap equals doc_id itself, so the first
            # element needs no special case.
            delta_posting.append((doc_id - last - 1, f))
            last = doc_id
        lz4_baseline_coder.encode(output_stream, delta_posting)

    @staticmethod
    def decode(input_stream):
        """Inverse of encode: rebuild absolute doc ids from the stored gaps."""
        delta_posting = lz4_baseline_coder.decode(input_stream)
        posting = []
        last = -1
        for gap, f in delta_posting:
            # Mirrors encode: starting from -1 the first gap is the doc id.
            last = gap + last + 1
            posting.append((last, f))

        return posting

In [448]:
def posting2delta(posting):
    """Replace each doc id by its gap to the previous one.

    The gap is ``doc_id - previous_doc_id - 1``; starting from a previous id
    of -1 leaves the first doc id unchanged.  Frequencies are copied as is.
    """
    gaps = []
    previous = -1
    for doc_id, tf in posting:
        gaps.append((doc_id - previous - 1, tf))
        previous = doc_id
    return gaps


def delta2posting(delta_posting):
    """Rebuild absolute doc ids from the gaps produced by posting2delta.

    Each doc id is ``gap + previous_doc_id + 1``; starting from a previous id
    of -1 the first gap is the doc id itself.  Frequencies are copied as is.
    """
    restored = []
    current = -1
    for gap, tf in delta_posting:
        current = gap + current + 1
        restored.append((current, tf))
    return restored

In [449]:
class delta_coder:
    """Delta-encoded doc ids stored through lz4_baseline_coder.

    The methods are static so they can be called on the class or on an
    instance; the original definitions failed when called on an instance.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Convert doc ids to gaps with posting2delta and write them compressed."""
        delta_posting = posting2delta(posting)
        lz4_baseline_coder.encode(output_stream, delta_posting)

    @staticmethod
    def decode(input_stream):
        """Read the compressed gaps and restore doc ids with delta2posting."""
        delta_posting = lz4_baseline_coder.decode(input_stream)
        return delta2posting(delta_posting)

In [451]:
class delta_varint_coder:
    """Delta-encoded doc ids stored through varint_posting_coder.

    The methods are static so they can be called on the class or on an
    instance; the original definitions failed when called on an instance.
    """

    @staticmethod
    def encode(output_stream, posting):
        """Convert doc ids to gaps with posting2delta and write them as VarInts."""
        delta_posting = posting2delta(posting)
        varint_posting_coder.encode(output_stream, delta_posting)

    @staticmethod
    def decode(input_stream):
        """Read the VarInt gaps and restore doc ids with delta2posting."""
        delta_posting = varint_posting_coder.decode(input_stream)
        return delta2posting(delta_posting)

In [416]:
def test_encoded_size(coder, index):
    """Encode every posting list with *coder* and print the total size in MB.

    Each encoded posting is decoded again and compared with the original, so
    the size is only reported for a codec that round-trips correctly.
    """
    encoded_bytes = 0
    for term, posting in tqdm(index.items()):
        buffer = io.BytesIO()
        coder.encode(buffer, posting)
        payload = buffer.getvalue()
        encoded_bytes += len(payload)
        roundtrip = coder.decode(io.BytesIO(payload))
        assert roundtrip == posting, f"{roundtrip} != {posting}"
    print(f"{coder.__name__}: {encoded_bytes/1024/1024} MB")

In [418]:
test_encoded_size(baseline_coder, index)

100%|██████████| 124173/124173 [00:01<00:00, 85254.85it/s]

baseline_coder: 9.510276794433594 MB





In [417]:
test_encoded_size(lz4_baseline_coder, index)

100%|██████████| 124173/124173 [00:02<00:00, 60467.51it/s]

lz4_baseline_coder: 8.887649536132812 MB





In [425]:
test_encoded_size(varint_posting_coder, index)

100%|██████████| 124173/124173 [00:04<00:00, 27815.01it/s] 

varint_posting_coder: 3.551640510559082 MB





In [450]:
test_encoded_size(delta_coder, index)

100%|██████████| 124173/124173 [00:02<00:00, 45296.79it/s] 

delta_coder: 8.740694999694824 MB





In [452]:
test_encoded_size(delta_varint_coder, index)

100%|██████████| 124173/124173 [00:04<00:00, 26940.21it/s]

delta_varint_coder: 2.8513011932373047 MB



