In [1]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [26]:
#external files
from preprocessing import FileIO
from openai_interface import GPT_Turbo
from opensearch_interface import OpenSearchClient
from reranker import ReRanker

#standard library imports
import json
import time
import os
from math import ceil
from datetime import datetime
from typing import List, Any, Dict, Tuple, Union

#misc
from tqdm import tqdm

### Ingest data

In [3]:
# Load the corpus from a parquet file through the project's FileIO helper.
# NOTE(review): later cells treat `data` as a list of dicts carrying 'doc_id' and
# 'content' keys — confirm that is what load_parquet returns.
data_path = './practice_data/impact_theory_minilm_196.parquet'
data = FileIO().load_parquet(data_path)

Shape of data: (37007, 16)
Memory Usage: 4.27+ MB


### Randomly select 100 chunks for Q/A pairs

In [4]:
import random

In [5]:
def sample_data(data: List[dict], sample_size: int, seed: Union[int, None]=None) -> List[tuple]:
    """
    Draw `sample_size` distinct records from `data` (sampling without replacement)
    and return them as (doc_id, content) tuples.

    Args:
        data: Corpus records; every record must expose 'doc_id' and 'content' keys.
        sample_size: Number of records to draw.
        seed: Optional seed for reproducible sampling. When provided, a private
              random.Random(seed) instance is used so the same records are drawn on
              every run and the module-level generator state is left untouched.
              When None (default) the module-level generator is used, as before.

    Returns:
        List of (doc_id, content) tuples in the order they were drawn.

    Raises:
        ValueError: if sample_size is negative or exceeds len(data) (from random.sample).
        KeyError: if a drawn record lacks 'doc_id' or 'content'.
    """
    rng = random if seed is None else random.Random(seed)
    sample = rng.sample(data, sample_size)
    return [(d['doc_id'], d['content']) for d in sample]

In [6]:
def get_meta(sample: List[dict], key: str="doc_id") -> List[Any]:
    """
    Collect the value stored under `key` from every record in `sample`.

    Args:
        sample: Records (dicts) to read from.
        key: Field to extract from each record; defaults to "doc_id".

    Returns:
        The extracted values, in the same order as `sample`.

    Raises:
        KeyError: if any record does not contain `key`.
    """
    values = []
    for record in sample:
        values.append(record[key])
    return values

In [7]:
def get_sample(doc_id: str, corpus: List[dict], full_dict: bool=False):
    """
    Look up the first record in `corpus` whose 'doc_id' equals `doc_id`.

    Args:
        doc_id: Identifier to search for.
        corpus: Records to search; each must contain a 'doc_id' key.
        full_dict: When True return the whole record; otherwise return only
                   its 'content' value.

    Returns:
        The matching record (full_dict=True) or its 'content' value.

    Raises:
        IndexError: if no record carries the requested doc_id.
    """
    # next() over a generator stops at the first match rather than scanning the
    # entire corpus to build a list of every match.
    result = next((d for d in corpus if d['doc_id'] == doc_id), None)
    if result is None:
        # Same exception type a failed `[...][0]` lookup raises, but with a message
        # that says which id was missing.
        raise IndexError(f"doc_id {doc_id!r} not found in corpus")
    return result if full_dict else result['content']

In [8]:
# Spot-check: fetch one chunk by its doc_id and display its text content.
get_sample('kE3yryW-FiE_33', data)

"that would be nice if we all thought that way But of course, there's this primitive mind that we talked about which is this part of your brain that is not wired for truth it's wired for survival in 50,000 BC and what that often meant was agreeing with The sacred beliefs of your tribe and believing them and the people who could believe what the tribe believed With full conviction they survived. Well, they were you know, they were on the in-group they fit in and that's what was needed What's up, guys?"

In [9]:
def strip_numbers(query: str) -> str:
    """
    Remove a leading enumeration marker such as "1. ", "12) " or "3: " from a question.

    Only a run of digits followed by '.', ')' or ':' and then whitespace (or the end
    of the string) is treated as a marker. Text without such a prefix is returned
    unchanged apart from surrounding whitespace, so unnumbered questions and
    questions that merely start with a number ("1.5 million ...", "3 reasons ...")
    are not truncated.

    Args:
        query: Raw question line.

    Returns:
        The question with the enumeration prefix and surrounding whitespace removed.
    """
    text = query.strip()
    remainder = text.lstrip('0123456789')
    if len(remainder) == len(text):
        # No leading digits, so there is nothing to strip.
        return text
    marker, rest = remainder[:1], remainder[1:]
    # Require whitespace (or end of string) after the delimiter so decimals like "1.5" survive.
    if marker in ('.', ')', ':') and (rest == '' or rest[0].isspace()):
        return rest.strip()
    return text

