In [79]:
import os
from dotenv import load_dotenv, dotenv_values 

load_dotenv() 


os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = os.getenv('LANGCHAIN_API_KEY')
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')
os.environ["AZURE_OPENAI_ENDPOINT"] = os.getenv('AZURE_ENDPOINT')
azure_endpoint = os.getenv('AZURE_ENDPOINT')
model_name = os.getenv('MODEL_NAME')
embedding_model = os.getenv('EMBEDDING_MODEL_NAME')
azure_deployment=os.getenv('DEPLOYMENT_NAME')
openai_api_version="2023-05-15",



In [80]:

### LLM

from langchain_openai import AzureChatOpenAI

llm = AzureChatOpenAI(
    azure_deployment=model_name,
    api_version="2024-05-01-preview",
    temperature=0,
)

# old local LLM 
#llm = ChatOllama(model=local_llm, format="json", temperature=0)

In [81]:

### Index

from langchain_community.document_loaders import DirectoryLoader

from langchain_community.document_loaders import PDFPlumberLoader

path = '/Users/fabioangeloni/FIC/Repos/no-work/ai/KnowledgeAi/data/data_generator'
loader = DirectoryLoader(path, glob="**/*.pdf",loader_cls=PDFPlumberLoader, show_progress=True,use_multithreading=True)
docs = loader.load()

100%|██████████| 291/291 [00:14<00:00, 20.11it/s]


In [82]:
#docs_list = [item for sublist in docs for item in sublist]

from langchain_experimental.text_splitter import SemanticChunker

from langchain_community.vectorstores import Chroma

from langchain_openai import AzureOpenAIEmbeddings




text_splitter = SemanticChunker(AzureOpenAIEmbeddings(azure_deployment=azure_deployment, azure_endpoint=azure_endpoint,chunk_size=256))

doc_splits = text_splitter.transform_documents(docs)



In [83]:
# Add to vectorDB
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chroma",
    embedding=AzureOpenAIEmbeddings(azure_deployment=azure_deployment, azure_endpoint=azure_endpoint,chunk_size=256),
    persist_directory="./new_chroma_db",
)


In [None]:
# load from disk
vectorstore = Chroma(collection_name="rag-chroma",persist_directory="./new_chroma_db",embedding_function=AzureOpenAIEmbeddings(azure_deployment=azure_deployment, azure_endpoint=azure_endpoint,chunk_size=256))


In [84]:
retriever = vectorstore.as_retriever()

In [85]:

### Retrieval Grader used to evaluta the relevance of the retrieved document to the user question

from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate


prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance 
    of a retrieved document to a user question. If the document contains keywords related to the user question, 
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "document"],
)

retrieval_grader = prompt | llm | JsonOutputParser()
#tests
# question = "how can you help me?"
# docs = retriever.invoke(question)
# doc_txt = docs[1].page_content
# print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

In [86]:
### Generate answer

from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate

# Prompt
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are the Teamsystem DOCS assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
    Use three sentences maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question} 
    Context: {context} 
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)


# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)


# Chain
rag_chain = prompt | llm | StrOutputParser()

# Run
question = "what is Fatture in cloud?"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

Fatture in Cloud is a platform primarily focused on invoicing, including electronic invoicing, which allows users to manage their business comprehensively. It is used daily by over 400,000 professionals and small businesses and is considered a market leader in the micro and small business segment. The platform targets small entrepreneurs, freelancers, and independent professionals. <|eot_id|>


The context mentions that the memory component of an LLM-powered autonomous agent system includes a long-term memory module (external database) that records a comprehensive list of agents' experience in natural language, referred to as "memory stream". This suggests that the agent has some form of memory or recall mechanism.


In [87]:

### Hallucination Grader used to evaluate the generated answer based on the retrived docs from the LLM model and if it's hallucinated or not

