In [26]:
import os
from typing import List, Dict
from langchain.embeddings import HuggingFaceEmbeddings
# from langchain_huggingface import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain_milvus import Milvus
from langchain.document_loaders import TextLoader
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from huggingface_hub import hf_hub_download
from vllm import LLM, SamplingParams
from langchain_community.llms import VLLM
from datasets import Dataset
from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_utilization, context_recall, answer_correctness 
from ragas import evaluate
from ragas.run_config import RunConfig
import pandas as pd
from tqdm import tqdm
import subprocess
import pickle

from langchain_core.prompts import ChatPromptTemplate
from langchain_ollama.llms import OllamaLLM

from langchain_community.llms import LlamaCpp
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler

from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from langchain_core.documents import Document
from langchain_core.documents.compressor import BaseDocumentCompressor
from pydantic import BaseModel

import warnings
warnings.filterwarnings("ignore")

2024-09-13 22:45:00.980360: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-09-13 22:45:02.841911: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [27]:
config = { # The first value in every position is baseline
    'model_name': 'llama3.1-8b-q4',  # llama3.1-8b-q4 / gemma-2-9b-it-simpo-q4 / tlite-q4
    'embed_model_name_short': 'e5l', # e5l (multilingual-e5-large) / bgem3 (bge-m3)
    'chunk_size': 512, # 512/128, 1024/256, 256/64 
    'chunk_overlap': 128,
    'llm_framework': 'VLLM', # VLLM, LLamaCpp, Ollama (Ollama is only for gemma2 model)
    'vectorstore_name': 'MILVUS', # Vector Database MILVUS / FAISS
    'retriever_name': 'vectorstore', # 'vectorstore' / 'ensemble' (BM25 + vertorstore)
    'retriever_k': 4, # 4, if reranker, then retriever_k=30 (reranker returns 4)
    'compressor_name': None, # None / 'cross_encoder_reranker' / 'gluing_chunks' 
    'chain_type': 'stuff', # 'stuff'
}
# If chosen a HybridSearch
ensemble_config = {
    'ensemble_retrievers_names': ['BM25', 'vectorstore'], 
    'ensemble_retrievers_weights': [0.4, 0.6], 
}

llama_config = {
    'repo_id': 'lmstudio-community/Meta-Llama-3.1-8B-Instruct-GGUF',
    'filename': 'Meta-Llama-3.1-8B-Instruct-Q4_K_M.gguf',
    'tokenizer': 'hugging-quants/Meta-Llama-3.1-8B-Instruct-AWQ-INT4'
    }

gemma_config = {
    'repo_id': "mannix/gemma2-9b-simpo",
    'llm_framework': 'Ollama'
    }

tlite_config = {
    'repo_id': 'mradermacher/saiga_tlite_8b-GGUF',
    'filename': 'saiga_tlite_8b.Q4_K_M.gguf',
    'tokenizer': 'IlyaGusev/saiga_tlite_8b'
    }

reranker_config = {
    'reranker_model': "BAAI/bge-reranker-v2-m3",
    'retriever_k': 30
}


def update_config_with_model(config, llama_config, gemma_config, tlite_config):
    """
    Update the configuration based on the selected model.

    Parameters:
    config (dict): The configuration to update.
    llama_config (dict): The configuration for the Llama model.
    gemma_config (dict): The configuration for the Gemma model.
    tlite_config (dict): The configuration for the T-lite model.

    Raises:
    ValueError: If an incorrect model_name is selected.
    """
    if config['model_name'] == 'llama3.1-8b-q4':
        config.update(llama_config)
    elif config['model_name'] == 'gemma-2-9b-it-simpo-q4':
        config.update(gemma_config)
    elif config['model_name'] == 'tlite-q4':
        config.update(tlite_config)
    else:
        ValueError('Incorrect model_name: choose from llama3.1-8b-q4, gemma-2-9b-it-simpo-q4, or tlite-q4')
    
    if config['embed_model_name_short'] == 'e5l':
        config['embedding_model'] = "intfloat/multilingual-e5-large"
    elif config['embed_model_name_short'] == 'bgem3':
        config['embedding_model'] = 'BAAI/bge-m3'
    
    if config['retriever_name'] == 'ensemble':
        config.update(ensemble_config)
    
    if config['compressor_name'] == 'cross_encoder_reranker':
        config.update(reranker_config)