In [10]:
def process_questions(question_tuples: List[tuple]) -> Dict[str, List[str]]:
    """
    Convert (doc_id, newline-separated questions) tuples into a doc_id -> questions mapping.

    Each tuple's second element is split on newlines and every line is passed through
    strip_numbers. Lines that are empty after cleaning (e.g. blank separator lines in
    the generated text) are dropped so they are not later treated as questions.

    Args:
        question_tuples: Tuples whose first item is a doc_id and whose second item is
                         a string holding one question per line.

    Returns:
        Dict mapping each doc_id to its list of cleaned questions. If a doc_id occurs
        more than once, the last occurrence wins.
    """
    question_dict = {}
    for tup in question_tuples:
        doc_id, raw_text = tup[0], tup[1]
        cleaned = (strip_numbers(line) for line in raw_text.split('\n'))
        # Keep only non-empty questions.
        question_dict[doc_id] = [question for question in cleaned if question]
    return question_dict

In [11]:
def generate_dataset(data: List[dict], dir_path: str, num_questions: int=100, batch_size: int=50) -> Dict[str, List[str]]:
    """
    Sample chunks from `data`, generate questions for them in batches with GPT_Turbo,
    write the processed result to a timestamped JSON file in `dir_path`, and return it.

    Args:
        data: Corpus records passed to sample_data.
        dir_path: Directory the JSON file is written to; created if it does not exist.
        num_questions: Number of chunks to sample.
        batch_size: Chunks sent per batch; must be between 1 and 50 inclusive.

    Returns:
        The mapping produced by process_questions for all generated batches.

    Raises:
        ValueError: if batch_size is greater than 50 or less than 1.
    """
    # Validate arguments before constructing the client or touching the filesystem.
    if batch_size > 50:
        raise ValueError('Due to OpenAI rate limits, batch_size cannot be greater than 50')
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')

    # Make sure the output location is usable *before* spending API calls and sleeps,
    # so generated questions are not lost to a missing directory at write time.
    if dir_path:
        os.makedirs(dir_path, exist_ok=True)
    # NOTE(review): the ':' characters in this timestamp are not valid in Windows file names.
    time_marker = datetime.now().strftime("%Y-%m-%d:%H:%M:%S")
    filepath = os.path.join(dir_path, f"{num_questions}_questions_{time_marker}.json")

    gpt = GPT_Turbo()
    sample = sample_data(data, num_questions)
    batches = ceil(num_questions/batch_size)
    all_questions = []
    for n in range(batches):
        batch = sample[n*batch_size:(n+1)*batch_size]
        # extend() keeps one flat list of tuples, so no separate flattening pass is needed.
        all_questions.extend(gpt.batch_generate_question_context_pairs(batch))
        # No pause is needed after the final batch.
        if n < batches - 1:
            print('Pausing for 60 seconds due to OpenAI rate limits...')
            time.sleep(60)
    processed_questions = process_questions(all_questions)
    with open(filepath, 'w') as f:
        json.dump(processed_questions, f, indent=4)
    return processed_questions

In [12]:
# Build the evaluation set: questions generated for 100 randomly sampled chunks.
# generate_dataset also writes the result as JSON under ./practice_data/ and sleeps
# 60 seconds between batches.
dataset = generate_dataset(data=data, dir_path='./practice_data/', num_questions=100)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:07<00:00,  6.59Generated Questions/s]


Pausing for 60 seconds due to OpenAI rate limits...


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:07<00:00,  6.79Generated Questions/s]


In [89]:
# Retrieval clients and rerankers available to the evaluation cells below.
# NOTE(review): the gte-base path is absolute and machine-specific; it will not
# resolve on another host — consider a configurable models directory.
gteclient = OpenSearchClient(model_name_or_path='/home/elastic/notebooks/vector_search_applications/models/gte-base/')
# No model argument: uses whatever default OpenSearchClient defines.
osclient = OpenSearchClient()
# No model argument: uses whatever default ReRanker defines.
reranker = ReRanker()
intfloat = ReRanker(model_name='intfloat/simlm-msmarco-reranker')

In [14]:
# Example query kept for ad-hoc searches.
query = "How did the United States respond to the Soviet Union's advancements in space?"
# Keyword and vector searches both target the same index name.
kw_index = 'impact-theory-minilm-196'
vec_index = 'impact-theory-minilm-196'

