In [70]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch(["http://localhost:9200"])


index_name = "question_base"

mapping = {
    "mappings": {
        "properties": {
            "question": {
                "type": "text",
                "analyzer": "standard", 
                "search_analyzer": "english" 
            },
            "index": {
                "type": "keyword"
            }
        }
    }
}

if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, body=mapping)

In [72]:
import io
import requests
import polars as pl
import numpy as np

from zipfile import ZipFile


resp = requests.get("https://dl.fbaipublicfiles.com/glue/data/QQP-clean.zip")

arch = ZipFile(io.BytesIO(resp.content))

schema = {'id': pl.Int64, 
          'id_left': pl.Int64, 
          'id_right': pl.Int64, 
          'text_left': pl.String, 
          'text_right': pl.String, 
          'label': pl.Int8}

def read_tsv(path, schema):
    data = []
    first = True
    with arch.open(path) as zp:
        for line in zp.readlines():
            if first:
                first = False
            else:
                data.append(line.decode('utf-8').strip().split("\t"))
        data = pl.DataFrame(data=data, schema=schema)
    return data

train = read_tsv('QQP/train.tsv', schema)
test = read_tsv('QQP/dev.tsv', schema)

idx_df = pl.concat([test.unique('id_left').select(pl.col('id_left'), pl.col('text_left')).rename({'id_left': 'idx', 'text_left': 'text'}),
                    test.unique('id_right').select(pl.col('id_right'), pl.col('text_right')).rename({'id_right': 'idx', 'text_right': 'text'}),
                    train.unique('id_left').select(pl.col('id_left'), pl.col('text_left')).rename({'id_left': 'idx', 'text_left': 'text'}),
                    train.unique('id_right').select(pl.col('id_right'), pl.col('text_right')).rename({'id_right': 'idx', 'text_right': 'text'})
          ]).unique()

  data = pl.DataFrame(data=data, schema=schema)


In [82]:
from kafka import KafkaProducer
from tqdm import tqdm

import json
import uuid

In [74]:
config = {
    'bootstrap_servers': 'localhost:9092',
    'client_id': 'search_terms_client',
    'value_serializer': lambda obj: json.dumps(obj).encode('utf-8'),
}

In [79]:
producer = KafkaProducer(**config)

In [101]:
for idx, query in tqdm(idx_df.iter_rows()):
    producer.send(topic=index_name,
                  value={'index': str(idx),
                         'question': query}
                 )

537916it [00:25, 20719.59it/s]


In [106]:
class CandidateModel:
    def __init__(self, es):
        self.es = es

    def create_index(self, index_name):
        self.index_name = index_name
        if not self.es.indices.exists(index=index_name):
            es.indices.create(index=index_name, body=mapping)
        return index_name

    def _fuzzy_search(self, q, size):
        body = {
            "size": size,
            "query": {
                "match": {
                    "question": {
                        "query": query,
                        "fuzziness": "AUTO"
                    }
                }
            }
        }
        response = self.es.search(index=self.index_name, body=body)
        return response["hits"]["hits"]

    def query(self, q, size=10):
        response = self._fuzzy_search(q, size=size)
        return [(i['_source']['index'], i['_source']['question']) for i in response]

In [107]:
cand = CandidateModel(es)
cand.create_index(index_name)

'question_base'

In [108]:
cand.query('How to kiss', 10)

[('389998', 'Which are the best private mba colleges in India?'),
 ('202998', 'Which is the best private university for an MBA in India?'),
 ('430272', 'Which MBA is best after MCA?'),
 ('430273', 'Which is best MBA or MCA?'),
 ('495889', 'Which course is best bba + mba or Animation?'),
 ('202999', 'What are the best private universities to do MBA in India?'),
 ('278176', 'Which is the best distance MBA institute in India?'),
 ('20565', 'Which are the best reputed MBA colleges in India?'),
 ('91649', 'Which are the best weekend MBA programs in India?'),
 ('526392', 'Which are the best MBA colleges in India except IIMs?')]

In [None]:
def get_candidates(q):
    body = {
        "size": size,
        "query": {
            "match": {
                "question": {
                    "query": query,
                    "fuzziness": "AUTO"
                }
            }
        }
    }
    response = es.search(index=index_name, body=body)
for id_left, id_right, label in tqdm(val_dataset.iter_rows(), total=len(val_dataset)):
    elastic_answer = {i['_source']['index']: i['_score'] for i in fuzzy_search(idx_to_text_mapping[id_left], size=10000)}
    ys_pred = np.array([elastic_answer[i] if i in elastic_answer else 0 for i in id_right])
    ndcg.append(ndcg_k(np.array(label), ys_pred))