update_config_with_model(config, llama_config, gemma_config, tlite_config)

In [28]:
# внутри используется реранкер и эвристики
class ChunkCompressor(BaseDocumentCompressor):
    """
    A class for compressing documents into chunks.
    This class is a subclass of BaseDocumentCompressor and overrides the compress_documents method.
    """

    chunks: list
    """
    A list of chunks.
    Each chunk is a Document object.
    """

    chunk_overlap: int
    """
    The amount of overlap between chunks.
    """

    def compress_documents(self, inputs, query=None, callbacks=None):
        """
        Compress the given chunks into extended chunks(add a neighboor chunks to answer).

        Parameters:
        inputs (list): The list of chunks to compress.
        query (str): The query to use for compression.
        callbacks (list): The list of callbacks to use for compression.

        Returns:
        list: The list of compressed chunks.
        """
        outputs = []
        for chunk in inputs:
            current_id = chunk.metadata['chunk_index']
            new_chunk = Document(page_content=chunk.page_content, metadata=chunk.metadata)
            
            # Add a left-context chunk
            if current_id > 0:
                left_neighbor = self.chunks[current_id - 1]
                new_chunk.page_content = left_neighbor.page_content[:-self.config['chunk_overlap']] + new_chunk.page_content
            
            # Add a right-context chunk
            if current_id < len(self.chunks) - 1:
                right_neighbor = self.chunks[current_id + 1]
                new_chunk.page_content += right_neighbor.page_content[self.config['chunk_overlap']:]
            # 
            outputs.append(new_chunk)
        
        return outputs

