<a href="https://colab.research.google.com/github/adidam/rag-impl/blob/main/rag_visualizer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
!pip install rank_bm25
!pip install datasets
!pip install gradio
!pip install pymilvus pymilvus[model]
!pip install langchain_groq

Collecting langchain_groq
  Downloading langchain_groq-0.2.4-py3-none-any.whl.metadata (3.0 kB)
Collecting groq<1,>=0.4.1 (from langchain_groq)
  Downloading groq-0.18.0-py3-none-any.whl.metadata (14 kB)
Downloading langchain_groq-0.2.4-py3-none-any.whl (14 kB)
Downloading groq-0.18.0-py3-none-any.whl (121 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m121.9/121.9 kB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: groq, langchain_groq
Successfully installed groq-0.18.0 langchain_groq-0.2.4


In [4]:
import numpy as np
import os
import torch
import torch.nn as nn
from torchvision import datasets, transforms, models
from torch.utils.data import random_split
import torch.optim as optim
import matplotlib.pyplot as plt
import nltk

nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

# **Tokenization and other utility functions**

In [None]:
from sentence_transformers import SentenceTransformer
from nltk.tokenize import sent_tokenize
from transformers import AutoTokenizer

# embedding_model = "sentence-transformers/all-MiniLM-L6-v2"
embedding_model = "BAAI/LLM-Embedder"
# embedding_model = "BAAI/bge-large-en"

# Initialize the tokenizer
tokenizer = AutoTokenizer.from_pretrained(embedding_model)
embedder = SentenceTransformer(embedding_model)  # Pretrained sentence transformer

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
# New code - 12/4 10 pm
import hashlib
from rank_bm25 import BM25Okapi
from nltk.tokenize import word_tokenize
import scipy.sparse as sp

# Initialize the tokenizer
tokenizer = AutoTokenizer.from_pretrained(embedding_model)
#tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

# Sliding window configuration
TOKEN_LIMIT = 512
SLIDING_WINDOW_OVERLAP = 100  # Overlap between consecutive chunks (in tokens)

# Function for chunking with token limit and sliding window
def chunk_with_token_limit(text, token_limit=512, overlap=100):
    sentences = sent_tokenize(text)  # Split text into sentences
    chunks = []  # Store resulting chunks
    current_chunk = []  # Temporarily hold sentences for the current chunk
    current_chunk_tokens = 0  # Token count for the current chunk

    for sentence in sentences:
        # Tokenize the sentence and calculate its token count
        sentence_tokens = tokenizer.tokenize(sentence)
        num_tokens = len(sentence_tokens)

        # print(f"Tokens: {sentence_tokens[0]}")

        # If adding this sentence exceeds the token limit
        if current_chunk_tokens + num_tokens > token_limit:
            # Save the current chunk
            chunk_text = " ".join(current_chunk)
            chunks.append(chunk_text)

            # Prepare the next chunk with overlap
            overlap_tokens = tokenizer.tokenize(" ".join(current_chunk[-1:]))
            current_chunk = [sentence for sentence in current_chunk[-(overlap // len(overlap_tokens)) :]] if current_chunk else []
            current_chunk_tokens = sum(len(tokenizer.tokenize(sent)) for sent in current_chunk)

        # Add the sentence to the current chunk
        current_chunk.append(sentence)
        current_chunk_tokens += num_tokens

    # Add the last chunk if it exists
    if current_chunk:
        chunk_text = " ".join(current_chunk)
        chunks.append(chunk_text)

    return chunks

def process_document_with_identifiers(document):
    processed_data = []
    title_count = -1  # to start from 0
    # print("document>>>>>>>",document)
    for section in document:
        section_chunks = []
        passage_count = [ord('a')]  # Passage identifier as a list to handle nested increments
        title_count += 1  # Increment title count

        # Tokenize the section into sentences
        sentences = sent_tokenize(section)
        for sentence in sentences:
            if sentence.startswith("Title:"):
                # New document detected
                identifier = f"{title_count}{''.join(chr(c) for c in passage_count)}"  # Identifier for the title

                # Commented this line to integrate and test with small to big. To be uncommented after testing
                chunked_texts = chunk_with_token_limit(sentence, TOKEN_LIMIT, SLIDING_WINDOW_OVERLAP)

                #Added this line to replace sliding window tokenization with hybrid tokenization (sliding window + small-to-big)
                #chunked_texts = hybrid_chunking(sentence,tokenizer, TOKEN_LIMIT, SLIDING_WINDOW_OVERLAP)

                for chunk in chunked_texts:
                    section_chunks.append([identifier, chunk])
                passage_count = [ord('a')]  # Reset passage count for the new document
            else:
                # Sentence under the current document
                identifier = f"{title_count}{''.join(chr(c) for c in passage_count)}"

                # Commented this line to integrate and test with small to big. To be uncommented after testing
                chunked_texts = chunk_with_token_limit(sentence, TOKEN_LIMIT, SLIDING_WINDOW_OVERLAP)

                #Added this line to replace sliding window tokenization with hybrid tokenization (sliding window + small-to-big)
                #chunked_texts = hybrid_chunking(sentence,tokenizer, TOKEN_LIMIT, SLIDING_WINDOW_OVERLAP)
                #print("chunked_texts>>>>process_document_with_identifiers>>>>> "+ "".join(chunked_texts))

                for chunk in chunked_texts:
                    section_chunks.append([identifier, chunk])

                # Increment passage_count intelligently
                i = len(passage_count) - 1
                while i >= 0:
                    passage_count[i] += 1
                    if passage_count[i] > ord('z'):
                        passage_count[i] = ord('a')
                        if i == 0:
                            passage_count.insert(0, ord('a'))  # Add a new character to the identifier
                        i -= 1
                    else:
                        break


        #print("section_chunks>>>>>>>",section_chunks)
        processed_data.append(section_chunks)

    return processed_data

# Function to generate a hash based on content and key metadata
def generate_hash(content, metadata):
    """Generate a unique hash for the document content and key metadata."""
    key_fields = f"{content}|{metadata.get('item_index')}|{metadata.get('prefix')}"
    return hashlib.md5(key_fields.encode('utf-8')).hexdigest()

# Function to retrieve existing hashes from the database
def get_existing_hashes(collection):
    """Retrieve all existing hashes (IDs) currently in the database."""
    all_records = collection.get(include=["documents", "metadatas"])  # Fetch documents and metadata
    existing_hashes = set()
    for doc, metadata in zip(all_records["documents"], all_records["metadatas"]):
        doc_hash = generate_hash(doc, metadata)
        existing_hashes.add(doc_hash)
    return existing_hashes

# Function to retrieve existing hashes from the database
def get_existing_hashes_milvus(all_records):
    """Retrieve all existing hashes (IDs) currently in the database."""
    existing_hashes = set()
    print(f"all records >>> {len(all_records)}")
    if all_records == None or len(all_records) == 0:
        return existing_hashes

    for record in all_records:
        doc = record.get("documents")
        metadata = record.get("metadata")
        doc_hash = generate_hash(doc, metadata)
        existing_hashes.add(doc_hash)
    return existing_hashes


# Tokenize corpus and prepare BM25 encoder
def prepare_bm25_encoder(texts):
    tokenized_corpus = [word_tokenize(text.lower()) for text in texts]
    bm25_encoder = BM25Okapi(tokenized_corpus)
    return bm25_encoder

def generate_sparse_vector_bm25(query, bm25_encoder):
    tokenized_query = word_tokenize(query.lower())
    scores = bm25_encoder.get_scores(tokenized_query)
    # Convert scores to CSR format
    sparse_vector = sp.csr_matrix(scores)
    return sparse_vector

# **Milvus VectorDataStore class**

In [None]:
import numpy as np
import time
from pymilvus import connections
from pymilvus import FieldSchema, CollectionSchema, DataType, Collection
from pymilvus import MilvusClient
from pymilvus import utility

class VectorDataStore:
    db_url = "http://localhost:19530"

    #description = f"collection created for {self.name}"

    def __init__(self, path="/content/ragbench.db"):
        self.client = MilvusClient(path)


    def get_or_create_collection(self, name, vec_dim=128):
        try:
            self.get_collection(name)
        except:
            print(f"Collection {name} doesn't exist. Creating...")
            self.create_collection(name, vec_dim)


    def create_collection(self, name, vec_dim=128):
        if self.client.has_collection(name):
            self.default_collection_name = name

        self.description = f"collection to store {name}"

        index_params = self.client.prepare_index_params()
        index_params.add_index(
            field_name="embedding",
            index_type="AUTOINDEX",
            metric_type="COSINE"
        )
        index_params.add_index(
            field_name="sparse",
            index_type="SPARSE_INVERTED_INDEX",
            metric_type="IP"
        )
        schema = self.client.create_schema(
            auto_id=False,
            enable_dynamic_fields=True,
        )
        schema.add_field(field_name="pk", datatype=DataType.VARCHAR, max_length=64, is_primary=True)
        schema.add_field(field_name="metadata", datatype=DataType.JSON)
        schema.add_field(field_name="documents", datatype=DataType.VARCHAR, max_length=512)
        schema.add_field(field_name="sparse", datatype=DataType.SPARSE_FLOAT_VECTOR)
        schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=vec_dim)

        collection = self.client.create_collection(collection_name=name,
                                       schema=schema,
                                       index_params=index_params)
        self.current_collection = collection
        return collection


    def get_collection(self, name):
        if not self.client.has_collection(name):
            raise ValueError(f"Collection '{name}' does not exist.")
        self.current_collection = Collection(name)
        return self.current_collection

    def get_all_records(self, collection):
        all_records = self.client.query(
            collection_name=collection,
            filter=None,
            output_fields=["documents", "metadata"],
            limit=10000
        )
        if all_records == None:
            all_records = []

        return all_records

    def has_entities(self, name):
        if not self.client.has_collection(name):
            raise ValueError(f"Collection '{name}' does not exists.")
        self.default_collection = name
        collection_stats = self.client.get_collection_stats(name)
        count = collection_stats.get("row_count", 0)  # Retrieve the number of entities
        return count

    def insert(self, collection_name: str, metadata: list[dict[str, any]],
                documents: list[str], sparse_embs: np.ndarray, embeddings: np.ndarray, ids: list[int]):

        if not self.client.has_collection(collection_name):
            raise ValueError(f"Collection '{collection_name}' does not exist. Create it first.")

        if len(metadata) != len(embeddings) != len(documents) != len(ids):
           raise ValueError("Metadata, documnets, ids and embeddings must have the same length.")

        data = []
        for meta, doc, sp_embs, emb, id in zip(metadata, documents, sparse_embs, embeddings, ids):
          datum = {
              "pk": id,
              "metadata": meta,
              "documents": doc,
              "sparse": sp_embs,
              "embedding": emb.tolist(),
          }
          data.append(datum)

        self.client.insert(collection_name, data)
        print(f"Inserted {len(metadata)} records into collection '{collection_name}'.")

    def drop_collection(self, collection_name: str):
        if not self.client.has_collection(collection_name):
            raise ValueError(f"Collection '{collection_name}' does not exist.")
        self.client.drop_collection(collection_name)
        print(f"Dropped collection '{collection_name}'.")

    def delete_all(self, collection_name: str):
        if not self.client.has_collection(collection_name):
            raise ValueError(f"Collection '{collection_name}' does not exist.")
        self.client.delete(collection_name, expr="pk >= 0")
        self.client.flush([collection_name])

    def search(self, query_embedding: np.ndarray, collection='all', top_k: int = 10) -> list[dict[str, any]]:
        """
        Search across all collections for the top-k closest embeddings.
        :param query_embedding: The embedding vector to search for.
        :param top_k: Number of top results to retrieve.
        :return: A list of dictionaries containing collection name, id, metadata, and distance.
        """
        results = []
        if collection=='all':
          collections = self.client.list_collections()
        else:
          collections = [collection]

        start_time = time.time()
        for collection_name in collections:
            if not self.client.has_collection(collection_name):
                continue

            # Set params to COSINE to match chromadb
            search_params = {"metric_type": "COSINE", "params": {"ef": 128}}

            search_results = self.client.search(
                collection_name=collection_name,
                data=[query_embedding],
                anns_field="embedding",
                search_params=search_params,
                limit=top_k,
                output_fields=["metadata", "documents"]
            )

            print(f"search results size : {len(search_results)}")

            for hits in search_results:
                for hit in hits:
                    print(f"Collection: {collection_name}, data: {str(hit)}")
                    results.append({
                        "collection": collection_name,
                        "id": hit["id"],
                        "metadata": hit["entity"]["metadata"],
                        "distance": hit["distance"],
                        "documents": hit["entity"]["documents"]
                      })

        results = sorted(results, key=lambda x: x["distance"])[:top_k]
        end_time = time.time()
        print(f"Search completed. Found {len(results)} results. in {end_time - start_time} secs")
        return results

    def sparse_search(self, query_embedding: np.ndarray, collection='all', top_k : int=10)-> list[dict[str, any]] :
        results = []
        if (collection=='all'):
          collections = self.client.list_collections()
        else:
          collections = [collection]
        start_time = time.time()
        for collection_name in collections:
            if not self.client.has_collection(collection_name):
                continue

            # Set params to COSINE to match chromadb
            search_params = {"metric_type": "IP", "params": {"ef": 128}}

            search_results = self.client.search(
                collection_name=collection_name,
                data=[query_embedding],
                anns_field="sparse",
                search_params=search_params,
                limit=top_k,
                output_fields=["metadata", "documents"]
            )

            print(f"search results size : {len(search_results)}")

            for hits in search_results:
                for hit in hits:
                    print(f"Collection: {collection_name}, data: {str(hit)}")
                    results.append({
                        "collection": collection_name,
                        "id": hit["id"],
                        "metadata": hit["entity"]["metadata"],
                        "distance": hit["distance"],
                        "documents": hit["entity"]["documents"]
                      })

        results = sorted(results, key=lambda x: x["distance"])[:top_k]
        end_time = time.time()
        print(f"Search completed. Found {len(results)} results. in {end_time - start_time} secs")
        return results

    def hybrid_search(self, sparse_query_embedding: np.ndarray, dense_query_embedding: np.ndarray, collection='all', top_k : int=10, alpha=0.3)-> list[dict[str, any]] :
        results = []
        start_time = time.time()
        sparse_results = self.sparse_search(sparse_query_embedding, collection, top_k)
        n = int(len(sparse_results) * alpha)
        alpha_sparse_results = sparse_results[:n]
        dense_results = self.search(dense_query_embedding, collection, top_k)
        #'results = sorted(results, key=lambda x: x["distance"])[:top_k]
        results = dense_results + alpha_sparse_results
        end_time = time.time()
        print(f"Hybrid Search completed. Found {len(results)} results. in {end_time - start_time} secs")
        return results

    def extract_documents(self, search_results: list[dict[str, any]]) -> list[np.ndarray]:
      """
      Extract embedding values from search results.
      :param search_results: List of dictionaries containing search results.
      :return: List of embedding vectors as NumPy arrays.
      """
      return [result["documents"] for result in search_results if "documents" in result]

In [None]:
datasets = ['covidqa','cuad','delucionqa','emanual','expertqa','finqa','hagrid','hotpotqa','msmarco','pubmedqa','tatqa','techqa']
datastor = VectorDataStore()

insert_data = True
store_client = "Milvus"
num_records = 0

vector_dim = embedder.get_sentence_embedding_dimension()

In [None]:
g_data_index = random.randint(1, 12) - 1
dataset = datasets[g_data_index]
print(f"Selected dataset: {datasets[g_data_index]}")
milvus_collection_name = f"ragbench_collection_{dataset}_v16"
datastor.drop_collection(milvus_collection_name)

data = load_dataset("rungalileo/ragbench", datasets[g_data_index], split="train")
top_5_rows = data.select(range(5))

In [None]:
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import random

# Initialize storage for documents, IDs, and metadata

for i in range(2):
  all_documents = []
  all_ids = []
  all_metadatas = []

  # Process each dataset
  doc_idx = 0  # Global document index for unique IDs
  for dataset in [datasets[i]]:
      milvus_collection_name = f"ragbench_collection_{dataset}_v16"
      try:
        num_records = datastor.has_entities(milvus_collection_name)
      except:
        num_records = 0
        print("we are good. collection doesnt exists")

      if num_records > 0:
          datastor.drop_collection(milvus_collection_name)

      datastor.get_or_create_collection(milvus_collection_name, vector_dim)

      print(f"Processing {dataset}...")
      data = load_dataset("rungalileo/ragbench", dataset, split="train")
      #only select first 5 records for debugging duplicate records. **PLEASE REMOVE THIS AFTER DEBUGGING**
      data = data.select(range(10))
      for idx, row in tqdm(enumerate(data), desc=f"Processing {dataset}"):
          # Extract document text
          print(f"~~~~~~>>>  question: {row.get('question', '')}")
          doc_text = row.get('documents', '')

          # Skip if no documents found
          if not doc_text:
              continue

          # Process the document
          processed_output = process_document_with_identifiers(doc_text)
          added_item_idxs = set()

          # Populate the lists
          for section_idx, section in enumerate(processed_output):
              for item_idx, (prefix, content) in enumerate(section):
                  # Skip if this item_idx has already been processed
                  if item_idx in added_item_idxs:
                      continue

                  # Add the item_idx to the set to track it
                  added_item_idxs.add(item_idx)

                  # Add the document
                  document = f"[{prefix}] {content}"
                  all_documents.append(document)

                  # Construct a globally unique ID
                  doc_id = f"{dataset}_{doc_idx}_{section_idx}_{item_idx}"
                  all_ids.append(doc_id)

                  # Construct metadata
                  metadata = {
                      "dataset": dataset,
                      "global_index": doc_idx,
                      "section_index": section_idx,
                      "item_index": item_idx,
                      "prefix": prefix,
                      "type": "Title" if prefix.endswith("a") else "Passage",
                  }
                  all_metadatas.append(metadata)

          doc_idx += 1  # Increment global document index

  # Step 4: Generate Embeddings
  embedder = SentenceTransformer(embedding_model)  # Pretrained sentence transformer
  batch_size = 2500  # Adjust based on available memory

  # Generate embeddings in batches
  all_embeddings = []
  for i in tqdm(range(0, len(all_documents), batch_size), desc="Generating embeddings"):
      batch_docs = all_documents[i:i + batch_size]
      batch_embeddings = embedder.encode(batch_docs, show_progress_bar=True)
      all_embeddings.extend(batch_embeddings)

  bm25_encoder = prepare_bm25_encoder(all_documents)
  sparse_vectors = [generate_sparse_vector_bm25(text, bm25_encoder) for text in all_documents]

  # Adding data to milvus with enhanced duplicate check
  all_recs = datastor.get_all_records(milvus_collection_name)
  #print(f"sample: {str(all_recs[0])}")
  existing_hashes = get_existing_hashes_milvus(all_recs)

  for i in tqdm(range(0, len(all_documents), batch_size), desc="Adding data to DB"):
      batch_embeddings = all_embeddings[i:i + batch_size]
      batch_sparse_embs = sparse_vectors[i:i + batch_size]
      batch_metadatas = all_metadatas[i:i + batch_size]
      batch_documents = all_documents[i:i + batch_size]
      batch_ids = []

      # Generate hashes for each document in the batch
      for doc, metadata in zip(batch_documents, batch_metadatas):
          doc_hash = generate_hash(doc, metadata)
          if doc_hash not in existing_hashes:
              batch_ids.append(doc_hash)
              existing_hashes.add(doc_hash)  # Add hash to local set to avoid duplicates in the same batch
          else:
              print(f"Skipping duplicate document: {doc[:15]}...")  # Print a preview of the duplicate doc

      # Add non-duplicate documents to the database
      if batch_ids:  # Ensure there are non-duplicate documents to add
          # Add the batch to the Milvus collection
          if store_client == "Milvus" and insert_data:
              datastor.insert(milvus_collection_name,
                  metadata=batch_metadatas,
                  documents=batch_documents,
                  sparse_embs = np.array(batch_sparse_embs),
                  embeddings=np.array(batch_embeddings),
                  ids=batch_ids
              )

  print(f"total data inserted into {milvus_collection_name} iteration {i}")
  print(f"records : {datastor.has_entities(milvus_collection_name)} ")

2025-02-14 04:50:33,959 [ERROR][handler]: RPC error: [has_collection], <MilvusException: (code=<bound method _MultiThreadedRendezvous.code of <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-02-14T04:50:33.958208967+00:00", grpc_status:14, grpc_message:"failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)"}"
>>, message=Retry run out of 75 retry times, message=failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111))>, <Time:{'RPC start': '2025-02-14 04:47:03.186759', 'RPC error': '2025-02-14 04:50:33.959179'}> (decorators.py:140)


we are good. collection doesnt exists


2025-02-14 04:54:04,732 [ERROR][handler]: RPC error: [has_collection], <MilvusException: (code=<bound method _MultiThreadedRendezvous.code of <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)", grpc_status:14, created_time:"2025-02-14T04:54:04.731336759+00:00"}"
>>, message=Retry run out of 75 retry times, message=failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111))>, <Time:{'RPC start': '2025-02-14 04:50:33.961451', 'RPC error': '2025-02-14 04:54:04.732095'}> (decorators.py:140)


