In [9]:
import logging
import pandas as pd
import numpy as np
from pathlib import Path
from langchain_chroma import Chroma
from langchain.schema import Document
from sentence_transformers import SentenceTransformer
from dotenv import load_dotenv

class SentenceTransformerEmbeddings:
    def __init__(self, model):
        self.model = model
    
    def embed_documents(self, texts):
        return self.model.encode(texts).tolist()
    
    def embed_query(self, text):
        return self.model.encode(text).tolist()

# Initialize models
bi_encoder = SentenceTransformer('all-MiniLM-L6-v2')

DATA_FILE = './data_oncology.xlsx'
VECTOR_STORE_DIR = './chroma_db_oncology'

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def _remove_duplicates(df: pd.DataFrame, similarity_threshold: float = 0.85) -> pd.DataFrame:
    logger.info("Removing duplicates from dataset")
    print(f"Initial number of entries: {len(df)}")
    
    # 1. Remove exact duplicates
    df = df.drop_duplicates(subset=['Question', 'Answer'], keep='first')
    print(f"After removing exact duplicates: {len(df)}")
    
    # 2. Remove similar questions
    if len(df) > 1:
        questions = df['Question'].tolist()
        question_embeddings = bi_encoder.encode(questions)
        similarity_matrix = np.dot(question_embeddings, question_embeddings.T)
        
        to_drop = set()
        for i in range(len(df)):
            if i in to_drop:
                continue
            for j in range(i + 1, len(df)):
                if j in to_drop:
                    continue
                if similarity_matrix[i, j] > similarity_threshold:
                    if len(df.iloc[i]['Answer']) < len(df.iloc[j]['Answer']):
                        to_drop.add(i)
                    else:
                        to_drop.add(j)
        
        df = df.drop(df.index[list(to_drop)])
        print(f"After removing similar questions: {len(df)}")
    
    return df


def create_vectorstore():
    embeddings = SentenceTransformerEmbeddings(bi_encoder)
    
    try:
        import chromadb
        client = chromadb.PersistentClient(path=str(VECTOR_STORE_DIR))
        client.delete_collection("oncology_qa")
        logger.info("Deleted existing collection")
    except Exception as e:
        logger.info(f"No existing collection to delete: {e}")
    
    vector_store = Chroma(
        collection_name="oncology_qa",
        embedding_function=embeddings,
        persist_directory=str(VECTOR_STORE_DIR)
    )
    
    try:            
        oncology_data = pd.read_excel(DATA_FILE)
        logger.info(f"Loaded {len(oncology_data)} rows from Excel")
    except Exception as e:
        logger.error(f"Error loading Excel file: {e}")
        return None
    
    oncology_data = _remove_duplicates(oncology_data)
    
    documents = []
    for _, row in oncology_data.iterrows():
        content = f"Question: {row['Question']}\nAnswer: {row['Answer']}"
        metadata = {"Question": row['Question'], "Answer": row['Answer']}
        documents.append(Document(page_content=content, metadata=metadata))
    
    vector_store.add_documents(documents=documents)
    logger.info(f"Vector store created with {len(documents)} documents.")
    return vector_store


def main():
    logger.info("Initializing vector store...")
    vector_store = create_vectorstore()
    if vector_store:
        logger.info("Vector store initialized successfully")
        return vector_store
    else:
        logger.error("Failed to initialize vector store")
        return None

if __name__ == "__main__":
    main()

INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2
INFO:__main__:Initializing vector store...
INFO:__main__:Deleted existing collection
ERROR:__main__:Error loading Excel file: Missing optional dependency 'openpyxl'.  Use pip or conda to install openpyxl.
ERROR:__main__:Failed to initialize vector store


In [2]:
from langchain_chroma import Chroma
from langchain.schema import Document
from sentence_transformers import CrossEncoder
from typing import List, Dict, Any
import logging
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize cross-encoder for re-ranking
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def format_result(doc: Document, score: float = 1.0) -> Dict[str, Any]:
    """Format a document into a result dictionary."""
    return {
        "question": doc.metadata.get('Question', ''),
        "answer": doc.metadata.get('Answer', ''),
        "score": float(score)
    }

def get_vector_store() -> Chroma:
    """Initialize and return the Chroma vector store."""
    embeddings = SentenceTransformerEmbeddings(bi_encoder)
    return Chroma(
        collection_name="oncology_qa",
        embedding_function=embeddings,
        persist_directory=str(VECTOR_STORE_DIR)
    )

def search_qa(query: str, k: int = 5, use_cross_encoder: bool = False) -> List[Dict[str, Any]]:
    """
    Search the QA knowledge base for relevant answers.
    
    Args:
        query: The search query
        k: Number of results to return
        use_cross_encoder: Whether to use cross-encoder for re-ranking
        
    Returns:
        List of dictionaries containing question, answer, and score
    """
    try:
        logger.info(f"Searching knowledge base for: {query}")
        
        vector_store = get_vector_store()
        fetch_count = k * 3 if use_cross_encoder else k
        initial_results = vector_store.similarity_search(query, k=fetch_count)
        
        if not initial_results:
            return []
            
        if not use_cross_encoder:
            return [format_result(doc, ) for doc in initial_results[:k]]
        
        
        # Re-rank with cross-encoder
        query_doc_pairs = [(query, doc.page_content) for doc in initial_results]
        scores = cross_encoder.predict(query_doc_pairs)
        
        # Combine results with scores and sort
        scored_results = zip(initial_results, scores)
        top_results = sorted(scored_results, key=lambda x: x[1], reverse=True)[:k]
        
        return [format_result(doc, score) for doc, score in top_results]
        
    except Exception as e:
        logger.error(f"Search failed for query '{query}': {str(e)}", exc_info=True)
        return []

INFO:sentence_transformers.cross_encoder.CrossEncoder:Use pytorch device: cpu


In [6]:
query = "How is breast cancer treated?"

results = search_qa(query, k=5, use_cross_encoder=True)

for result in results:
    print(f"Question: {result['question']}\nAnswer: {result['answer']}\nScore: {result['score']}\n")

INFO:__main__:Searching knowledge base for: How is breast cancer treated?
Batches: 100%|██████████| 1/1 [00:00<00:00, 47.53it/s]
