In [6]:
import gzip
import math
import numpy as np
import os
import pytest
import pickle
import random
import requests
import tarfile
import time
import timeit
import sys

from collections import Counter
from collections import defaultdict
from elasticsearch import Elasticsearch
from playsound import playsound
from tqdm.notebook import tqdm

# Name of the Elasticsearch index used throughout the notebook; its upper-cased
# form is also used as the local data directory name in the download cell.
INDEX_NAME = 'ms-marco'

In [14]:
def save_picke(file_path,obj):
    """Serialize ``obj`` to ``file_path`` using pickle's highest protocol.

    Args:
        file_path: Destination path; the file is created or overwritten.
        obj: Any picklable Python object.

    The historical (misspelled) name is kept for existing callers; prefer the
    ``save_pickle`` alias defined right below in new code.
    """
    with open(file_path, 'wb') as handle:
        pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)


# Correctly spelled alias, consistent with ``load_pickle``.
save_pickle = save_picke

def load_pickle(file_path):
    """Read back and return the object pickled at ``file_path``.

    NOTE: unpickling can execute arbitrary code, so only load files that come
    from a trusted source.
    """
    with open(file_path, 'rb') as source:
        return pickle.load(source)

def finished(n=1):
    """Signal completion audibly: play the bell sound ``n`` times.

    Each playback of 'assets/bell.wav' is followed by a 1.5 second pause.
    """
    for _ in range(n):
        playsound('assets/bell.wav')
        time.sleep(1.5)

def read_file(file,start_line = 0,n=20,encoding = None):
    """Return up to ``n`` lines of a text file, starting at ``start_line``.

    Args:
        file: Path of the text file to read.
        start_line: Zero-based index of the first line to return. ``0`` or
            ``None`` starts at the beginning of the file.
        n: Maximum number of lines to return. A falsy value (``0``/``None``)
            means "read to the end of the file".
        encoding: Text encoding forwarded to ``open``.

    Returns:
        A list of lines (line terminators included).
    """
    first_line = start_line or 0
    lines = []
    with open(file,'r',encoding=encoding) as f:
        for i,line in enumerate(f):
            if i < first_line:
                continue
            lines.append(line)
            # Stop as soon as exactly n lines were collected (the previous
            # ``>`` comparison let one extra line through).
            if n and len(lines) >= n:
                break
    return lines