Collection ragbench_collection_covidqa_v16 doesn't exist. Creating...


2025-02-14 04:57:35,506 [ERROR][handler]: RPC error: [has_collection], <MilvusException: (code=<bound method _MultiThreadedRendezvous.code of <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-02-14T04:57:35.505933703+00:00", grpc_status:14, grpc_message:"failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)"}"
>>, message=Retry run out of 75 retry times, message=failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111))>, <Time:{'RPC start': '2025-02-14 04:54:04.734549', 'RPC error': '2025-02-14 04:57:35.506726'}> (decorators.py:140)


MilvusException: <MilvusException: (code=<bound method _MultiThreadedRendezvous.code of <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-02-14T04:57:35.505933703+00:00", grpc_status:14, grpc_message:"failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111)"}"
>>, message=Retry run out of 75 retry times, message=failed to connect to all addresses; last error: UNKNOWN: unix:/tmp/tmp7c47lyjx_ragbench.db.sock: connect: Connection refused (111))>

#**Retrieval part**

In [None]:
import torch
import pandas as pd
import langchain
from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Function to retrieve relevant chunks
def retrieve_docs_milvus(question, collection_name, top_k=10):
    # Generate embedding for the query
    # Generate the dense embedding for the question
    start_time = time.time()
    dense_query_embedding = embedder.encode(question).tolist()

    # Generate the sparse embedding for the question
    #sparse_query_embedding = generate_sparse_vector_bm25(question, bm25_encoder)

    # Perform vector search to find relevant chunks
    #results = datastor.extract_documents(datastor.search(query_embedding, milvus_collection_name, top_k))
    results = datastor.search(dense_query_embedding, collection_name, top_k)
    #results = datastor.hybrid_search(sparse_query_embedding, dense_query_embedding, milvus_collection_name, top_k)
    print(f"results: retrieve_docs_milvus >>>  {results}")

    # HyDE search with pseudo document
    #pseudo_docs = fetch_docs_pseudo(query)
    #print(f"pseudo_docs: retrieve_docs_milvus >>>  {pseudo_docs}")

    # Extract 'documents' field
    documents_list = [item['documents'] for item in results]
    #documents_list += pseudo_docs

    end_time = time.time()
    print(f"Hybrid Search completed. Found {len(results)} results (approx. 2 * {top_k} + alpha * {top_k}). in {end_time - start_time} secs")
    # Print the extracted documents
    print("retrieve_docs_milvus >>> documents_list from Hybrid + HyDE search >>>>", documents_list)

    # Extract the retrieved chunks
    # chunks = documents_list
    # should sort and push context - but later

    return documents_list

