In [1]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, parallel_bulk
import ir_measures
from ir_measures import *
import pandas as pd
import json
from tqdm import tqdm
from time import time
import warnings
warnings.filterwarnings("ignore")


# Connection

In [2]:
es = Elasticsearch(hosts='https://localhost:9200', 
                     basic_auth=('elastic', 'sYV-CgqebNRTw1e=L=pY'),
                     verify_certs=False)


## WikiIR

In [3]:
df = pd.read_csv('wikIR1k/documents.csv')


### Index Configuration

In [4]:
# Without stemming

mappings = {
    'properties': {
        'text': {
            'type': 'text',
            'analyzer': 'white'
        }
    }
}

settings = {
    'analysis' : {
        'analyzer' : {
            'white' : {
                'tokenizer' : 'whitespace'
            }
        }
    }
}

index = 'wiki'

if es.indices.exists(index=index):
    es.indices.delete(index=index)
es.indices.create(index=index, settings=settings, mappings=mappings)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'wiki'})

### Checking analyzer

In [5]:
def check_analyzer(analyzer, text):
    body = analyzer
    body['text'] = text
    tokens = es.indices.analyze(index=index, body=body)['tokens']
    tokens = [token_info['token'] for token_info in tokens]
    return tokens

text = 'Tests for small assignment'
analyzer = {
    'analyzer': 'white'
}

check_analyzer(analyzer, text)


['Tests', 'for', 'small', 'assignment']

In [6]:
# Indexing documents

def create_es_action(index, doc_id, document):
    return {
        '_index': index,
        '_id': doc_id,
        '_source': document
    }


def es_action_generator(df):
    for doc_id, row in tqdm(df.iterrows(), total=df.shape[0]):
        doc = {'text': row['text_right']}
        yield create_es_action(index, row['id_right'], doc)


start = time()
for ok, result in parallel_bulk(es, es_action_generator(df), queue_size=4, thread_count=4, chunk_size=1000):
    if not ok:
        print(result)
stop = time()
print('Indexing time:', stop - start)
        
es.indices.refresh(index=index)

100%|██████████| 369721/369721 [00:24<00:00, 15328.86it/s]


Indexing time: 24.41376805305481


ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}})

In [7]:
# with stemming

mappings = {
    'properties': {
        'text': {
            'type': 'text',
            'analyzer': 'porter_stemmer'
        }
    }
}

settings = {
    'analysis' : {
        'analyzer' : {
            'porter_stemmer' : {
                'tokenizer' : 'whitespace',
                'filter' : ['porter_stem']
            }
    },
        'filter' : {
            'porter_stem' : {
                'type' : 'porter_stem',
                'language' : 'English'
            }
        }
    }
}


index = 'wiki'

if es.indices.exists(index=index):
    es.indices.delete(index=index)
es.indices.create(index=index, settings=settings, mappings=mappings)


analyzer = {
    'analyzer': 'porter_stemmer'
}

check_analyzer(analyzer, text)

['Test', 'for', 'small', 'assign']

In [8]:
start = time()
for ok, result in parallel_bulk(es, es_action_generator(df), queue_size=4, thread_count=4, chunk_size=1000):
    if not ok:
        print(result)
stop = time()

print('Indexing time:', stop-start)
        
es.indices.refresh(index=index)

100%|██████████| 369721/369721 [00:25<00:00, 14425.79it/s]


Indexing time: 25.94707989692688


ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}})

### Search

In [9]:
def pretty_print_result(search_result, fields=[]):
    res = search_result['hits']
    print(f'Total documents: {res["total"]["value"]}')
    for hit in res['hits']:
        print(f'Doc {hit["_id"]}, score is {hit["_score"]}')
        for field in fields:
            print(f'{field}: {hit["_source"][field]}')
    
def search(query, *args):
    return pretty_print_result(es.search(index=index, body=query, size=20), args)

