In [1]:
import os, time
import chromadb

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import TextLoader
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.llms import Ollama
from langchain_core.runnables import RunnablePassthrough, RunnablePick

from langchain import hub

from chromadb.errors import InvalidDimensionException

In [2]:
# Root directory of the persisted Chroma stores. The trailing slash matters:
# the per-model directory names below are appended by plain string concatenation.
DATABASE_PATH = '/home/raj/nlp/cmu-rag/rag/chroma/txt/'

# Ollama model names; each one has its own store directory under DATABASE_PATH.
EMBEDDING_OPTIONS = ['tinyllama', 'llama2', 'gemma', 'mistral', 'neural-chat', 'openchat']
# Index-aligned with EMBEDDING_OPTIONS: entry i is the store for model i.
VECTOR_STORE_DIRECTORIES = [f"{DATABASE_PATH}{model_name}" for model_name in EMBEDDING_OPTIONS]

# Sub-directories of ANNOTATION_DIR that are scanned for question files.
QUESTION_CATEGORIES = ['history']
ANNOTATION_DIR = '/home/raj/nlp/cmu-rag/annotation/test/'


In [3]:
def load_vector_store(dir, embedding_name):
    """Open the persisted Chroma store in ``dir`` with an Ollama embedding model.

    Args:
        dir: Directory the Chroma store is persisted in.
        embedding_name: Name of the Ollama model used to embed queries.

    Returns:
        The ``Chroma`` vector store object.
    """
    # Build the embedding once and reuse the generic loader, so the
    # InvalidDimensionException fallback lives in a single place.
    embedding = OllamaEmbeddings(model=embedding_name)
    return load_vector_store_non_ollama_embedding(dir=dir, embedding_model=embedding)

def load_vector_store_non_ollama_embedding(dir, embedding_model):
    """Open the persisted Chroma store in ``dir`` with a ready-made embedding object.

    Args:
        dir: Directory the Chroma store is persisted in.
        embedding_model: Embedding object passed to Chroma as ``embedding_function``.

    Returns:
        The ``Chroma`` vector store object.
    """
    store_kwargs = {"persist_directory": dir, "embedding_function": embedding_model}
    try:
        return Chroma(**store_kwargs)
    except InvalidDimensionException:
        # Second attempt with force=True after a dimension mismatch.
        # NOTE(review): confirm the Chroma constructor accepts `force`; if it
        # does not, this branch raises TypeError instead of recovering.
        return Chroma(**store_kwargs, force=True)

def create_chain(vector_store, llm_model = 'llama2'):
    """Assemble a retrieval-augmented question-answering chain.

    The returned runnable is invoked with a question string. The question is
    sent to the store's retriever, the retrieved documents' ``page_content``
    is joined with blank lines to form the context, the prompt is filled in,
    the Ollama model generates a response, and ``StrOutputParser`` is applied.

    Args:
        vector_store: Store providing ``as_retriever()``.
        llm_model: Name of the Ollama model that generates the answer.

    Returns:
        The composed chain.
    """
    # Start from the hub prompt, then swap the template of its first message
    # for a wording that asks for short answers.
    # Note: the continuation lines' indentation and trailing spaces are part
    # of the template text.
    prompt = hub.pull("rlm/rag-prompt-llama")
    prompt.messages[0].prompt.template = """You are an assistant for question-answering tasks. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use as few words as possible and keep the answer concise. Do not mention the context in your response.
    Question: {question} 
    Context: {context} 
    Answer:"""

    model = Ollama(model=llm_model)

    def join_page_contents(documents):
        # One blank line between the text of consecutive documents.
        return "\n\n".join([document.page_content for document in documents])

    retriever = vector_store.as_retriever()

    # The same input question feeds the retriever and the prompt's {question}.
    chain_inputs = {
        "context": retriever | join_page_contents,
        "question": RunnablePassthrough(),
    }
    return chain_inputs | prompt | model | StrOutputParser()


In [4]:
def get_questions(category, dir = ANNOTATION_DIR):
    """Collect the questions stored for one category.

    Every file in ``dir/category`` whose name ends with ``questions.txt`` is
    read line by line; each line is stripped of surrounding whitespace and
    appended to the result (blank lines therefore become empty strings).

    Args:
        category: Name of the sub-directory of ``dir`` to scan.
        dir: Base annotation directory; works with or without a trailing slash.

    Returns:
        List of question strings, files visited in sorted name order.

    Raises:
        OSError: If the category directory or a question file cannot be read.
    """
    category_dir = os.path.join(dir, category)
    questions = []
    # os.listdir returns names in arbitrary order; sorting keeps the question
    # order (and so the order of any answers written later) reproducible.
    for file_name in sorted(os.listdir(category_dir)):
        if not file_name.endswith('questions.txt'):
            continue
        with open(os.path.join(category_dir, file_name), 'r') as f:
            questions.extend(line.strip() for line in f)
    return questions