def download_file(target_path,url,max_retries=None):
    """Download ``url`` into directory ``target_path`` and return the file path.

    The file name is the last ``/``-separated component of ``url``. If the
    target file already exists nothing is downloaded. Data is streamed into a
    ``<file>.part`` sibling and only renamed to the final name once complete,
    so an interrupted download is never mistaken for a finished one.

    On failure the request is retried, asking the server (via a ``Range``
    header) to resume from the number of bytes already written.

    Args:
        target_path: Existing directory the file is stored in.
        url: URL to fetch.
        max_retries: Maximum number of failed attempts before the last error
            is re-raised. ``None`` (default) retries indefinitely.

    Returns:
        The path of the downloaded (or already present) file, i.e.
        ``os.path.join(target_path, <file name>)`` in both cases.

    Raises:
        Exception: the last download error, once ``max_retries`` is exhausted.
    """
    local_filename = url.split('/')[-1]
    file_path = os.path.join(target_path,local_filename)
    if os.path.exists(file_path):
        print(f'\tFile {file_path} already exists, skipping...')
        return file_path

    part_path = file_path + '.part'
    # Drop leftovers of a previously aborted run; we always start from byte 0.
    try:
        os.remove(part_path)
    except OSError:
        pass

    print(f'Getting file from {url}')
    byte_pos = 0
    failures = 0
    file_downloaded = False
    while not file_downloaded:
        resume_header = {'Range': 'bytes=%d-' % byte_pos}
        try:
            # NOTE(review): verify=False disables TLS certificate validation;
            # kept for compatibility, but consider enabling verification.
            with requests.get(url, headers=resume_header, stream=True,  verify=False, allow_redirects=True) as r:
                r.raise_for_status()
                if byte_pos and r.status_code != 206:
                    # The server ignored the Range header and is sending the
                    # whole file again: restart instead of appending.
                    byte_pos = 0
                with open(part_path, 'ab' if byte_pos else 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                        # Track real bytes so a resume starts at the right offset.
                        byte_pos += len(chunk)
            file_downloaded = True
        except Exception as err:
            # ``Exception`` (not a bare except) so Ctrl-C can still abort.
            failures += 1
            if max_retries is not None and failures >= max_retries:
                raise
            print('An error occured while downloading. Retrying...')
            print(f'\t{err!r}')
    os.replace(part_path, file_path)
    return file_path

def clear_indices(excluded_indices=None, client=None):
    """Delete every index of an Elasticsearch cluster except the excluded ones.

    Args:
        excluded_indices: Collection of index names to keep. ``None``
            (default) keeps nothing, i.e. all indices are deleted.
        client: Elasticsearch client to operate on. When omitted, the
            notebook-level global ``es`` is used, as before.
    """
    if client is None:
        # Backward-compatible fallback to the global the notebook leaves behind.
        client = es
    excluded = excluded_indices if excluded_indices is not None else ()
    # Materialize the names first so deleting does not disturb the iteration.
    doomed = [name for name in client.indices.stats()['indices'].keys() if name not in excluded]
    for name in doomed:
        client.indices.delete(index=name)
        
def create_index(es,index_name,body,overwrite = False):
    """Create index ``index_name`` with the given settings/mappings ``body``.

    Args:
        es: Elasticsearch client.
        index_name: Name of the index to create.
        body: Index definition passed to ``es.indices.create``.
        overwrite: If the index already exists, ``False`` leaves it untouched
            while ``True`` deletes it (all documents are lost) and recreates
            it from ``body``.
    """
    existing = es.indices.stats()['indices'].keys()
    if index_name in existing:
        if not overwrite:
            print(f'Index {index_name} already exists')
            return
        print(f'overwriting index {index_name}')
        es.indices.delete(index=index_name)
    # Reached for a brand-new index and after an overwrite deletion; previously
    # an overwritten index was deleted but never created again.
    es.indices.create(index=index_name,body=body)
        
def extract_gz_files(file_path,override=False,n=8):
    """Decompress a gzip file next to itself and return the output path.

    The output path is ``file_path`` without its trailing ``.gz``. Data is
    written to a ``<output>.part`` file first and renamed when extraction
    succeeds, so a failed run never leaves a truncated file that a later call
    would skip as "already exists".

    Args:
        file_path: Path of the ``.gz`` archive.
        override: When true, an existing output file is removed and extracted
            again; otherwise an existing output is kept and extraction skipped.
        n: Number of decompressed bytes read per chunk.

    Returns:
        The path of the extracted (or already present) file.
    """
    if file_path.endswith('.gz'):
        # Strip only the suffix; '.gz' inside directory names must survive.
        x_file_out_path = file_path[:-len('.gz')]
    else:
        # Legacy behaviour for paths without a '.gz' suffix.
        x_file_out_path = file_path.replace('.gz','')
    # Never delete the input itself when the name could not be shortened.
    if override and x_file_out_path != file_path:
        try:
            os.remove(x_file_out_path)
        except OSError:
            pass
    if os.path.exists(x_file_out_path):
        print(f'\tFile {x_file_out_path} already exists, skipping...')
        return x_file_out_path

    print(f'\tExtracting file {file_path}')
    partial_path = x_file_out_path + '.part'
    try:
        with gzip.open(file_path, 'rb') as gz_file, open(partial_path, 'wb') as x_file_out:
            while True:
                chunk = gz_file.read(n)
                if not chunk:
                    break
                x_file_out.write(chunk)
        os.replace(partial_path, x_file_out_path)
    finally:
        # Only still present if extraction failed before the rename.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print(f'\t\tExtracted {x_file_out_path}!')
    return x_file_out_path
    

def extract_document(doc_str):
    """Split one tab-separated corpus line into ``(doc_id, document)``.

    The columns are read in the order ``id``, ``url``, ``title``, ``body``.
    The id is returned separately; the remaining columns that are present
    become the keys of the ``document`` dict. Tabs after the third separator
    are treated as part of the body instead of raising ``IndexError``.

    Args:
        doc_str: One raw line of the corpus file.

    Returns:
        Tuple ``(doc_id, document)``; ``doc_id`` is the first column and
        ``document`` maps the other column names to their text.
    """
    keys = ['id','url','title','body']
    # maxsplit keeps any tab characters inside the body in the last field.
    doc_meta = doc_str.split('\t', len(keys) - 1)
    doc_id = None
    document = {}
    for key, value in zip(keys, doc_meta):
        if key == 'id':
            doc_id = value
        elif key == 'body':
            # Drops the first character and the last three; per the original
            # note this removes a leading double quote and a trailing
            # [ "\n] pattern. NOTE(review): confirm against the corpus format.
            document[key] = value[1:-3]
        else:
            document[key] = value
    return doc_id,document


def process_corpus(file_path,n=None,encoding=None,max_retries=3):
    """Index the documents of a corpus file into every configured cluster.

    Each line is parsed with ``extract_document`` and indexed into
    ``INDEX_NAME`` on every client in the global ``ES_INSTANCES``. A line that
    fails is retried up to ``max_retries`` times and then skipped with a
    message, so one bad line can neither stop the run silently nor make it
    loop forever. An empty file simply indexes nothing.

    Args:
        file_path: Path of the tab-separated corpus file.
        n: Maximum number of documents to index; falsy means no limit.
        encoding: Text encoding forwarded to ``open``.
        max_retries: Attempts per line before it is skipped.

    Raises:
        OSError / UnicodeDecodeError: if the file itself cannot be read.
    """
    lines_read = 0
    with open(file_path,'r',encoding=encoding) as f:
        for line_no, line in enumerate(f):
            if n and lines_read >= n:
                break
            doc_id = None  # defined up front so error messages can always use it
            indexed = False
            for attempt in range(1, max_retries + 1):
                try:
                    doc_id, doc = extract_document(line)
                    # Re-indexing with an explicit id overwrites the same
                    # document, so retrying all clients is safe.
                    for client in ES_INSTANCES:
                        client.index(index=INDEX_NAME, id=doc_id, body=doc)
                    indexed = True
                    break
                except Exception as err:
                    print(f'\nAn error ocurred while processing line {line_no} [{doc_id}] '
                          f'(attempt {attempt}/{max_retries}): {err!r}')
            if not indexed:
                print(f'Skipping line {line_no} [{doc_id}] after {max_retries} failed attempts')
                continue
            lines_read += 1
            print(f"\rProcessing document no: {lines_read} [{doc_id}...]", end="")

## Do not run the cell below
It is only needed if you want to download the whole dataset to your local machine.

In [3]:

# Remote archives to fetch (all hosted under the same msmarcoranking container).
urls = [
'https://msmarco.blob.core.windows.net/msmarcoranking/msmarco-docs.tsv.gz'
,'https://msmarco.blob.core.windows.net/msmarcoranking/msmarco-docs-lookup.tsv.gz'
,'https://msmarco.blob.core.windows.net/msmarcoranking/msmarco-doctrain-queries.tsv.gz'
,'https://msmarco.blob.core.windows.net/msmarcoranking/msmarco-docdev-queries.tsv.gz'
,'https://msmarco.blob.core.windows.net/msmarcoranking/msmarco-docdev-top100.gz'
,'https://msmarco.blob.core.windows.net/msmarcoranking/msmarco-docdev-qrels.tsv.gz'
,'https://msmarco.blob.core.windows.net/msmarcoranking/docleaderboard-queries.tsv.gz'
,'https://msmarco.blob.core.windows.net/msmarcoranking/docleaderboard-top100.tsv.gz'
]
# Local data directory: the index name in upper case.
source_path = INDEX_NAME.upper()

# Create the data directory on first run.
if not os.path.isdir(source_path):
        os.mkdir(source_path)


# Step 1: download every archive and remember what download_file returned for it.
gzfiles = []
for url in urls:
    gzfile = download_file(source_path,url)
    gzfiles.append(gzfile)
    
# Step 2: decompress each archive (reading 2056 bytes per chunk) without
# re-extracting outputs that already exist.
files = []
for file in gzfiles:
    file = extract_gz_files(file,override=False,n=2056)
    files.append(file)
    
# Audible notification that the cell is done.
finished()


	File MS-MARCO\msmarco-docs.tsv.gz already exists, skipping...
	File MS-MARCO\msmarco-docs-lookup.tsv.gz already exists, skipping...
	File MS-MARCO\msmarco-doctrain-queries.tsv.gz already exists, skipping...
	File MS-MARCO\msmarco-docdev-queries.tsv.gz already exists, skipping...
	File MS-MARCO\msmarco-docdev-top100.gz already exists, skipping...
	File MS-MARCO\msmarco-docdev-qrels.tsv.gz already exists, skipping...
	File MS-MARCO\docleaderboard-queries.tsv.gz already exists, skipping...
	File MS-MARCO\docleaderboard-top100.tsv.gz already exists, skipping...
	File MS-MARCO\msmarco-docs.tsv already exists, skipping...
	File MS-MARCO\msmarco-docs-lookup.tsv already exists, skipping...
	File MS-MARCO\msmarco-doctrain-queries.tsv already exists, skipping...
	File MS-MARCO\msmarco-docdev-queries.tsv already exists, skipping...
	File MS-MARCO\msmarco-docdev-top100 already exists, skipping...
	File MS-MARCO\msmarco-docdev-qrels.tsv already exists, skipping...
	File MS-MARCO\docleaderboard-que

In [4]:
# Names of the document fields stored in the index.
FIELDS = ['url','title', 'body']

# Index definition: every field in FIELDS is mapped as a 'text' field with the
# 'english' analyzer and 'term_vector' set to 'yes'.
body = {
    'mappings': {
        'properties': {
            field_name: {
                'type': 'text',
                'term_vector': 'yes',
                'analyzer': 'english',
            }
            for field_name in FIELDS
        }
    }
}


## Important!! Do not modify the 'overwrite' flag, as doing so will destroy the remote Elasticsearch index
Run the cell as it is to create a local index on your machine, if desired. Otherwise, ignore it and remove DEFAULT_ES from ES_INSTANCES.

In [5]:
overwrite = False # DO NOT CHANGE THIS FLAG!!!

# Credentials for the remote cluster are read from the environment instead of
# being committed with the notebook. Set ES_REMOTE_PASSWORD (and optionally
# ES_REMOTE_USER / ES_REMOTE_HOST) before starting the kernel.
# NOTE(review): the password that used to be hardcoded here has been exposed
# in the notebook history and should be rotated.
user = os.environ.get('ES_REMOTE_USER', 'elastic')
password = os.environ.get('ES_REMOTE_PASSWORD')
remote_host = os.environ.get(
    'ES_REMOTE_HOST',
    '6a0fe46eef334fada72abc91933b54e8.us-central1.gcp.cloud.es.io:9243',
)
remote_url = f'https://{user}:{password}@{remote_host}' if password else None

# Client created with the library defaults.
DEFAULT_ES = Elasticsearch()

# Remote client; None when no password was provided via the environment.
REMOTE_ES = Elasticsearch(hosts=remote_url) if remote_url else None

# Clusters the notebook writes to. Only the default client is listed here.
ES_INSTANCES = [DEFAULT_ES]
for es in ES_INSTANCES:
    create_index(es,INDEX_NAME,body,overwrite = overwrite)
    print(es.info())
    
#es.cat.count(INDEX_NAME, params={"format": "json"})
#a.exists(INDEX_NAME,'D1810083')

{'name': 'ODIN', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'cFMULXp6QzKZ_LbFfJS4pw', 'version': {'number': '7.9.3', 'build_flavor': 'default', 'build_type': 'zip', 'build_hash': 'c4138e51121ef06a6404866cddc601906fe5c868', 'build_date': '2020-10-16T10:36:16.141335Z', 'build_snapshot': False, 'lucene_version': '8.6.2', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'}


In [13]:
# Collect the distinct values found in the third whitespace-separated column
# (taken to be the document id) of both top-100 files.
# Paths are built with os.path.join so the cell also runs outside Windows.
top100_paths = [
    os.path.join('MS-MARCO', 'msmarco-docdev-top100'),
    os.path.join('MS-MARCO', 'docleaderboard-top100.tsv'),
]

docs_needed = set()
for top100_path in top100_paths:
    with open(top100_path, 'r') as file:
        for line in file:
            columns = line.split()
            # Blank or truncated lines have no third column; skip them
            # instead of failing with IndexError.
            if len(columns) < 3:
                continue
            doc = columns[2]
            docs_needed.add(doc)
docs_needed = list(docs_needed)
print(len(docs_needed))

713362


In [13]:
# Load the previously pickled documents; the loop below uses the dict keys as
# document ids and the values as document bodies.
# NOTE(review): unpickling executes arbitrary code — only load a trusted dump.
docs_needed_dict = load_pickle('dump.pickle')
# Client created with the library defaults.
new_es = Elasticsearch()
# Create the index only if it does not exist yet (overwrite is off).
create_index(new_es,INDEX_NAME,body,overwrite = False)
# NOTE(review): the return value is discarded (it is neither printed nor the
# last expression of the cell), so this only exercises the connection.
new_es.info()
# Index the documents one request at a time, with a tqdm progress bar.
for doc_id in tqdm(docs_needed_dict):
    new_es.index(index=INDEX_NAME, id=doc_id, body=docs_needed_dict[doc_id])
# Audible notification that indexing is done.
finished()

Index ms-marco already exists


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=713362.0), HTML(value='')))




In [15]:
# Play the notification bell ten times.
finished(10)