In [None]:
! pip install -U langchain-nomic langchain_community tiktoken langchainhub chromadb langchain langgraph tavily-python firecrawl-py transformers torch einops

In [8]:
import os 
from dotenv import load_dotenv

load_dotenv()

LANGCHAIN_TRACING_V2 = os.environ['LANGCHAIN_TRACING_V2'] = 'true'
LANGCHAIN_ENDPOINT = os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
LANGCHAIN_API_KEY = os.environ['LANGCHAIN_API_KEY']
FIRECRAWL_API_KEY = os.environ['FIRECRAWL_API_KEY']

In [7]:
from langchain_community.chat_models import ChatOllama

llm = ChatOllama(model="llama3.1:latest", format="json", temperature=0)

In [42]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import FireCrawlLoader
from langchain.docstore.document import Document

# Load documents
urls = [
    "https://www.tokyotechies.com",
    "https://www.tokyotechies.com/about-us",
    "https://www.tokyotechies.com/solutions/kotae"
]
docs = [FireCrawlLoader(api_key=FIRECRAWL_API_KEY, url=url, mode="scrape").load() for url in urls]

# Flatten the list of documents
docs_list = [item for sublist in docs for item in sublist]

# Split documents
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=250, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)

# Filter out complex metadata
filtered_docs = []
for doc in doc_splits:
    if isinstance(doc, Document) and hasattr(doc, 'metadata'):
        clean_metadata = {k: v for k, v in doc.metadata.items() if isinstance(v, (str, int, float, bool))}
        filtered_docs.append(Document(page_content=doc.page_content, metadata=clean_metadata))

# Save the filtered_docs to a file or cache for later use
import pickle

with open('filtered_docs.pkl', 'wb') as f:
    pickle.dump(filtered_docs, f)

In [None]:
import pickle 

with open('filtered_docs.pkl', 'rb') as f:
    filtered_docs = pickle.load(f)

print(filtered_docs)

In [16]:
from transformers import AutoModel, AutoTokenizer
import torch
from langchain_community.vectorstores import Chroma

# Load the filtered_docs from the saved file
import pickle

with open('filtered_docs.pkl', 'rb') as f:
    filtered_docs = pickle.load(f)

# Load the embedding model and tokenizer
model_name = "nomic-ai/nomic-embed-text-v1"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name, trust_remote_code=True)

# Function to generate embeddings using the loaded model
def embed_text(text):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1)
    return embeddings[0].cpu().numpy().tolist()  # Convert ndarray to list

# Wrapper class to use with Chroma
class CustomEmbedding:
    def embed_documents(self, texts):
        return [embed_text(text) for text in texts]

    def embed_query(self, text):
        return embed_text(text)

# Instantiate the embedding class
custom_embedding = CustomEmbedding()

# Add documents with embeddings to the vectorDB using the embedding class
vectorstore = Chroma.from_documents(
    documents=filtered_docs,
    collection_name="rag-chroma",
    embedding=custom_embedding,  # Use the embedding class instance
)

retriever = vectorstore.as_retriever()



  from .autonotebook import tqdm as notebook_tqdm
  state_dict = loader(resolved_archive_file)
<All keys matched successfully>


In [17]:
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser

# Define the prompt template
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assesing 
    relevance of a retrieved document to a user question. If the document contains keywords related to the user question,
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n 
    Give a binary score 'yes' or 'no' score to indicate whether the docuemnt is relevant to the question. \n 
    Providde the binary score as a JSON with a single key 'score' and no premable or explaination.
    <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document* \n\n {document} \n\n
    Here is the user question* {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
""",
    input_variables=["question", "document"]
)

# Chain the prompt, LLM, and output parser together
retrieval_grader = prompt | llm | JsonOutputParser()

# Define the user question
question = "What is Tokyo Techies?"

# Retrieve documents related to the question
docs = retriever.invoke(question)

# Get the content of the second retrieved document
doc_txt = docs[1].page_content  # Use page_content instead of page_context

# Grade the relevance of the document
result = retrieval_grader.invoke({"question": question, "document": doc_txt})

# Print the result
print(result)

{'score': 'yes'}


In [18]:
from langchain import hub
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import PromptTemplate


prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assitant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know.
    Use three sentences maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question}
    Context: {context}
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>
""",
input_variables=["question", "document"]
)