# Load Cross-Encoder model and tokenizer
cross_encoder_model_name = "cross-encoder/ms-marco-MiniLM-L-6-v2"
tokenizer = AutoTokenizer.from_pretrained(cross_encoder_model_name)
model = AutoModelForSequenceClassification.from_pretrained(cross_encoder_model_name)

# Ensure PyTorch is in evaluation mode
model.eval()

def rerank_with_cross_encoder(question):
    """
    Rerank documents based on relevance scores from a Cross-Encoder model.

    Args:
        query (str): The query string.
        documents (list): A list of document strings.

    Returns:
        list: A list of tuples (document, score), sorted by score in descending order.
    """
    query_embedding = embedder.encode(question)
    results = datastor.search(query_embedding, 5)
    documents = datastor.extract_documents(results)

    scores = []
    for doc in documents:
        # Tokenize query-document pair
        inputs = tokenizer(
            query,
            doc,
            return_tensors="pt",
            max_length=512,  # Limit for most transformer models
            truncation=True,
            padding="max_length",
        )
        # Compute relevance scores
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits

            # Handle binary classification or regression logits
            if logits.size(1) == 2:  # Binary classification
                score = torch.softmax(logits, dim=1)[:, 1].item()  # Probability of relevance (class 1)
            else:  # Regression or single-class output
                score = logits.squeeze().item()  # Direct score (e.g., relevance regression)

            scores.append((doc, score))
    # Sort by score in descending order
    return sorted(scores, key=lambda x: x[1], reverse=True)