In [92]:
def run_evaluation( dataset: Dict[str, List[str]], 
                    retriever: OpenSearchClient,
                    reranker: ReRanker,
                    kw_index_name: str, 
                    vector_index_name: str,
                    response_size: int=10,
                    top_k: int=5,
                    chunk_size: int=196,
                    rerank_all_responses: bool=False,
                    ) -> Dict[str, Union[str, int, float]]:
    """
    Measure how often keyword, vector and hybrid search return the source document
    of each generated question within the top_k results.

    For every (doc_id, questions) entry in `dataset`, each question is sent to
    retriever.keyword_search, retriever.vector_search and retriever.hybrid_search.
    A search counts as a hit when `doc_id` is among the first `top_k` returned
    doc_ids (read from each result's ['_source']['doc_id']).

    Args:
        dataset: Mapping of doc_id -> questions generated from that document.
        retriever: Client exposing keyword_search, vector_search, hybrid_search and
                   a model_name_or_path attribute.
        reranker: Object exposing rerank(...) and a model_name attribute; only
                  called when rerank_all_responses is True.
        kw_index_name: Index used for keyword search.
        vector_index_name: Index used for vector search.
        response_size: Number of results requested from each search.
        top_k: Number of leading results checked for a hit; a falsy value
               falls back to response_size.
        chunk_size: Recorded in the results only; not used in the computation.
        rerank_all_responses: When True every response is passed through
                              reranker.rerank before hits are counted.

    Returns:
        Dict holding the run configuration, raw hit counts under the
        '*_hit_rate' keys, 'total_questions', and the '*_score' entries added
        by calc_hit_rate_scores.
    """
    top_k = top_k if top_k else response_size
    reranker_name = reranker.model_name if rerank_all_responses else "None"
    
    results_dict = {'n':response_size, 
                    'top_k': top_k, 
                    'Retriever': retriever.model_name_or_path, 
                    'Ranker': reranker_name,
                    'chunk_size': chunk_size,
                    'kw_hit_rate': 0,
                    'vector_hit_rate': 0,
                    'hybrid_hit_rate':0,
                    'total_questions':0
                    }
    for doc_id, questions in tqdm(dataset.items(), 'Questions'):
        for q in questions:
            results_dict['total_questions'] += 1

            # Make calls to the OpenSearch host: Keyword, Vector, and Hybrid.
            # Keys match the '<prefix>_hit_rate' counters in results_dict.
            responses = {
                'kw': retriever.keyword_search(query=q, index=kw_index_name, size=response_size),
                'vector': retriever.vector_search(query=q, index=vector_index_name, size=response_size),
                'hybrid': retriever.hybrid_search(q, kw_index_name, vector_index_name, kw_size=response_size, vec_size=response_size),
            }

            for prefix, response in responses.items():
                # Rerank returned responses if rerank_all_responses is True.
                if rerank_all_responses:
                    response = reranker.rerank(response, q, top_k=top_k)

                # Collect doc_ids of the leading top_k results to check for a document match.
                retrieved_ids = [res['_source']['doc_id'] for res in response][:top_k]

                # Increment the raw hit counter for this search type.
                if doc_id in retrieved_ids:
                    results_dict[f'{prefix}_hit_rate'] += 1

    # Use raw counts to calculate final scores.
    calc_hit_rate_scores(results_dict)
    
    return results_dict
        

In [93]:
def calc_hit_rate_scores(results_dict: Dict[str, Union[str, int, float]]) -> None:
    """
    Add 'kw_score', 'vector_score' and 'hybrid_score' to `results_dict` in place.

    Each score is the matching '<prefix>_hit_rate' raw count divided by
    'total_questions', rounded to two decimals. When 'total_questions' is 0 the
    scores are set to 0.0 instead of raising ZeroDivisionError.

    Args:
        results_dict: Must contain 'total_questions' and the three
                      '<prefix>_hit_rate' counters; mutated in place.
    """
    total = results_dict['total_questions']
    for prefix in ['kw', 'vector', 'hybrid']:
        hits = results_dict[f'{prefix}_hit_rate']
        # Guard the empty-evaluation case: no questions means no hits to rate.
        results_dict[f'{prefix}_score'] = round(hits/total, 2) if total else 0.0