#Post processing
def format_doc(docs):
    return"\n\n".join(doc.page_content for doc in docs)

rag_chain = prompt | llm | StrOutputParser()

#Run

question = "What do you know about kotae?"
docs = retriever.invoke(question)
generation = rag_chain.invoke({"context": docs, "question": question})
print(generation)

{ "Kotae is a chatbot platform that allows small businesses to automate conversations and delight customers. It can be trained using a company's knowledge base, website scrapes, training files, and FAQs." 

  





  





  





  





  





  





  





  





  





  








In [33]:
### Hallucination Grader
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assesing whether
    an answer is grounded in / supported by a set of facts. Give binary scores 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and no preambel or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n -------- \n
    {documents}
    \n -------- \n
    Here is the answer: {generation}  <|eot_id|><|start_header_id|>assistant<|end_header_id|>
""",
input_variables=["generation", "document"]
)

hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader.invoke({"documents": docs, "generation": generation})

{'score': 'yes'}

In [19]:
### Answer grader
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assesing whether
    the answer is useful in resolve a question. Give binary scores 'yes' or 'no' score to indicate 
    whether the answer is use to resolve a question. Provide the binary score as a JSON with a 
    single key 'score' and no preambel or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the answer:
    \n -------- \n
    {generation}
    \n -------- \n
    Here is the question: {question}  <|eot_id|><|start_header_id|>assistant<|end_header_id|>
""",
input_variables=["generation", "question"]
)

answer_grader = prompt | llm | JsonOutputParser()
answer_grader.invoke({"question": question, "generation": generation})

{'score': 'yes'}

In [39]:
from typing_extensions import TypedDict
from typing import List
from langchain.schema import Document