def query_response_from_llm(query: str, collection_name, model_name="llama3-8b-8192"):

    # retrieve chunks from milvus db
    chunks = retrieve_docs_milvus(query, collection_name)

    # retrieve chunks from chroma db
    #chunks = retrieve_docs(query)

    # Flatten the list if necessary
    if any(isinstance(chunk, list) for chunk in chunks):
      chunks = [item for sublist in chunks for item in (sublist if isinstance(sublist, list) else [sublist])]

    chat = ChatGroq(temperature=0.3, groq_api_key="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", model_name="llama3-8b-8192")

    #chat = ChatGroq(temperature=0.3, groq_api_key="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", model_name="deepseek-r1-distill-llama-70b")

    prompt=ChatPromptTemplate.from_template(
      """
      Please provide a response to the query below, strictly adhering to the
      information presented in the following documents.
      Do not generate any text beyond what is explicitly stated in the documents.

      Context: {context}

      Question: {query}

      Answer:
      """
    )


    # Summarize the retrieved doc chunks
    # Compress documents using abstractive summarization before appending to context
    chunk_summary = ""

    '''
    print("chunks>>>>",chunks)
    sorted_chunks = sort_documents(query, chunks)
    print(f"sorted_chunks: {sorted_chunks}")
    chunk_summary = summarize_docs(sorted_chunks)
    print("chunk_summary>>>>",chunk_summary)
    chunks = chunk_summary.join(chunks)
    '''

    #Cross encoder reranked docs (comment this for summarization)
    reranked_chunks = rerank_with_cross_encoder(query)
    doc_chunks = [t[0] for t in reranked_chunks]
    print(doc_chunks)

    #monot5 reranked docs
    #reranked_chunks = rerank_with_monot5(question)

    #doc_chunks = [chunk[0] for chunk in reranked_chunks]
    #chunk_summary = summarize_docs(doc_chunks)
    #doc_chunks = chunk_summary

    print("doc_chunks>>after re-ranking>>", doc_chunks)
    chain = prompt | chat

    context = "".join(doc_chunks)

    print("context>>>from 1st RAG>>>>>> ",context)

    groq_response = chain.invoke({"context": context, "query": query})

    print("groq_response>>>from 1st RAG>>>>>> ",groq_response)

    answer = groq_response
    return answer, context