In [29]:
class CustomRAGPipeline:
    """
    A class for a custom RAG (Retrieval Augmented Generation) pipeline.
    This pipeline is designed to process and retrieve information from a given set of documents.
    """
    def __init__(self, 
                 documents_path: str,
                 config: dict,
                 recalc_embedding: bool = False,
                 recalc_chunks: bool = False,
                 ):
        """        
        Initialize the class object with the given documents path and configuration.

        Parameters:
        documents_path (str): The path to the text documents.
        config (dict): The configuration for the pipeline.
        recalc_embedding (bool): Whether to recalculate the embeddings.
        recalc_chunks (bool): Whether to recalculate the chunks.
        """        
        self.config = config
        self.documents_path = documents_path
        self.embedding_model = self.config['embedding_model']
        
        self.vectorstore = None
        self.qa_chain = None

        self.embeddings = HuggingFaceEmbeddings(model_name=self.embedding_model)
        self.chunks = None
        
        self.retriever = None
        self.compressor = None

        self.vectorstore_path = '_'.join([self.config['embed_model_name_short'], 
                                          self.config['vectorstore_name'], 
                                          str(self.config['chunk_size']), 
                                          str(self.config['chunk_overlap'])]
                                        )
        
        self.chunks_path = '_'.join(['chunks', 
                                     str(self.config['chunk_size']), 
                                     str(self.config['chunk_overlap'])]
                                   ) + '.pkl'

        if not recalc_embedding:
            if os.path.exists(self.vectorstore_path) and self.config['vectorstore_name'] == 'FAISS':
                self.vectorstore = FAISS.load_local(self.vectorstore_path, self.embeddings, allow_dangerous_deserialization=True)
            elif os.path.isfile(f"{self.vectorstore_path}.db") and self.config['vectorstore_name'] == 'MILVUS':
                self.vectorstore = Milvus(
                    self.embeddings,
                    connection_args={"uri": f"./{self.vectorstore_path}.db"},
                    collection_name="RAG",
                )
        
        if not recalc_chunks:
            if os.path.exists(self.chunks_path):
                with open(self.chunks_path, "rb") as f:
                    self.chunks = pickle.load(f)

    # Load chosen model
        if self.config['llm_framework'] == 'VLLM':
            self.llm = self.load_vllm_model()
        elif self.config['llm_framework'] == 'LLamaCpp':
            self.llm = self.load_llama_cpp_model()
        elif self.config['llm_framework'] == 'Ollama':
            self.llm = self.load_ollama_model()
            
            
    def load_vllm_model(self):
        """
        Load the vLLM model from HuggingFace Hub.

        Returns:
        vllm_llm (VLLM): The loaded vLLM model.
        """
        repo_id = self.config['repo_id']
        filename = self.config['filename']
        tokenizer = self.config['tokenizer']
        model_path = hf_hub_download(repo_id, filename=filename)
        
        # Initialize vLLM with the downloaded model
        vllm_llm = VLLM(model=model_path,
                        vllm_kwargs={"quantization": "awq", 
                                     'max_model_len': 8192,
                                     'gpu_memory_utilization': 0.75},
                        temperature=0.75,
                        stop=["<|eot_id|>"]
                        )
        
        return vllm_llm


    def load_llama_cpp_model(self):
        """
        Initialize the LlamaCpp model.

        Returns:
        llama_cpp_llm (LlamaCpp): The initialized LlamaCpp model.
        """
        repo_id = self.config['repo_id']
        filename = self.config['filename']
        model_path = hf_hub_download(repo_id, filename=filename)
        
        # Initialize LlamaCpp
        llama_cpp_llm = LlamaCpp(model_path=model_path,
                                temperature=0.8,
                                top_p=0.95,
                                top_k=30,
                                max_tokens=64,
                                n_ctx=8192,
                                n_parts=-1,
                                n_gpu_layers=64,
                                n_threads=8,
                                frequency_penalty=1.1,
                                verbose=True,
                                stop=["<|eot_id|>"],  # Stopping on the EOS token
                                )
        
        return llama_cpp_llm

    
    def load_ollama_model(self):
        """
        Pull the Ollama model from the repository.

        Returns:
        OllamaLLM: The Ollama model.
        """
        model_name = self.config['repo_id']
        
        command = f"ollama pull {model_name}"
        # Try to pull Ollama model
        try:
            subprocess.run(command, shell=True, check=True)
            print(f'Pullled the model {model_name} successfully')
        except subprocess.CalledProcessError as e:
            print(f"Error pulling model {model_name}: {e}")
        
        return OllamaLLM(
            model=model_name,
            temperature=0.8,
            top_p=0.95,
            top_k=30,
            max_tokens=1024,
            stop=["<|eot_id|>"]
        )
    
    def load_and_process_documents(self):
        """
        Load and process the documents.
        If the retriever is an "ensemble", or the retriever is an "BM25", 
        or the compressor is a "gluing_chunks", or the vectorstore is not initialized,
        the documents are split into chunks and added to the vectorstore.
        """
        if (self.config['retriever_name'] == 'ensemble') \
            or (self.config['retriever_name'] == 'BM25') \
            or (self.config['compressor_name'] == 'gluing_chunks') \
            or (not self.vectorstore):

            loader = TextLoader(self.documents_path)
            documents = loader.load()
            
            # Split the documents into chunks
            text_splitter = CharacterTextSplitter(
                        separator=" ",
                        chunk_size=self.config['chunk_size'],
                        chunk_overlap=self.config['chunk_overlap'],
                        length_function=len,
                        is_separator_regex=False,
                    )
            texts = text_splitter.split_documents(documents)

            # Add chuck index to metadata to find neighborous-chunks after retrieving
            for index, document in enumerate(texts):
                document.metadata["chunk_index"] = index
            self.chunks = texts
            
            with open(self.chunks_path, "wb") as f:
                pickle.dump(self.chunks, f)
            
            if not self.vectorstore:
                if self.config['vectorstore_name'] == 'FAISS':
                    # Create a FAISS vector store from the documents
                    self.vectorstore = FAISS.from_documents(texts, self.embeddings)
                    self.vectorstore.save_local(self.vectorstore_path)
                elif self.config['vectorstore_name'] == 'MILVUS':
                    # Create a MILVUS vector store from the documents
                    self.vectorstore = Milvus.from_documents(
                        texts,
                        self.embeddings,
                        collection_name="RAG",
                        connection_args={"uri": f"./{self.vectorstore_path}.db"})
    
    
    def init_retriever(self):   
        """
        Initialize the retriever.
        If the retriever name is 'ensemble', 'BM25', or 'vectorstore', the corresponding retriever is initialized.

        Returns:
        retriever: The initialized retriever.
        """
        if self.config['retriever_name'] == 'ensemble':
            # Build an ensemble retriever based on the config data
            retrievers = []
            for retriever in self.config['ensemble_retrievers_names']:
                self.config['retriever_name'] = retriever
                retrievers.append(self.init_retriever())
                
            self.retriever = EnsembleRetriever(retrievers=retrievers,
                                              weights=self.config['ensemble_retrievers_weights'])
            self.config['retriever_name'] = 'ensemble'
            
        elif self.config['retriever_name'] == 'BM25':
            # Initialize the BM25 retriever with top_k relevance chunks
            self.retriever = BM25Retriever.from_documents(documents=self.chunks)
            self.retriever.k = self.config['retriever_k']

        elif self.config['retriever_name'] == 'vectorstore':
            # Initialize the vectorstore retriever
            self.retriever = self.vectorstore.as_retriever(search_kwargs={"k": self.config['retriever_k']})
        else:
            # Another retriever options aren't supported in this version :(
            ValueError('Incorrect retriever name')
        
        return self.retriever
    
    
    def init_compressor(self):
        """
        Initialize the compressor.
        If the compressor name is 'gluing_chunks', the compressor is initialized as a ChunkCompressor.
        If the compressor name is 'cross_encoder_reranker', the compressor is initialized as a CrossEncoderReranker.
        """
        if self.config['compressor_name'] == 'gluing_chunks':
            # Initialize the common compressor class for gluing
            self.compressor = ChunkCompressor(chunks=self.chunks, chunk_overlap=self.config['chunk_overlap'])
        elif self.config['compressor_name'] == 'cross_encoder_reranker':
            # Initialize the cross-encoder reranker
            model = HuggingFaceCrossEncoder(model_name=self.config['reranker_model'])
            self.compressor = CrossEncoderReranker(model=model, top_n=4)
    
    
    def setup_qa_chain(self, custom_prompt: str = None):
        """
        Set up the QA chain.
        The retriever and compressor are initialized, and the QA chain is set up with the given custom prompt.

        Parameters:
        custom_prompt (str): The custom prompt for the QA chain.
        """
        self.init_retriever()
        self.init_compressor()
        
        
        if self.compressor:
            # Build a retriever-compressor chain if compressor exists             
            compression_retriever = ContextualCompressionRetriever(
                base_compressor=self.compressor, 
                base_retriever=self.retriever
            )
        else:
            # Otherwise put only a retriever on a QA chain
            compression_retriever = self.retriever
                
        prompt_template = PromptTemplate(
            input_variables=["context", "question"],
            template=custom_prompt
        )
        # Build a QA chain with the given prompt and the class attributes
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type=self.config['chain_type'],
            retriever=compression_retriever,
            return_source_documents=True,
            chain_type_kwargs={"prompt": prompt_template}
        )
    
    
    def query(self, question: str) -> Dict:
        """
        Query the pipeline with the given question.

        Parameters:
        question (str): The question to query the pipeline with.

        Returns:
        Dict: The result of the query.
        """
        if not self.qa_chain:
            raise ValueError("QA chain not set up. Call setup_qa_chain() first.")
        
        # Run the QA chain with the provided question
        return self.qa_chain({"query": question})