In [94]:
def record_results(results_dict: Union[Dict[str, Any], List[Dict[str, Any]]],
                   dir_outpath: str=None,
                   chunk_size: Union[int, None]=None) -> None:
    """
    Write evaluation results to a timestamped JSON file inside `dir_outpath`.

    Args:
        results_dict: A single results dict or a list of them; dumped as-is.
        dir_outpath: Target directory (created if missing). When falsy nothing
                     is written.
        chunk_size: Value embedded in the file name. When None it is read from the
                    'chunk_size' entry of the results (the first element when a
                    list is given), falling back to 'unknown'. This removes the
                    dependency on a notebook-level `chunk_size` global.
    """
    if not dir_outpath:
        return

    if chunk_size is None:
        first = results_dict[0] if isinstance(results_dict, list) and results_dict else results_dict
        chunk_size = first.get('chunk_size', 'unknown') if isinstance(first, dict) else 'unknown'

    os.makedirs(dir_outpath, exist_ok=True)
    # NOTE(review): the ':' characters in this timestamp are not valid in Windows file names.
    time_marker = datetime.now().strftime("%Y-%m-%d:%H:%M:%S")
    path = os.path.join(dir_outpath, f'retrieval_eval_{chunk_size}_{time_marker}.json')
    # Write results to the output file.
    with open(path, 'w') as f:
        json.dump(results_dict, f, indent=4)

In [113]:
# Chunk size label for this run (presumably matches the '-196' suffix of the index
# name — confirm). It is forwarded to run_evaluation so the recorded 'chunk_size'
# comes from this value rather than from the function's default.
chunk_size = 196
all_results = []
# range(60, 61) evaluates a single response size (60); widen the range to sweep several.
for x in range(60,61):
    results = run_evaluation( dataset=dataset, 
                              retriever=osclient, 
                              reranker=intfloat,
                              kw_index_name=kw_index, 
                              vector_index_name=vec_index, 
                              response_size=x, 
                              top_k=10,
                              chunk_size=chunk_size,
                              rerank_all_responses=True,
                            )
    all_results.append(results)
record_results(all_results, dir_outpath='./practice_data/')

intfloat/simlm-msmarco-reranker


Questions: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [03:11<00:00,  1.92s/it]


In [121]:
# %%time
# query = 'How do I get ahead in life?'
# resp = osclient.hybrid_search(query, kw_index, vec_index, kw_size=60, vec_size=60)
# intfloat.rerank(resp, query)

Bad pipe message: %s [b"\xe9.^%\x06\x19\xe9\x0e\xd3\xc7]\x86\xfb\x82\x88;\x86C\x00\x00|\xc0,\xc00\x00\xa3\x00\x9f\xcc\xa9\xcc\xa8\xcc\xaa\xc0\xaf\xc0\xad\xc0\xa3\xc0\x9f\xc0]\xc0a\xc0W\xc0S\xc0+\xc0/\x00\xa2\x00\x9e\xc0\xae\xc0\xac\xc0\xa2\xc0\x9e\xc0\\\xc0`\xc0V\xc0R\xc0$\xc0(\x00k\x00j\xc0#\xc0'\x00g\x00@\xc0\n\xc0\x14\x009\x008\xc0\t\xc0\x13\x003\x002\x00\x9d"]
Bad pipe message: %s [b'\x07Co']
Bad pipe message: %s [b'dEG[\x11\x96\x16\xd0X\xbd\r\xde{\x00\x00\xa6\xc0,\xc00\x00\xa3\x00\x9f\xcc\xa9\xcc\xa8\xcc\xaa\xc0\xaf\xc0\xad\xc0\xa3\xc0\x9f\xc0]\xc0a\xc0W\xc0S\xc0+\xc0/\x00\xa2\x00\x9e\xc0\xae\xc0\xac\xc0\xa2\xc0\x9e\xc0\\\xc0`\xc0V\xc0']
Bad pipe message: %s [b"$\xc0(\x00k\x00j\xc0s\xc0w\x00\xc4\x00\xc3\xc0#\xc0'\x00g\x00@\xc0r\xc0v\x00\xbe\x00\xbd\xc0\n\xc0\x14\x009\x008\x00\x88\x00\x87\xc0\t\xc0\x13\x003\x002\x00\x9a\x00\x99\x00E\x00D\xc0\x07\xc0\x11\xc0\x08\xc0\x12\x00\x16\x00\x13\x00\x9d\xc0\xa1\xc0\x9d\xc0Q\x00\x9c"]
Bad pipe message: %s [b'\xf8\xb4\\\x07n\x80\xed\x8d\x7fit\x