### State

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: The input question for the generation process.
        generation: The LLM-generated answer.
        retry_generation: Flag to indicate if generation needs to be retried.
        documents: List of retrieved documents.
    """
    question: str
    generation: str
    retry_generation: str
    documents: List[Document]

### Nodes

def retrieve(state: GraphState) -> GraphState:
    """
    Retrieve documents from the vectorstore.

    Args:
        state (GraphState): The current graph state.

    Returns:
        GraphState: Updated state with retrieved documents.
    """
    print("---RETRIEVE---")
    question = state["question"]

    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}

def grade_documents(state: GraphState) -> GraphState:
    """
    Determines whether the retrieved documents are relevant to the question.
    If any document is not relevant, set a flag to retry generation.

    Args:
        state (GraphState): The current graph state.

    Returns:
        GraphState: Updated state with filtered documents and retry flag.
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Initialize retry flag
    retry_generation = "no"
    filtered_docs = []

    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score['score']
        if grade.lower() == "yes":
            print("--GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            retry_generation = "yes"

    return {"documents": filtered_docs, "question": question, "retry_generation": retry_generation}

def generate(state: GraphState) -> GraphState:
    """
    Generate an answer using RAG on retrieved documents.

    Args:
        state (GraphState): The current graph state.

    Returns:
        GraphState: Updated state with LLM generation.
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    
    # Ensure the "generation" key is correctly added to the state
    return {"documents": documents, "question": question, "generation": generation}


def retry_generation_node(state: GraphState) -> GraphState:
    """
    Retry generation based on the question.

    Args:
        state (GraphState): The current graph state.

    Returns:
        GraphState: Updated state with additional document filtering.
    """
    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]

    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score['score']
        if grade.lower() == "yes":
            print("--GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue

    return {"documents": filtered_docs, "question": question}
    
def grade_generation_v_documents_and_question(state: GraphState) -> str:
    """
    Grades the generated answer against the documents and the original question.
    
    Args:
        state (GraphState): The current graph state.
    
    Returns:
        str: Decision string indicating whether the generation is 'useful', 'not useful', or 'not supported'.
    """
    print("---GRADE GENERATION vs DOCUMENTS AND QUESTION---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    # First check if the generation is grounded in the documents
    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']

    if grade.lower() == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")

        # Now check if the generation addresses the question
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score["score"]
        if grade.lower() == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS---")
        return "not support"
### Conditional Edge
def decide_to_generate(state: GraphState) -> str:
    """
    Determines whether to generate an answer, or add web search.

    Args:
        state (GraphState): The current graph state.

    Returns:
        str: Binary decision for the next node to execute.
    """
    print("---ASSESS GRADED DOCUMENTS---")
    retry_generation = state["retry_generation"]

    if retry_generation == "yes":
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION")
        return "retrygeneration"
    else:
        print("---DECISION: GENERATE---")
        return "generate"
        
def grade_generation_v_documents_and_question(state: GraphState) -> str:
    """
    Grades the generated answer against the documents and the original question.
    
    Args:
        state (GraphState): The current graph state.
    
    Returns:
        str: Decision string indicating whether the generation is 'useful', 'not useful', or 'not supported'.
    """
    print("---GRADE GENERATION vs DOCUMENTS AND QUESTION---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    # First check if the generation is grounded in the documents
    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']

    if grade.lower() == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")

        # Now check if the generation addresses the question
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score["score"]
        if grade.lower() == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS---")
        return "not support"


def check_hallucination(state: GraphState) -> str:
    """
    Determines whether the generated answer is grounded in the documents.

    Args:
        state (GraphState): The current graph state.

    Returns:
        str: Binary decision for the next node to execute.
    """
    print("---CHECK HALLUCINATION---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']

    if grade.lower() == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")

        # Check if the generation addresses the question
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score["score"]
        if grade.lower() == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"

    else:
        print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENT, RE-TRY---")
        return "not supported"

from langgraph.graph import END, StateGraph
workflow = StateGraph(GraphState)

# Define the nodes with unique names
workflow.add_node("retrieve", retrieve)
workflow.add_node("grade_documents", grade_documents)
workflow.add_node("retry_generation_node", retry_generation_node)
workflow.add_node("generate", generate)

In [40]:
# Build the graph
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")

# Add conditional edges based on the decision made in 'decide_to_generate'
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "retrygeneration": "retry_generation_node",  # Ensure this matches the node name
        "generate": "generate",
    },
)

# Adding edge from 'retry_generation_node' to 'generate'
workflow.add_edge("retry_generation_node", "generate")  # Updated to match the node name

# Adding conditional edges after generation
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,  # This function is now defined above
    {
        "not support": "generate",  # Ensure this is a valid return from your grading function
        "useful": END,
        "not useful": "retry_generation_node",  # Updated to match the node name
    },
)


In [57]:
# Compile the workflow
app = workflow.compile()

from pprint import pprint

# Define the inputs
inputs = {"question": "what is kotae"}

final_output = None

# Stream the outputs
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
        #print(value)  # Print the value associated with each key
        final_output = output  # Store the final output
pprint(final_output["generate"].keys())
# Print the final generation result, if it exists
final_output_answer = final_output["generate"]
if "generation" in final_output_answer:
    print(final_output_answer["generation"])
else:
    print("No generation key found in the final output.")

---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
--GRADE: DOCUMENT RELEVANT---
--GRADE: DOCUMENT RELEVANT---
--GRADE: DOCUMENT RELEVANT---
--GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
---GENERATE---
---GRADE GENERATION vs DOCUMENTS AND QUESTION---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
dict_keys(['question', 'generation', 'documents'])
{ "Kotae"  : "A chatbot that automates customer inquiries with responses generated from a company's own data, allowing for effortless setup and customization." }