if __name__ == "__main__":
    # Initialize the pipeline                   
    rag_pipeline = CustomRAGPipeline(documents_path="hmao_npa.txt", config=config)
    
    # Load and process documents
    rag_pipeline.load_and_process_documents()
    

INFO 09-13 22:47:16 config.py:1647] Downcasting torch.float32 to torch.float16.
INFO 09-13 22:47:16 llm_engine.py:213] Initializing an LLM engine (v0.6.0) with config: model='/tmp/xdg_cache/huggingface/hub/models--lmstudio-community--Meta-Llama-3.1-8B-Instruct-GGUF/snapshots/8601e6db71269a2b12255ebdf09ab75becf22cc8/Meta-Llama-3.1-8B-Instruct-Q4_K_M.gguf', speculative_config=None, tokenizer='/tmp/xdg_cache/huggingface/hub/models--lmstudio-community--Meta-Llama-3.1-8B-Instruct-GGUF/snapshots/8601e6db71269a2b12255ebdf09ab75becf22cc8/Meta-Llama-3.1-8B-Instruct-Q4_K_M.gguf', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config=None, rope_scaling=None, rope_theta=None, tokenizer_revision=None, trust_remote_code=False, dtype=torch.float16, max_seq_len=8192, download_dir=None, load_format=LoadFormat.GGUF, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=gguf, enforce_eager=False, kv_cache_dtype=auto, quantization_p

You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama_fast.LlamaTokenizerFast'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message.