# Prompt
prompt = PromptTemplate(
    template=""" <|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether 
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}  <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)

hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({"documents": docs, "generation": generation})


{'score': 'yes'}

In [88]:

### Answer Grader

# Prompt
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an 
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)

answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question, "generation": generation})

{'score': 'yes'}

In [89]:

### Router used to route the user question to the correct datasource(vectorstore or give error no docs found)

from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate


prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an expert at routing a 
    user question to a vectorstore or give the user an error message as no documents have been found. Use the vectorstore for questions on LLM  agents, 
    prompt engineering, and adversarial attacks. You do not need to be stringent with the keywords 
    in the question related to these topics. Otherwise, use docs_not_present. Give a binary choice 'docs_not_present' 
    or 'vectorstore' based on the question. Return the a JSON with a single key 'datasource' and 
    no premable or explanation. Question to route: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)

question_router = prompt | llm | JsonOutputParser()
question = "what can i do with onefront and TSID Login?"
docs = retriever.invoke(question)
print(docs)
doc_txt = docs[1].page_content
print(question_router.invoke({"question": question}))

[Document(page_content='Per ottenere i codici (codice ID e codice Secret) accedi a TS Digital. ☛ https://app.teamsystemdigital.com/portale/\nLa maschera di login di TS Digital permette quindi due soluzioni di accesso:\n● il TeamSystem ID, univoco per tutti i software TeamSystem\nIl TeamSystem ID è un’utenza unica che permette la connessione ai vari software TeamSystem. Non è\nquindi necessario ricordare diverse credenziali di accesso per l’utilizzo dei vari software TeamSystem e\npotrai usarla ogni volta che ti sarà richiesta un login TeamSystem. Se non si dispone di un TeamSystem ID,\nè consigliato fare il primo accesso alla piattaforma TS Digital con le proprie credenziali / la propria mail e\nsuccessivamente abbinare il TeamSystem ID. ● L’utenza TS Digital.', metadata={'CreationDate': "D:20231009225220+00'00'", 'Creator': 'Chromium', 'ModDate': "D:20231009225220+00'00'", 'Producer': 'Skia/PDF m117', 'file_path': '/Users/fabioangeloni/FIC/Repos/no-work/ai/KnowledgeAi/data/data_genera

In [90]:

from typing_extensions import TypedDict
from typing import List
from langchain_core.documents import Document

### State


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        has_failed: whether the generation has failed or not
        documents: list of documents
    """

    question: str
    generation: str
    docs_not_present: str
    documents: List[str]
    


### Nodes


def retrieve(state):
    """
    Retrieve documents from vectorstore

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

def generate(state):
    """
    Generate answer using RAG on retrieved documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}

def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    docs_not_present = "No"
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score["score"]
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
    if len(filtered_docs) == 0:
        docs_not_present = "Yes"

    return {"documents": filtered_docs, "question": question, "docs_not_present": docs_not_present}

def docs_not_present(state):
    """
    give user asnwer that no documents are present

    Args:
        //TODO DELETE THIS state (dict): The current graph state

    Returns:
        //TODO DELETE THIS state (dict): Appended web results to documents
    """

    print("---QUEST FAILED---")
    # question = state["question"]
    # documents = state["documents"]

    # # Web search
    # docs = web_search_tool.invoke({"query": question})
    # web_results = "\n".join([d["content"] for d in docs])
    # web_results = Document(page_content=web_results)
    # if documents is not None:
    #     documents.append(web_results)
    # else:
    #     documents = [web_results]
    error_message = "No documents found"
    return {"documents": error_message, "question": state["question"], "docs_not_present": "Yes"}



### Conditional edge


def route_question(state):
    """
    Route question to RAG or give error.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    source = question_router.invoke({"question": question})
    print(source)
    print(source["datasource"])
    if source["datasource"] == "vectorstore":
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"
    else:
        print("---GIVE ERROR MESSAGE TO USER---")
        return "docs_not_present"


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or give error message to user.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    #question = state["question"]
    docs_not_present = state["docs_not_present"]
    #filtered_documents = state["documents"]

    if docs_not_present == "Yes":
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, RETURN NO DOCS TO USER---"
        )
        return "docs_not_present"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"

def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score["score"]

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score["score"]
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"

from langgraph.graph import END, StateGraph

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("docs_not_present_node", docs_not_present)  # docs_not_present
workflow.add_node("retrieve_node", retrieve)  # retrieve
workflow.add_node("grade_documents_node", grade_documents)  # grade documents
workflow.add_node("generate_node", generate)  # generate


Graph Build

In [91]:
# Build graph
workflow.set_conditional_entry_point(
    route_question,
    {
        "docs_not_present": "docs_not_present_node",
        "vectorstore": "retrieve_node",
    },
)

workflow.add_edge("retrieve_node", "grade_documents_node")
workflow.add_conditional_edges(
    "grade_documents_node",# after grade_documents the decide_to_generate is called to decide if we should generate the answer or not
    decide_to_generate,
    {
        "docs_not_present": "docs_not_present_node",
        "generate": "generate_node",
    },
)
workflow.add_edge("docs_not_present_node", "generate_node")
workflow.add_conditional_edges(
    "generate_node",
    grade_generation_v_documents_and_question,
    {
        "not supported": END,
        "useful": END,
        "not useful": END,
    },
)

In [92]:
# Compile
app = workflow.compile()

# Test
from pprint import pprint

inputs = {"question": "what is Fatture in cloud?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])

---ROUTE QUESTION---
cos'è fatture in cloud?
{'datasource': 'docs_not_present'}
docs_not_present
---GIVE ERROR MESSAGE TO USER---
---QUEST FAILED---
'Finished running: docs_not_present_node:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---
'Finished running: generate_node:'
'Non lo so.'