def _postprocess_answer(answer_raw):
    """Reduce a raw model response to one line, dropping "I don't know" lines."""
    marker = "i don't know"
    fallback = "I do not know"

    if '\n' not in answer_raw:
        # Single-line response: keep it verbatim unless it is a non-answer.
        return fallback if marker in answer_raw.lower() else answer_raw

    # Multi-line response: drop non-answer lines and blank lines, so the
    # joined result has no doubled or trailing spaces.
    kept = [
        line.strip()
        for line in answer_raw.split('\n')
        if line.strip() and marker not in line.lower()
    ]
    # If nothing survives the filter, report the same canonical non-answer as
    # the single-line case instead of an empty string.
    return " ".join(kept) if kept else fallback


def generate_answers(qa_chain, questions):
    """Run every non-empty question through ``qa_chain`` and post-process the output.

    Args:
        qa_chain: Object whose ``invoke(question)`` returns the answer string.
        questions: Iterable of question strings; empty strings are skipped.

    Returns:
        A list with one dict per answered question, holding:
        ``"raw"`` (the unmodified response), ``"num_lines"`` (number of
        newline characters in the response) and ``"processed"`` (the
        single-line cleaned answer).

    Raises:
        ValueError: If ``questions`` or ``qa_chain`` is empty/falsy.
    """
    if not questions:
        raise ValueError("No questions to answer")
    if not qa_chain:
        raise ValueError("No qa_chain to answer questions")

    answers = []
    for question in questions:
        if not question:
            continue
        answer_raw = qa_chain.invoke(question)
        answers.append({
            "raw": answer_raw,
            "num_lines": answer_raw.count('\n'),
            "processed": _postprocess_answer(answer_raw),
        })

    return answers

def write_answers(answers, file_name):
    """Write the ``"processed"`` text of each answer to ``file_name``, one per line.

    The file is opened in write mode, so existing content is replaced.

    Args:
        answers: Iterable of dicts that each contain a ``"processed"`` string.
        file_name: Path of the output file.
    """
    with open(file_name, 'w') as out_file:
        out_file.writelines(entry["processed"] + '\n' for entry in answers)


In [5]:
# The questions do not depend on the embedding, so read each category's
# question files once instead of once per embedding.
questions_by_category = {
    category: get_questions(category, dir=ANNOTATION_DIR)
    for category in QUESTION_CATEGORIES
}

# EMBEDDING_OPTIONS and VECTOR_STORE_DIRECTORIES are index-aligned, so pair
# them directly. The loop variable is not called `dir`, which would replace
# the builtin of that name in the notebook namespace.
for embedding_name, store_dir in zip(EMBEDDING_OPTIONS, VECTOR_STORE_DIRECTORIES):
    vector_store = load_vector_store(store_dir, embedding_name)
    # The same Ollama model name is used for embedding and for generation.
    chain = create_chain(vector_store, embedding_name)
    for category in QUESTION_CATEGORIES:
        print("Answering questions for category: {} with embedding {}".format(category, embedding_name))
        questions = questions_by_category[category]
        answers = generate_answers(qa_chain=chain, questions=questions)
        write_to_file = embedding_name + '_answers.txt'
        # Answers are written into the category's annotation directory.
        output_path = os.path.join(ANNOTATION_DIR, category, write_to_file)
        write_answers(answers, output_path)
        print(f"Answers written to {output_path}")

Answering questions for category: history with embedding tinyllama


KeyboardInterrupt: 

In [8]:
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
import torch

# HuggingFace BGE embedding model used in place of an Ollama embedding.
model_name = "BAAI/bge-large-en"
# Prefer the GPU, but fall back to CPU so the cell also runs without CUDA.
model_kwargs = {"device": "cuda" if torch.cuda.is_available() else "cpu"}
encode_kwargs = {"normalize_embeddings": True}
hf = HuggingFaceBgeEmbeddings(
    model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs
)

# Release cached GPU memory that is currently unused.
torch.cuda.empty_cache()

embedding_name = 'bge-large-en'
vector_store = load_vector_store_non_ollama_embedding(dir=DATABASE_PATH + embedding_name, embedding_model=hf)
# No llm_model is passed, so create_chain's default generation model is used.
chain = create_chain(vector_store)

# Iterate over the categories explicitly rather than relying on a `category`
# variable left behind by an earlier cell's loop.
for category in QUESTION_CATEGORIES:
    questions = get_questions(category, dir=ANNOTATION_DIR)
    answers = generate_answers(qa_chain=chain, questions=questions)
    write_to_file = embedding_name + '_answers.txt'
    write_answers(answers, ANNOTATION_DIR + category + '/' + write_to_file)