def generate_prompt():
    """
    Generate a prompt template for assessing the support and relevance of an LLM-generated response.
    """
    return """
    I asked someone to answer a question based on one or more documents.
    Your task is to review their response and assess whether or not each sentence
    in that response is supported by text in the documents. And if so, which
    sentences in the documents provide that support. You will also tell me which
    of the documents contain useful information for answering the question, and
    which of the documents the answer was sourced from.
    Here are the documents, each of which is split into sentences.Alongside each
    sentence is associated key, such as ’[0a].’ or ’[0b].’ that you can use to refer
    to it:

    ‘‘‘
    {documents}
    ‘‘‘
    The question was:
    ‘‘‘
    {question}
    ‘‘‘

    Here is their response, split into sentences. Alongside each sentence is
    associated key, such as ’a.’ or ’b.’ that you can use to refer to it. Note
    that these keys are unique to the response, and are not related to the keys
    in the documents:
    ‘‘‘
    {answer}
    ‘‘‘
    You must respond with a JSON object matching this schema:
    ‘‘‘
    {{
    "relevance_explanation": string,
    "all_relevant_sentence_keys": [string],
    "overall_supported_explanation": string,
    "overall_supported": boolean,
    "sentence_support_information": [
    {{
    "response_sentence_key": string,
    "explanation": string,
    "supporting_sentence_keys": [string],
    "fully_supported": boolean
    }},
    ],
    "all_utilized_sentence_keys": [string]
    }}
    ‘‘‘
    The relevance_explanation field is a string explaining which documents
    contain useful information for answering the question. Provide a step-by-step
    breakdown of information provided in the documents and how it is useful for
    answering the question.
    The all_relevant_sentence_keys field is a list of all document sentences keys
    (e.g. ’0a’) that are relevant to the question. Include every sentence that is
    useful and relevant to the question, even if it was not used in the response,
    or if only parts of the sentence are useful. Ignore the provided response when
    making this judgement and base your judgement solely on the provided documents
    and question. Omit sentences that, if removed from the document, would not
    impact someone’s ability to answer the question.
    The overall_supported_explanation field is a string explaining why the response
    *as a whole* is or is not supported by the documents. In this field, provide a
    step-by-step breakdown of the claims made in the response and the support (or
    lack thereof) for those claims in the documents. Begin by assessing each claim
    separately, one by one; don’t make any remarks about the response as a whole
    until you have assessed all the claims in isolation.
    The overall_supported field is a boolean indicating whether the response as a
    whole is supported by the documents. This value should reflect the conclusion
    you drew at the end of your step-by-step breakdown in overall_supported_explanation.
    In the sentence_support_information field, provide information about the support
    *for each sentence* in the response.
    The sentence_support_information field is a list of objects, one for each sentence
    in the response. Each object MUST have the following fields:
    - response_sentence_key: a string identifying the sentence in the response.
    This key is the same as the one used in the response above.

    - explanation: a string explaining why the sentence is or is not supported by the
    documents.
    - supporting_sentence_keys: keys (e.g. ’[0a]’) of sentences from the documents that
    support the response sentence. If the sentence is not supported, this list MUST
    be empty. If the sentence is supported, this list MUST contain one or more keys.
    In special cases where the sentence is supported, but not by any specific sentence,
    you can use the string "supported_without_sentence" to indicate that the sentence
    is generally supported by the documents. Consider cases where the sentence is
    expressing inability to answer the question due to lack of relevant information in
    the provided context as "supported_without_sentence". In cases where the sentence
    is making a general statement (e.g. outlining the steps to produce an answer, or
    summarizing previously stated sentences, or a transition sentence), use the
    string "general". In cases where the sentence is correctly stating a well-known fact,
    like a mathematical formula, use the string "well_known_fact". In cases where the
    sentence is performing numerical reasoning (e.g. addition, multiplication), use
    the string "numerical_reasoning".
    - fully_supported: a boolean indicating whether the sentence is fully supported by
    the documents.
    - This value should reflect the conclusion you drew at the end of your step-by-step
    breakdown in explanation.
    - If supporting_sentence_keys is an empty list, then fully_supported must be false.
    - Otherwise, use fully_supported to clarify whether everything in the response
    sentence is fully supported by the document text indicated in supporting_sentence_keys
    (fully_supported = true), or whether the sentence is only partially or incompletely
    supported by that document text (fully_supported = false).
    The all_utilized_sentence_keys field is a list of all sentences keys (e.g. ’0a’) that
    were used to construct the answer. Include every sentence that either directly supported
    the answer, or was implicitly used to construct the answer, even if it was not used
    in its entirety. Omit sentences that were not used, and could have been removed from
    the documents without affecting the answer.
    You must respond with a valid JSON string. Use escapes for quotes, e.g. \\"\\", and
    newlines, e.g. \\n. Do not write anything before or after the JSON string. Do not
    wrap the JSON string in backticks like ‘‘‘ or ‘‘‘json.
    As a reminder: your task is to review the response and assess which documents contain
    useful information pertaining to the question, and how each sentence in the response
    is supported by the text in the documents.
    """.strip()

