In [2]:
import chromadb
import gradio as gr
import os
import time
import urllib.parse
from flashrank import Ranker
from chromadb import ClientAPI
from enum import Enum
from langchain_core.vectorstores import VectorStoreRetriever
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain.retrievers.document_compressors.flashrank_rerank import FlashrankRerank
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.prompts import ChatPromptTemplate
from langchain_chroma import Chroma
from langchain_cohere import CohereRerank
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
COHERE_API_KEY = os.getenv("COHERE_API_KEY")
DB_PATH = os.getenv('DATABASE_PATH')

  from .autonotebook import tqdm as notebook_tqdm


sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\larsk\AppData\Local\sagemaker\sagemaker\config.yaml


In [3]:
class RetrieverType(Enum):
    DEFAULT = "default"
    DEFAULT_WITH_SCORES = "default_with_scores"
    MULTIQUERY = "multiquery"


class RerankerType(Enum):
    FLASHRANK = "flashrank"
    COHERE = "cohere"


class ChunkingStrategy(Enum):
    """ BASIC, TITLE
    """
    BASIC = "basic"
    BY_TITLE = "by_title"


PROMPT = ChatPromptTemplate([
    ("system", """Du bist ein Assistent einer öffentlichen Behörde und deine Aufgabe ist es, Fragen nur auf Basis des bereitgestellten Kontexts zu beantworten.

- Wenn die Frage anhand des gegebenen Kontexts beantwortet werden kann, beantworte sie unter Einbeziehung relevanter Paragrafen, Gesetze oder Vorschriften, die im Kontext erwähnt werden.
- Wenn die Frage im Kontext nicht eindeutig beantwortet werden kann oder keine ausreichenden Informationen vorliegen, gib an, dass du die Frage nicht beantworten kannst.
- Achte besonders darauf, dass du keine Informationen hinzufügst, die nicht im Kontext enthalten sind.

Am Ende deiner Antwort weise bitte darauf hin, dass du ein ChatBot bist und die Antwort unbedingt von einer qualifizierten Person überprüft werden sollte.

<kontext>
{context}
</kontext>"""),
    ("human", "Frage: {input}")
])


LLM = ChatOpenAI(
    model="gpt-4o-mini", 
    api_key=OPENAI_API_KEY, 
    temperature=0.0
)

EMBEDDING_MODEL_NAME = "text-embedding-ada-002"
EMBEDDINGS = OpenAIEmbeddings(
    model=EMBEDDING_MODEL_NAME, 
    api_key=OPENAI_API_KEY
)

In [4]:
# Vectorstore Basic

chroma_client_basic = chromadb.PersistentClient(
        path=os.path.join(
            DB_PATH, 
            'Unstructured', 
            ChunkingStrategy.BASIC.value, 
            EMBEDDING_MODEL_NAME
        )
    )
collection_name_basic = chroma_client_basic.list_collections()[0].name
print(f"Collection: {collection_name_basic}")

vectorstore_basic = Chroma(
    collection_name=collection_name_basic,
    embedding_function=EMBEDDINGS,
    client=chroma_client_basic,
    create_collection_if_not_exists=False
)

Collection: collection_1500


In [5]:
# Vectorstore By Title

chroma_client_by_title = chromadb.PersistentClient(
        path=os.path.join(
            DB_PATH, 
            'Unstructured', 
            ChunkingStrategy.BY_TITLE.value, 
            EMBEDDING_MODEL_NAME
        )
    )
collection_name_by_title = chroma_client_by_title.list_collections()[0].name
print(f"Collection: {collection_name_by_title}")

vectorstore_by_title = Chroma(
    collection_name=collection_name_by_title,
    embedding_function=EMBEDDINGS,
    client=chroma_client_by_title,
    create_collection_if_not_exists=False
)

Collection: collection_1800


In [6]:
def get_vectorstore(strategy):
    s = ChunkingStrategy(strategy)
    if s == ChunkingStrategy.BASIC:
        return vectorstore_basic
    elif s == ChunkingStrategy.BY_TITLE:
        return vectorstore_by_title