INFO 09-13 22:48:11 selector.py:217] Cannot use FlashAttention-2 backend for Volta and Turing GPUs.
INFO 09-13 22:48:11 selector.py:116] Using XFormers backend.
INFO 09-13 22:48:11 model_runner.py:915] Starting to load model /tmp/xdg_cache/huggingface/hub/models--lmstudio-community--Meta-Llama-3.1-8B-Instruct-GGUF/snapshots/8601e6db71269a2b12255ebdf09ab75becf22cc8/Meta-Llama-3.1-8B-Instruct-Q4_K_M.gguf...
INFO 09-13 22:48:26 selector.py:217] Cannot use FlashAttention-2 backend for Volta and Turing GPUs.
INFO 09-13 22:48:26 selector.py:116] Using XFormers backend.
INFO 09-13 22:48:43 model_runner.py:926] Loading model weights took 4.7372 GB
INFO 09-13 22:48:59 gpu_executor.py:122] # GPU blocks: 8388, # CPU blocks: 2048
INFO 09-13 22:49:01 model_runner.py:1217] Capturing the model for CUDA graphs. This may lead to unexpected consequences if the model is not static. To run the model in eager mode, set 'enforce_eager=True' or use '--enforce-eager' in the CLI.
INFO 09-13 22:49:01 model_runn

In [31]:
system_prompt = '''Use the following pieces of context to answer the question at the end. 
If you don't know the answer, just s
ay that you don't know, don't try to make up an answer.
Think step by step. Give full answer. Answer only in Russian. If context doesnt match the answer, say that you do not know the answer.
{context}'''
user_prompt = '''Question: {question}
Answer:'''

custom_prompt = f"""
<|begin_of_text|>
<|start_header_id|>system<|end_header_id|>
{system_prompt}
<|eot_id|>
<|start_header_id|>user<|end_header_id|>
{user_prompt}
<|eot_id|>
<|start_header_id|>assistant<|end_header_id|>"""

rag_pipeline.setup_qa_chain(custom_prompt)

if __name__ == "__main__":
    # result = rag_pipeline.query("Какой герб изображен на бланках и штампах Комитета по средствам массовой информации и полиграфии Ханты-Мансийского автономного округа?")
    result = rag_pipeline.query("Что такое должностной оклад и как он рассчитывается?")
    # result = rag_pipeline.query("Какие мероприятия проводит Департамент охраны окружающей среды и экологической безопасности автономного округа в 2010 году?")
    # result = rag_pipeline.query('Когда юридические лица и ИП должны сообщать об аварийных выбросах?')
    print(result['result'])


Processed prompts: 100%|██████████| 1/1 [00:02<00:00,  2.81s/it, est. speed input: 301.68 toks/s, output: 23.18 toks/s]


Должностной оклад — фиксированный размер оплаты труда работника за исполнение трудовых (должностных) обязанностей определенной сложности за календарный месяц без учета компенсационных, стимулирующих и социальных выплат.





In [None]:
def create_ragas_dataset(rag_pipeline, eval_dataset):
    """
    Create dataset for model evaluation with ragas library.

    Parameters:
    rag_pipeline (CustomRAGPipeline): Initialized instance of CustomRAGPipeline class
    eval_dataset (pd.DataFrame): Prepared dataset for metrics calculation

    Returns:
    rag_eval_dataset(pd.DataFrame): The results with model answers and contexts.
    """
    rag_dataset = []
    for index, row in tqdm(eval_dataset.iterrows()):
        answer = rag_pipeline.query(row["question"])
        rag_dataset.append(
            {"question" : row["question"],
             "answer" : answer["result"],
             "contexts" : [context.page_content for context in answer["source_documents"]],
             "ground_truth" : row["ground_truth"]
             }
        )
    rag_df = pd.DataFrame(rag_dataset)
    rag_eval_dataset = Dataset.from_pandas(rag_df)
    return rag_eval_dataset


eval_dataset = pd.read_excel('v2_ragas_npa_dataset_firstPart.xlsx')
eval_dataset = eval_dataset.groupby('evolution_type', group_keys=False).apply(lambda x: x.sample(25, random_state=42)).copy()
eval_df = create_ragas_dataset(rag_pipeline, eval_dataset)

In [None]:
eval_df.save_to_disk('eval_df_baseline_new.hf')