def analyze_llm_response(question, context, answer):
  chat = ChatGroq(temperature=0.3, groq_api_key="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", model_name="llama3-70b-8192")
  #chat = ChatGroq(temperature=0.3, groq_api_key="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", model_name="deepseek-r1-distill-llama-70b")

  prompt_template_with_docs = PromptTemplate(
      input_variables=["documents", "question", "answer"],
      template=generate_prompt(),
  )
  groq_response_with_context_qanda = chain.invoke({"documents": contextcontext_for_llm, "question": question, "answer":answer})

In [2]:
import gradio as gr

# Placeholder function to get response from your LLM/RAG pipeline
def get_response(query, model_name, rag_bench):
    response = f"Response from {model_name} for query: {query}"  # Replace with actual model call
    answer, context = query_response_from_llm(query, rag_bench, model_name)
    return answer

# Available LLM models
llm_models = ["Llama-3.2-1B", "GPT-4", "Mistral", "Custom-LLM"]  # Modify as needed
rag_banch_datasets = ['covidqa','cuad','delucionqa','emanual','expertqa','finqa','hagrid','hotpotqa','msmarco','pubmedqa','tatqa','techqa']

# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("## LLM/RAG Query Interface")

    with gr.Row():
        query_input = gr.Textbox(label="Enter your query")
        rag_bench_selector = gr.Dropdown(rag_banch_datasets, label="Select RAG Bench Dataset")
        model_selector = gr.Dropdown(llm_models, label="Select LLM Model")

    submit_btn = gr.Button("Submit")
    response_output = gr.Textbox(label="Response", interactive=False)

    submit_btn.click(get_response, inputs=[query_input, model_selector, rag_bench_selector], outputs=response_output)

# Launch the interface
demo.launch(share=True, debug=True)


Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://63bd0a7ebc1af83ff6.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://63bd0a7ebc1af83ff6.gradio.live