def get_doc_by_id(doc_id):
    return es.get(index=index, id=doc_id)['_source']

def search_results(query_id, query):
    res = es.search(index=index, body=query, size=20)['hits']
    return [(str(query_id), str(hit['_id']), hit['_score'], rank) for rank, hit in enumerate(res['hits'])]


### Queries

In [10]:
test_queries = pd.read_csv('queries.csv')


In [12]:
def make_query(text):
    return {
        'query': {
            'bool': {
                'must': {
                    'match': {
                        'text': text
                    }
                }
            }
        }
    }

def score(test):
    result = []
    file = open('data.res', 'w')
    for i, row in test.iterrows():
        for res in search_results(row['id_left'], make_query(row['text_left'])):
            result.append(ir_measures.ScoredDoc(res[0], res[1], res[2]))
            file.write(f'{res[0]} Q0 {res[1]} {res[3]} {res[2]} BM25\n')
    file.close()
    return result

In [13]:
run = score(test_queries)
start = time()
score(test_queries)
stop = time()

print('Query execution time (total):', stop-start, 's')

Query execution time (total): 0.9339628219604492 s


### lemmatization

In [14]:
# conda install -c conda-forge spacy
# import spacy

docs_list = []
nlp = spacy.load('en_core_web_sm')

for i, row in tqdm(df.iterrows(), total=df.shape[0]):
    nlp_doc = nlp(row['text_right'])
    new_doc = ''
    for token in nlp_doc:
        new_doc = new_doc + ' ' + token.lemma_
    docs_list.append(new_doc)

lemmatized = pd.DataFrame({'id_right': df['id_right'].values, 'text_right': docs_list})
lemmatized.to_csv('optional_part.csv', index=None)

NameError: name 'spacy' is not defined

In [15]:
lemmatized = pd.read_csv('optional_part.csv')

### Queries (lemmatization)

In [16]:
if es.indices.exists(index=index):
    es.indices.delete(index=index)
es.indices.create(index=index, settings=settings, mappings=mappings)


start = time()
for ok, result in parallel_bulk(es, es_action_generator(lemmatized), queue_size=4, thread_count=4, chunk_size=1000):
    if not ok:
        print(result)
stop = time()

print('Indexing time:', stop-start)
        
es.indices.refresh(index=index)

100%|██████████| 369721/369721 [00:24<00:00, 14896.30it/s]


Indexing time: 25.227459192276


ObjectApiResponse({'_shards': {'total': 2, 'successful': 1, 'failed': 0}})

In [17]:
# BM25 results
BM25 = ir_measures.read_trec_run('BM25.res')
qrels = ir_measures.read_trec_qrels('qrels')

ir_measures.calc_aggregate([P@5, P@10, P@20, AP], qrels, BM25)

{P@20: 0.09499999999999999,
 AP: 0.11196168401599797,
 P@5: 0.18399999999999994,
 P@10: 0.1319999999999999}

In [18]:
results = ir_measures.read_trec_run('data.res')
qrels = ir_measures.read_trec_qrels('qrels')

ir_measures.calc_aggregate([P@5, P@10, P@20, AP], qrels, results)

{P@20: 0.14350000000000002,
 AP: 0.14653833730973906,
 P@5: 0.3119999999999997,
 P@10: 0.20999999999999994}

In [19]:
def make_query(text):
    return {
        'query':{
            "bool": {
                'must': {
                    'match': {
                        'text': text
                    }             
                },
                'should': {
                    "match_phrase": {
                        "text": {
                            "query": text,
                            "boost": 5
                        }
                    }
                }
            }
        }
    }

results = ir_measures.read_trec_run('lem_test_query.res')
qrels = ir_measures.read_trec_qrels('qrels')

ir_measures.calc_aggregate([P@5, P@10, P@20, AP], qrels, results)

{P@20: 0.1315,
 AP: 0.1212078886744036,
 P@5: 0.24399999999999986,
 P@10: 0.18499999999999994}