def get_retriever(strategy, retriever_type, n_retriever): 
    t = RetrieverType(retriever_type)
    vectorstore = get_vectorstore(strategy)
    retriever = vectorstore.as_retriever(
        search_type='similarity',
        search_kwargs={
            'k': n_retriever,
        }
    )
    if t == RetrieverType.DEFAULT or t == RetrieverType.DEFAULT_WITH_SCORES:
        return retriever
    elif t == RetrieverType.MULTIQUERY:
        return MultiQueryRetriever.from_llm(
            retriever=retriever,
            llm=LLM,
        )
    

def get_reranker(reranker_type, n_reranker):
    t = RerankerType(reranker_type)
    if t == RerankerType.FLASHRANK:
        client = Ranker(
            model_name='rank-T5-flan',
            max_length=4096,
        )
        return FlashrankRerank(
            client=client,
            top_n=n_reranker,
        )
    elif t == RerankerType.COHERE:
        return CohereRerank(
            top_n=n_reranker,
            model='rerank-multilingual-v3.0',
            cohere_api_key=COHERE_API_KEY,
        )


def generate_answer(query, strategy, retriever_type, n_retriever, reranker_type, n_reranker):
    retriever = get_retriever(strategy, retriever_type, n_retriever)
    reranker = get_reranker(reranker_type[0], n_reranker) if reranker_type else None
    
    if reranker_type:
        retriever = ContextualCompressionRetriever(
            base_retriever=retriever,
            base_compressor=reranker,
        )
    
    rag_chain = create_retrieval_chain(
        retriever=retriever,
        combine_docs_chain=create_stuff_documents_chain(
            llm=LLM,
            prompt=PROMPT,
        )
    )
    
    return rag_chain.invoke({"input": query})

In [7]:
def pretty_output(answer, context):
    return_str = f"<span>{answer}</span><br><br><span>Kontext:</span><br><ul>"
    for doc in context:
        file_path = doc.metadata["source"]
        formatted_path = file_path.replace("\\", "/").replace("Data", "Source")
        encoded_path = urllib.parse.quote(formatted_path)
        file_url = f"file:///{encoded_path}"
        return_str += f"<li><a href='{file_url}' target='_blank'>{os.path.basename(file_path)}</a><span> - </span><span>Seite {doc.metadata['page_number']}</span></li>"
    
    return return_str + "</ul>"


def generate(query, history, strategy, retriever_type, n_retriever, reranker_type, n_reranker):   # by_title default 5 [] 1
    response = generate_answer(query, strategy, retriever_type, n_retriever, reranker_type, n_reranker)  # dict_keys(['input', 'context', 'answer'])
    answer = pretty_output(response['answer'], response['context'])
    for i in range(len(answer)):  
        time.sleep(0.01)
        yield answer[:i+1]


chat = gr.ChatInterface(
    theme="Ryouko-Yamanda65777/ryo",
    fn=generate,
    type="messages",
    additional_inputs=[
        gr.Dropdown([cs.value for cs in ChunkingStrategy], label="Chunking Strategy", value=ChunkingStrategy.BY_TITLE.value),
        gr.Dropdown([rt.value for rt in RetrieverType], label="Retriever Type", value=RetrieverType.DEFAULT.value),
        gr.Slider(minimum=1, maximum=50, step=1, value=1, label="Number of Chunks to retrieve with Retriever"),
        gr.Dropdown([rt.value for rt in RerankerType], label="Reranker Type", multiselect=True, max_choices=1),
        gr.Slider(minimum=1, maximum=10, step=1, value=1, label="Number of Chunks to rerank with Reranker and provide to LLM"),
    ],
    additional_inputs_accordion=gr.Accordion(label="Advanced Options", open=False),
    examples=[
        [
            "Ist es möglich, die Höhe einer russischen Rente umgerechnet in Euro zu erfahren?",
            "by_title",
            "default",
            5,
            [],
            1
        ],
        [
            "Unter welchen Voraussetzungen ist ein Stromguthaben (Haushaltsstrom) als Einkommen im Sinne des § 11 SGB II bedarfsmindernd zu berücksichtigen?",
            "by_title",
            "default",
            5,
            [],
            1
        ],
        [
            "Können Hilfeempfängern im Rahmen der Aufnahme einer sozialversicherungspflichtigen Beschäftigung ggfs. Fahrzeuge zur Verfügung gestellt werden, um den Arbeitsplatz zu erreichen? Fall ja, für welchen Zeitraum ist dies möglich?",
            "by_title",
            "default",
            5,
            [],
            1
        ]
    ]
)

chat.launch()

* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.


