# Query/Context Dataset Generation
***

This notebook walks students through generating datasets of query/context pairs, which serve two primary purposes:
- Fine-tuning an embedding model
- Serving as ground truth for retrieval evaluation
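For the evaluation use case, each generated query has exactly one relevant chunk (its `doc_id`), so a retriever can be scored by whether that chunk appears in its top-k results. Below is a minimal sketch of two common metrics, hit rate and mean reciprocal rank (MRR). The function name `hit_rate_and_mrr` and the toy ids are hypothetical, and the code assumes the retriever's results have already been reduced to ranked lists of `doc_id`s.

```python
def hit_rate_and_mrr(relevant_docs: dict[str, list[str]],
                     retrieved: dict[str, list[str]],
                     top_k: int = 5) -> tuple[float, float]:
    """Score ranked retrieval results against query/context ground truth.

    relevant_docs: query_id -> [doc_id] (each generated query has one source chunk)
    retrieved:     query_id -> doc_ids returned by a retriever, best match first
    """
    hits = 0
    reciprocal_rank_sum = 0.0
    for query_id, doc_ids in relevant_docs.items():
        gold_doc_id = doc_ids[0]
        ranked = retrieved.get(query_id, [])[:top_k]  # only the top_k results count
        if gold_doc_id in ranked:
            hits += 1
            # ranks are 1-based: a hit in first position adds 1.0, second adds 0.5, ...
            reciprocal_rank_sum += 1 / (ranked.index(gold_doc_id) + 1)
    n_queries = len(relevant_docs)
    return hits / n_queries, reciprocal_rank_sum / n_queries


# toy usage: q1 hits at rank 1, q2 at rank 2, q3 misses
relevant = {'q1': ['doc_a'], 'q2': ['doc_b'], 'q3': ['doc_c']}
retrieved = {'q1': ['doc_a', 'doc_x'], 'q2': ['doc_x', 'doc_b'], 'q3': ['doc_y']}
print(hit_rate_and_mrr(relevant, retrieved))  # (0.666..., 0.5)
```

MRR rewards placing the source chunk near the top of the results, while hit rate only asks whether it appears at all.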

In [1]:
%load_ext autoreload
%autoreload 2

import sys
sys.path.append('../')

import os
import random
import re
import uuid

import pandas as pd
from dotenv import load_dotenv
from llama_index.finetuning import EmbeddingQAFinetuneDataset
from rich import print
from tqdm import tqdm

# from src.evaluation.retrieval_evaluation import QueryContextGenerator
from src.llm.prompt_templates import qa_generation_prompt
from src.llm.llm_interface import LLM
from src.preprocessor.preprocessing import FileIO

# load API credentials (AZURE_OPENAI_*) from a local .env file
env = load_dotenv('.env', override=True)


In [7]:
llm = LLM(model_name='azure/gpt-35-turbo', 
          api_key=os.environ['AZURE_OPENAI_API_KEY'],
          api_version=os.environ['AZURE_OPENAI_API_VERSION'],
          api_base=os.environ['AZURE_OPENAI_ENDPOINT'])

In [8]:
generator = QueryContextGenerator(llm)

In [16]:
data_path = '../data/huberman_minilm-512.parquet'
data = FileIO().load_parquet(data_path)

Shape of data: (11602, 13)
Memory Usage: 1.15+ MB


In [5]:
class QueryContextGenerator:
    '''
    Class designed for the generation of query/context pairs using a
    Generative LLM. The LLM is used to generate questions from a given
    corpus of text. The query/context pairs can be used to fine-tune 
    an embedding model using a MultipleNegativesRankingLoss loss function
    or can be used to create evaluation datasets for retrieval models.
    '''
    def __init__(self, 
                 llm: LLM
                ):
        self.llm = llm

    def clean_validate_data(self,
                            data: list[dict],
                            valid_fields: tuple[str, ...]=('content', 'summary', 'guest', 'doc_id'),
                            total_chars: int=950
                            ) -> list[dict]:
        '''
        Strip original data chunks so they only contain valid_fields, then
        remove any chunk whose content is total_chars characters or shorter.
        This prevents the LLM from asking questions about sparse content.
        '''
        clean_docs = [{k:v for k,v in d.items() if k in valid_fields} for d in data]
        valid_docs = [d for d in clean_docs if len(d['content']) > total_chars]
        return valid_docs

    def train_val_split(self,
                        data: list[dict],
                        n_train_questions: int,
                        n_val_questions: int,
                        n_questions_per_chunk: int=2,
                        total_chars: int=950
                        ) -> tuple[list[dict], list[dict]]:
        '''
        Cleans the corpus, shuffles it, and splits it into training and
        validation chunks. The number of chunks in each split is the requested
        number of questions floor-divided by n_questions_per_chunk. The
        total_chars default is based on a pre-analysis of the average chunk
        length in the training corpus.
        '''
        clean_data = self.clean_validate_data(data, total_chars=total_chars)
        random.shuffle(clean_data)
        train_index = n_train_questions//n_questions_per_chunk
        valid_index = n_val_questions//n_questions_per_chunk
        end_index = valid_index + train_index
        if end_index > len(clean_data):
            raise ValueError('Cannot create dataset with desired number of questions, try using a larger dataset')
        train_data = clean_data[:train_index]
        valid_data = clean_data[train_index:end_index]
        print(f'Length Training Data: {len(train_data)}')
        print(f'Length Validation Data: {len(valid_data)}')
        return train_data, valid_data

    def generate_qa_embedding_pairs(self,
                                    data: list[dict],
                                    generate_prompt_tmpl: str,
                                    system_message: str='You are a helpful assistant.',
                                    num_questions_per_chunk: int=2,
                                    ) -> 'EmbeddingQAFinetuneDataset':
        """
        Generate query/context pairs from a list of documents and return them as a
        llama_index EmbeddingQAFinetuneDataset. The pairs can be used for fine-tuning
        an embedding model with a MultipleNegativesRankingLoss or to create an
        evaluation dataset for retrieval models.
        """
        # local import keeps this cell self-contained
        from llama_index.finetuning import EmbeddingQAFinetuneDataset

        queries = {}
        relevant_docs = {}
        corpus = {chunk['doc_id']: chunk['content'] for chunk in data}
        for chunk in tqdm(data):
            doc_id = chunk['doc_id']
            assist_message = generate_prompt_tmpl.format(summary=chunk['summary'],
                                                         guest=chunk['guest'],
                                                         transcript=chunk['content'],
                                                         num_questions_per_chunk=num_questions_per_chunk)
            try:
                response = self.llm.chat_completion(system_message,
                                                    assist_message,
                                                    temperature=1.0,
                                                    max_tokens=num_questions_per_chunk*50,
                                                    return_raw=False)
            except Exception as e:
                # skip chunks whose LLM call fails instead of aborting the whole run
                print(e)
                continue
            # one question per line; strip leading "1." / "2)" style numbering
            result = response.strip().split("\n")
            questions = [re.sub(r"^\d+[\).\s]", "", question).strip() for question in result]
            questions = [question for question in questions if len(question) > 0]

            # every generated question points back to the chunk it came from
            for question in questions:
                question_id = str(uuid.uuid4())
                queries[question_id] = question
                relevant_docs[question_id] = [doc_id]

        # construct dataset
        return EmbeddingQAFinetuneDataset(queries=queries, corpus=corpus, relevant_docs=relevant_docs)

# def execute_evaluation(dataset: dict, 
#                        class_name: str, 
#                        retriever: WeaviateClient,
#                        reranker: ReRanker=None,
#                        alpha: float=0.5,
#                        retrieve_limit: int=100,
#                        top_k: int=5,
#                        chunk_size: int=256,
#                        hnsw_config_keys: list[str]=['maxConnections', 'efConstruction', 'ef'],
#                        search_type: Literal['hybrid', 'all']='all',
#                        search_properties: list[str]=['content'],
#                        display_properties: list[str]=['doc_id', 'content'],
#                        dir_outpath: str='./eval_results',
#                        include_miss_info: bool=False,
#                        user_def_params: dict=None
#                        ) -> Union[dict, Tuple[dict, list[dict]]]:
#     '''
#     Given a dataset, a retriever, and a reranker, evaluate the performance of the retriever and reranker. 
#     Returns a dict of kw, vector, and hybrid hit rates and mrr scores. If include_miss_info is True, will
#     also return a list of kw and vector responses and their associated queries that did not return a hit.

#     Args:
#     -----
#     dataset: EmbeddingQAFinetuneDataset
#         Dataset to be used for evaluation
#     class_name: str
#         Name of Class on Weaviate host to be used for retrieval
#     retriever: WeaviateClient
#         WeaviateClient object to be used for retrieval 
#     reranker: ReRanker
#         ReRanker model to be used for results reranking
#     alpha: float=0.5
#         Weighting factor for BM25 and Vector search.
#         alpha can be any number from 0 to 1, defaulting to 0.5:
#             alpha = 0 executes a pure keyword search method (BM25)
#             alpha = 0.5 weighs the BM25 and vector methods evenly
#             alpha = 1 executes a pure vector search method
#     retrieve_limit: int=100
#         Number of documents to retrieve from Weaviate host
#     top_k: int=5
#         Number of top results to evaluate
#     chunk_size: int=256
#         Number of tokens used to chunk text
#     hnsw_config_keys: list[str]=['maxConnections', 'efConstruction', 'ef']
#         list of keys to be used for retrieving HNSW Index parameters from Weaviate host
#     search_type: Literal['hybrid', 'all']='all'
#         Type of search to be evaluated.  Options are 'hybrid' (hybrid only) or 'all' (kw, vector, and hybrid)
#     search_properties: list[str]=['content']
#         list of properties to be used for search
#     display_properties: list[str]=['doc_id', 'content']
#         list of properties to be returned from Weaviate host for display in response
#     dir_outpath: str='./eval_results'
#         Directory path for saving results.  Directory will be created if it does not
#         already exist. 
#     include_miss_info: bool=False
#         Option to include queries and their associated search response values
#         for queries that are "total misses"
#     user_def_params : dict=None
#         Option for user to pass in a dictionary of user-defined parameters and their values.
#         Will be automatically added to the results_dict if correct type is passed.
#     '''
        
#     reranker_name = reranker.model_name if reranker else "None"
    
#     results_dict = {'n':retrieve_limit, 
#                     'top_k': top_k,
#                     'alpha': alpha,
#                     'Retriever': retriever.model_name_or_path, 
#                     'Ranker': reranker_name,
#                     'chunk_size': chunk_size,
#                     'hybrid_hit_rate':0,
#                     'hybrid_mrr': 0,
#                     }
#     #add extra params to results_dict
#     kw_vector_metrics = {
#                          'kw_hit_rate': 0,
#                          'kw_mrr': 0,
#                          'vector_hit_rate': 0,
#                          'vector_mrr': 0
#                          }
#     if search_type == 'all':
#         results_dict = {**results_dict, **kw_vector_metrics}
#     results_dict['total_questions'] = 0
#     results_dict['total_misses'] = 0
#     results_dict = add_params(retriever, class_name, results_dict, user_def_params, hnsw_config_keys)
        
#     start = time.perf_counter()
#     miss_info_list = []
#     for query_id, q in tqdm(dataset.queries.items(), 'Queries'):
#         results_dict['total_questions'] += 1
#         hit = False
#         #make Keyword, Vector, and Hybrid calls to Weaviate host
#         try:
#             hybrid_response = retriever.hybrid_search(request=q, class_name=class_name, properties=search_properties, alpha=alpha, limit=retrieve_limit, display_properties=display_properties)  
#             if search_type == 'all':
#                 kw_response = retriever.keyword_search(request=q, class_name=class_name, properties=search_properties, limit=retrieve_limit, display_properties=display_properties)
#                 vector_response = retriever.vector_search(request=q, class_name=class_name, limit=retrieve_limit, display_properties=display_properties)         
#             #rerank returned responses if reranker is provided
#             if reranker:
#                 hybrid_response = reranker.rerank(hybrid_response, q, top_k=top_k)
#                 if search_type == 'all':
#                     kw_response = reranker.rerank(kw_response, q, top_k=top_k)
#                     vector_response = reranker.rerank(vector_response, q, top_k=top_k)
                
            
#             #collect doc_ids to check for document matches (include only results_top_k)
#             hybrid_doc_ids = {result['doc_id']:i for i, result in enumerate(hybrid_response[:top_k], 1)}
#             if search_type == 'all':
#                 kw_doc_ids = {result['doc_id']:i for i, result in enumerate(kw_response[:top_k], 1)}
#                 vector_doc_ids = {result['doc_id']:i for i, result in enumerate(vector_response[:top_k], 1)}
#             #extract doc_id for scoring purposes
#             doc_id = dataset.relevant_docs[query_id][0]
     
#             #increment hit_rate counters and mrr scores
#             if doc_id in hybrid_doc_ids:
#                 results_dict['hybrid_hit_rate'] += 1
#                 results_dict['hybrid_mrr'] += 1/hybrid_doc_ids[doc_id]
#                 hit = True
#             if search_type == 'all':
#                 if doc_id in kw_doc_ids:
#                     results_dict['kw_hit_rate'] += 1
#                     results_dict['kw_mrr'] += 1/kw_doc_ids[doc_id]
#                     hit = True
#                 if doc_id in vector_doc_ids:
#                     results_dict['vector_hit_rate'] += 1
#                     results_dict['vector_mrr'] += 1/vector_doc_ids[doc_id]
#                     hit = True
#             # if no hits, let's capture that
#             if not hit:
#                 results_dict['total_misses'] += 1
#                 miss_info = {'query': q, 
#                              'answer': dataset.corpus[doc_id],
#                              'doc_id': doc_id,
#                              'hybrid_response': hybrid_response}
#                 if search_type == 'all':
#                     miss_info = {**miss_info, 
#                                  'kw_response': kw_response,
#                                  'vector_response': vector_response}
#                 miss_info_list.append(miss_info)

#         except Exception as e:
#             print(e)
#             continue

#     #use raw counts to calculate final scores
#     calc_hit_rate_scores(results_dict, search_type=search_type)
#     calc_mrr_scores(results_dict, search_type=search_type)
    
#     end = time.perf_counter() - start
#     print(f'Total Processing Time: {round(end/60, 2)} minutes')
#     record_results(results_dict, chunk_size, dir_outpath=dir_outpath, as_text=True)
    
#     if include_miss_info:
#         return results_dict, miss_info_list
#     return results_dict

# def calc_hit_rate_scores(results_dict: Dict[str, Union[str, int]], 
#                          search_type: Literal['hybrid', 'all']='all'
#                          ) -> None:
#     '''
#     Helper function to calculate hit rate scores
#     '''
#     search_type = ['kw', 'vector', 'hybrid'] if search_type == 'all' else [search_type]
#     for prefix in search_type:
#         results_dict[f'{prefix}_hit_rate'] = round(results_dict[f'{prefix}_hit_rate']/results_dict['total_questions'],2)

# def calc_mrr_scores(results_dict: Dict[str, Union[str, int]],
#                     search_type: Literal['hybrid', 'all']='all'
#                     ) -> None:
#     '''
#     Helper function to calculate mrr scores
#     '''
#     search_type = ['kw', 'vector', 'hybrid'] if search_type == 'all' else [search_type]
#     for prefix in search_type:
#         results_dict[f'{prefix}_mrr'] = round(results_dict[f'{prefix}_mrr']/results_dict['total_questions'],2)

# def create_dir(dir_path: str) -> None:
#     '''
#     Checks if directory exists, and creates new directory
#     if it does not exist
#     '''
#     if not os.path.exists(dir_path):
#         os.makedirs(dir_path)

# def record_results(results_dict: Dict[str, Union[str, int]], 
#                    chunk_size: int, 
#                    dir_outpath: str='./eval_results',
#                    as_text: bool=False
#                    ) -> None:
#     '''
#     Write results to output file in either txt or json format

#     Args:
#     -----
#     results_dict: Dict[str, Union[str, int]]
#         Dictionary containing results of evaluation
#     chunk_size: int
#         Size of text chunks in tokens
#     dir_outpath: str
#         Path to output directory.  Directory only, filename is hardcoded
#         as part of this function.
#     as_text: bool
#         If True, write results as text file.  If False, write as json file.
#     '''
#     create_dir(dir_outpath)
#     time_marker = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
#     ext = 'txt' if as_text else 'json'
#     path = os.path.join(dir_outpath, f'retrieval_eval_{chunk_size}_{time_marker}.{ext}')
#     if as_text:
#         with open(path, 'a') as f:
#             f.write(f"{results_dict}\n")
#     else: 
#         with open(path, 'w') as f:
#             json.dump(results_dict, f, indent=4)

# def add_params(client: WeaviateClient, 
#                class_name: str, 
#                results_dict: dict, 
#                param_options: dict, 
#                hnsw_config_keys: list[str]
#               ) -> dict:
#     '''
#     Helper function that adds parameters to the results_dict:
#     - Adds HNSW Index parameters to results_dict
#     - Adds optional user-defined parameters to results_dict
#     '''
#     hnsw_params = {k:v for k,v in client.show_class_config(class_name)['vectorIndexConfig'].items() if k in hnsw_config_keys}
#     if hnsw_params:
#         results_dict = {**results_dict, **hnsw_params}
#     if param_options and isinstance(param_options, dict):
#         results_dict = {**results_dict, **param_options}
#     return results_dict

### Load raw data
Load raw data from a parquet file. The raw data should be in the same format as the dataset (corpus) created in [Notebook 1](https://github.com/americanthinker/vectorsearch-applications/blob/main/1-Data_Preprocessing_Week1_COLAB.ipynb).
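Because the generator reads `content`, `summary`, `guest`, and `doc_id` from every record (the default `valid_fields` of `clean_validate_data`), a quick pre-flight check can save a failed LLM run. A sketch, assuming records are plain dicts; `check_records` is a hypothetical helper. It also flags duplicate `doc_id`s, because the corpus is built as a dict keyed by `doc_id` and duplicates would silently overwrite one another.

```python
from collections import Counter

# field names taken from the default valid_fields of clean_validate_data
REQUIRED_FIELDS = ('content', 'summary', 'guest', 'doc_id')


def check_records(data: list[dict],
                  required: tuple[str, ...] = REQUIRED_FIELDS) -> list[str]:
    """Return human-readable problems; an empty list means the records look usable."""
    problems = []
    for i, record in enumerate(data):
        missing = [field for field in required if field not in record]
        if missing:
            problems.append(f'record {i} is missing {missing}')
    # the corpus is keyed by doc_id, so duplicate ids would overwrite each other
    id_counts = Counter(record['doc_id'] for record in data if 'doc_id' in record)
    for doc_id, count in id_counts.items():
        if count > 1:
            problems.append(f'doc_id {doc_id!r} appears {count} times')
    return problems
```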

In [4]:
data_path = './data/impact_theory_minilm_256.parquet'
data = FileIO().load_parquet(data_path)
len(data)

Shape of data: (26448, 12)
Memory Usage: 2.42+ MB


26448

### Data Length Analysis
Analyze the length of the content chunks. Length can be measured in characters, words, or tokens; here it is measured in characters because `total_chars` is a character count. The goal is to get a sense of the typical chunk length so you can set the `total_chars` parameter of the `clean_validate_data` method to an appropriate value.

In [5]:
# in this example the mean content length is ~1,000 characters
lengths = [len(d['content']) for d in data]
df = pd.DataFrame(lengths)
df.describe() 

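|       | content length (characters) |
|-------|-----------------------------|
| count | 26448.0 |
| mean  | 991.729053 |
| std   | 126.34487 |
| min   | 4.0 |
| 25%   | 944.0 |
| 50%   | 1005.0 |
| 75%   | 1060.0 |
| max   | 1974.0 |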
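The effect of a candidate threshold can be checked before any LLM calls are made. With the distribution above (25th percentile at 944 characters), a `total_chars` of 950 should discard roughly a quarter or more of the chunks. A minimal standard-library sketch; `summarize_lengths` is a hypothetical helper that applies the same `len(content) > total_chars` rule as `clean_validate_data`.

```python
import statistics


def summarize_lengths(chunks: list[dict], total_chars: int) -> dict:
    """Summarize chunk lengths (in characters) and how many survive the threshold."""
    lengths = [len(chunk['content']) for chunk in chunks]
    # same rule as clean_validate_data: keep only chunks strictly longer than total_chars
    kept = sum(1 for n in lengths if n > total_chars)
    return {
        'count': len(lengths),
        'mean': statistics.mean(lengths),
        'median': statistics.median(lengths),
        'kept': kept,
        'kept_fraction': kept / len(lengths),
    }


# toy usage with five chunks of known length
toy_chunks = [{'content': 'x' * n} for n in (4, 900, 960, 1005, 1060)]
print(summarize_lengths(toy_chunks, total_chars=950))  # 3 of 5 chunks kept
```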


### Split Data

The `train_val_split` method first cleans and validates the raw data and then splits it into user-defined train/val splits.
- Cleaning strips the keys that are not needed for the query/context generation process.
- Validation ensures that only content chunks longer than `total_chars` characters are passed to the LLM (this prevents the LLM from asking questions from sparse context).

Users define the number of training and validation questions to generate. The number of questions per content chunk (`n_questions_per_chunk` in `train_val_split`, `num_questions_per_chunk` in `generate_qa_embedding_pairs`) can also be set higher than 1, with a few caveats:
- Asking more than one question per content chunk saves time (and money), but the dataset will be less diverse. The model may also generate lower-quality questions if a chunk isn't large or meaningful enough to support more than one question.
- Retrieval evaluation results from fine-tuning an embedding model on 200-300 training samples showed an improvement of 5-10 percentage points. The upper bound on retrieval improvement as a function of training sample size is yet to be determined (have fun pushing the boundaries! 👊)
- A validation dataset is not required to see improvement from fine-tuning. Adding one, however, lets you test the fine-tuned model on unseen data.
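`train_val_split` converts question counts into chunk counts with floor division, so a count that is not a multiple of the questions-per-chunk value is rounded down. The sketch below reproduces that arithmetic outside the class; `plan_split` is a hypothetical helper and assumes the LLM returns exactly the requested number of questions for every chunk.

```python
def plan_split(n_train_questions: int, n_val_questions: int,
               n_questions_per_chunk: int, n_available_chunks: int) -> dict:
    """Mirror the index arithmetic of train_val_split (floor division)."""
    train_chunks = n_train_questions // n_questions_per_chunk
    val_chunks = n_val_questions // n_questions_per_chunk
    needed = train_chunks + val_chunks
    if needed > n_available_chunks:
        raise ValueError(f'Need {needed} chunks but only {n_available_chunks} are available')
    return {
        'train_chunks': train_chunks,
        'val_chunks': val_chunks,
        # questions actually produced if every chunk yields n_questions_per_chunk questions
        'train_questions': train_chunks * n_questions_per_chunk,
        'val_questions': val_chunks * n_questions_per_chunk,
    }


# asking for 301 training questions at 2 per chunk yields 150 chunks, i.e. 300 questions
print(plan_split(301, 50, 2, n_available_chunks=26448))
```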

In [6]:
# split data into train/val sets
# request 10 training and 5 validation questions at 1 question per chunk,
# which selects 10 training chunks and 5 validation chunks
train, val = generator.train_val_split(data, n_train_questions=10, n_val_questions=5, n_questions_per_chunk=1, total_chars=950)

Length Training Data: 10
Length Validation Data: 5


### Generate QA pairs

To generate query/context pairs, pass in the cleaned data splits, a question-generation prompt, and the number of questions per chunk. This number should normally match the value passed to `train_val_split`; otherwise the number of generated questions will differ from what you requested (below, the 10 training chunks are asked for 2 questions each, yielding 20 training queries).
The `qa_generation_prompt` is preconfigured and gives the LLM additional context about the Impact Theory show so that it asks higher-quality questions.
Print out the prompt to see what is being asked of the model:

In [7]:
print(qa_generation_prompt)
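The template is filled with Python's `str.format`, using the placeholder names `summary`, `guest`, `transcript`, and `num_questions_per_chunk` (the keyword arguments passed inside `generate_qa_embedding_pairs`). The sketch below uses a made-up `TOY_TEMPLATE` as a stand-in, since the real `qa_generation_prompt` wording is not reproduced here; `fill_prompt` is likewise hypothetical.

```python
# Hypothetical stand-in for qa_generation_prompt; only the placeholder names
# (summary, guest, transcript, num_questions_per_chunk) come from the notebook.
TOY_TEMPLATE = (
    "Episode summary: {summary}\n"
    "Guest: {guest}\n"
    "Transcript excerpt: {transcript}\n"
    "Write {num_questions_per_chunk} questions answerable from the excerpt, one per line."
)


def fill_prompt(template: str, chunk: dict, num_questions_per_chunk: int) -> str:
    """Fill a prompt template from one cleaned chunk, as the generator does."""
    return template.format(summary=chunk['summary'],
                           guest=chunk['guest'],
                           transcript=chunk['content'],
                           num_questions_per_chunk=num_questions_per_chunk)


toy_chunk = {'summary': 'An episode about sleep.', 'guest': 'Jane Doe',
             'content': 'We talked about light exposure...', 'doc_id': 'ep1_0'}
print(fill_prompt(TOY_TEMPLATE, toy_chunk, num_questions_per_chunk=2))
```

`str.format` raises a `KeyError` if the template contains a placeholder that is not supplied, silently ignores unused keyword arguments, and requires literal braces in the template to be doubled (`{{` and `}}`).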

The output of `generate_qa_embedding_pairs` is a llama_index `EmbeddingQAFinetuneDataset`, a simple wrapper around three dictionaries (`corpus`, `queries`, and `relevant_docs`). The llama_index class is not strictly necessary, but it makes the transition smoother when using the llama_index `SentenceTransformersFinetuneEngine` class for fine-tuning. Generating 100 query/context pairs takes roughly 80 seconds, so a sample size of 300 takes about 4 minutes (much faster than doing this manually!).
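Under the hood, each LLM response is split into one question per line, numbering such as `1.` or `2)` is stripped, and every question gets a fresh UUID that maps back to the source chunk's `doc_id`. A standalone sketch of that bookkeeping using plain dicts with the same three keys; the helper names and the sample response are hypothetical.

```python
import re
import uuid


def parse_questions(raw_response: str) -> list[str]:
    """Split an LLM response into questions, stripping '1.' / '2)' style numbering."""
    lines = raw_response.strip().split('\n')
    questions = [re.sub(r'^\d+[\).\s]', '', line).strip() for line in lines]
    return [q for q in questions if q]  # drop blank lines


def add_pairs(dataset: dict, doc_id: str, content: str, raw_response: str) -> None:
    """Add one chunk and its questions to a {queries, corpus, relevant_docs} dict."""
    dataset['corpus'][doc_id] = content
    for question in parse_questions(raw_response):
        query_id = str(uuid.uuid4())
        dataset['queries'][query_id] = question
        dataset['relevant_docs'][query_id] = [doc_id]  # one relevant chunk per query


dataset = {'queries': {}, 'corpus': {}, 'relevant_docs': {}}
add_pairs(dataset, 'doc_1', 'Some transcript text...',
          '1. What is sleep pressure?\n2) Why does morning light matter?\n')
print(list(dataset['queries'].values()))
```

One quirk of the `^\d+[\).\s]` pattern is that a question which genuinely begins with a number (e.g. `3 ways to...`) loses that number too.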

In [12]:
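# pass num_questions_per_chunk by keyword: the third positional argument is system_message
training_set = generator.generate_qa_embedding_pairs(train, qa_generation_prompt, num_questions_per_chunk=2)
val_set = generator.generate_qa_embedding_pairs(val, qa_generation_prompt, num_questions_per_chunk=1)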

100%|████████████████████████████████████████████████████████████████████| 10/10 [00:13<00:00,  1.30s/it]


In [13]:
#EmbeddingQAFinetuneDataset has no len, so check length of queries instead
len(training_set.queries), len(val_set.queries)

(20, 5)

### Dataset Analysis

It's always a good idea to check the quality of the generated pairs. Most pairs will be high quality, but some will not; this is a chance for human intervention to edit questions manually and keep the quality high.
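Before reading every pair by hand, it can help to triage. A minimal sketch that flags very short questions and exact duplicates (after lowercasing and collapsing whitespace); `flag_suspect_questions` and the `min_words` cut-off are hypothetical choices, not part of the notebook's pipeline.

```python
from collections import Counter


def flag_suspect_questions(queries: dict[str, str],
                           min_words: int = 4) -> dict[str, list[str]]:
    """Return query_ids worth a manual look: very short questions and exact duplicates."""
    # normalize case and whitespace so trivial variants count as duplicates
    normalized = {qid: ' '.join(q.lower().split()) for qid, q in queries.items()}
    counts = Counter(normalized.values())
    return {
        'too_short': [qid for qid, q in normalized.items() if len(q.split()) < min_words],
        'duplicates': [qid for qid, q in normalized.items() if counts[q] > 1],
    }


toy_queries = {'a': 'What is sleep pressure?', 'b': 'Why?', 'c': 'what is  sleep pressure?'}
print(flag_suspect_questions(toy_queries))
```

Flagged ids can then be reviewed and edited in place, e.g. `training_set.queries[query_id] = 'revised question'`, or dropped from both `queries` and `relevant_docs` together so the two dictionaries stay in sync.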

In [14]:
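def show_qa_pairs(data: EmbeddingQAFinetuneDataset, print_results: bool=True) -> list[tuple[str, str]]:
    '''
    Return (question, context) tuples by looking up each query's relevant
    doc_id in the corpus. Optionally print them for manual review.
    '''
    pairs = []
    for query_id, question in data.queries.items():
        doc_id = data.relevant_docs[query_id][0]
        context = data.corpus[doc_id]
        pairs.append((question, context))
    if print_results:
        for question, context in pairs:
            print(f'Question: {question}\nContext: {context}\n\n')
    return pairs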

In [17]:
pairs = show_qa_pairs(training_set, print_results=True)

### Save to Disk  
Save to disk using your own file paths. Below is an example that uses the size of each set as part of the file path.

In [28]:
# training_set.save_json('./data/training_data_10.json')
# val_set.save_json('./data/validation_data_5.json')
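If you want a file you can inspect or load without llama_index, the three dictionaries can also be written with the standard library. A sketch, assuming the dataset is available as a plain dict with `queries`, `corpus`, and `relevant_docs` keys (for an `EmbeddingQAFinetuneDataset`, build that dict from its attributes). `save_pairs` and `load_pairs` are hypothetical helpers, and the resulting layout is not guaranteed to match what `save_json` writes.

```python
import json
from pathlib import Path


def save_pairs(dataset: dict, out_dir: str, split_name: str) -> Path:
    """Write the dataset to <out_dir>/<split_name>_data_<n_queries>.json."""
    n_queries = len(dataset['queries'])
    path = Path(out_dir) / f'{split_name}_data_{n_queries}.json'
    path.parent.mkdir(parents=True, exist_ok=True)  # create the directory if needed
    path.write_text(json.dumps(dataset, indent=4))
    return path


def load_pairs(path: Path) -> dict:
    """Read a dataset written by save_pairs back into a dict."""
    return json.loads(Path(path).read_text())
```

For example, `save_pairs({'queries': training_set.queries, 'corpus': training_set.corpus, 'relevant_docs': training_set.relevant_docs}, './data', 'training')` would write `./data/training_data_<n>.json`, where `<n>` is the number of training queries.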