In [1]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_nvidia_ai_endpoints import ChatNVIDIA

loader = DirectoryLoader(
    './data/tmp_txt',
    glob="**/*.txt",
    loader_cls=TextLoader  # Explicitly use TextLoader for .txt files
)
docs = []
for doc in loader.load():
    source = doc.metadata['source']
    source = source.split('[')[-1]
    video_id = source.split(']')[0]
    doc.metadata['video_id'] = video_id
    docs.append(doc)

In [2]:
docs[0]

Document(metadata={'source': 'data/tmp_txt/Alan Watts - The Meaning Of Life [EDqqB8kh_Ek].en.txt', 'video_id': 'EDqqB8kh_Ek'}, page_content="it's very commonly said\nthat the root of most human unhappiness\nis the sense that one's life has no\nmeaning\nthis is i suppose most frequently said\nin\ncircles interested in psychotherapy\nbecause the feeling of meaninglessness\nis often equated\nwith\nthe existence of neurosis\nand so many\nactivities into which one is encouraged\nto enter\nphilosophies one is encouraged to\nbelieve and religions one is encouraged\nto join\nare commended on them on the basis of\nthe fact that they give life a meaning\ni think it's very fascinating to think\nout what\nthis idea itself means or what it is\nintended when it's said that\nlife has to have a purpose\n[Music]\nwell now\nit's pretty obvious i think\nthat when we talk about life having or\nnot having a meaning\nwe're not using quite the ordinary sense\nof the word meaning\nas the attribute of a sign\n

In [3]:
docs[0].page_content

"it's very commonly said\nthat the root of most human unhappiness\nis the sense that one's life has no\nmeaning\nthis is i suppose most frequently said\nin\ncircles interested in psychotherapy\nbecause the feeling of meaninglessness\nis often equated\nwith\nthe existence of neurosis\nand so many\nactivities into which one is encouraged\nto enter\nphilosophies one is encouraged to\nbelieve and religions one is encouraged\nto join\nare commended on them on the basis of\nthe fact that they give life a meaning\ni think it's very fascinating to think\nout what\nthis idea itself means or what it is\nintended when it's said that\nlife has to have a purpose\n[Music]\nwell now\nit's pretty obvious i think\nthat when we talk about life having or\nnot having a meaning\nwe're not using quite the ordinary sense\nof the word meaning\nas the attribute of a sign\n[Music]\nwe're not saying are we that we expect\nthis natural universe to behave as if it\nwere a collection of words\nsignifying something 

In [4]:
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500, chunk_overlap=100
)
doc_splits = text_splitter.split_documents(docs)

In [5]:
doc_splits[0]

Document(metadata={'source': 'data/tmp_txt/Alan Watts - The Meaning Of Life [EDqqB8kh_Ek].en.txt', 'video_id': 'EDqqB8kh_Ek'}, page_content="it's very commonly said\nthat the root of most human unhappiness\nis the sense that one's life has no\nmeaning\nthis is i suppose most frequently said\nin\ncircles interested in psychotherapy\nbecause the feeling of meaninglessness\nis often equated\nwith\nthe existence of neurosis\nand so many\nactivities into which one is encouraged\nto enter\nphilosophies one is encouraged to\nbelieve and religions one is encouraged\nto join\nare commended on them on the basis of\nthe fact that they give life a meaning")

In [6]:
doc_splits[0].page_content

"it's very commonly said\nthat the root of most human unhappiness\nis the sense that one's life has no\nmeaning\nthis is i suppose most frequently said\nin\ncircles interested in psychotherapy\nbecause the feeling of meaninglessness\nis often equated\nwith\nthe existence of neurosis\nand so many\nactivities into which one is encouraged\nto enter\nphilosophies one is encouraged to\nbelieve and religions one is encouraged\nto join\nare commended on them on the basis of\nthe fact that they give life a meaning"

In [7]:
from langchain_community.document_loaders import TextLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings

embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
retriever = FAISS.from_documents(doc_splits, embeddings).as_retriever(search_kwargs={"k": 20})

  from .autonotebook import tqdm as notebook_tqdm


In [8]:
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import FAISS
from langchain.retrievers.document_compressors import FlashrankRerank



embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")
reranker = FlashrankRerank()

bm25_retriever = BM25Retriever.from_documents(doc_splits)
faiss_vectorstore = FAISS.from_documents(doc_splits, embeddings)
faiss_retriever = faiss_vectorstore.as_retriever(search_kwargs={"k": 5})

hybrid_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever], weights=[0.7, 0.3]
)

In [9]:
from langchain_ollama import ChatOllama

llm = ChatOllama(
    model = "llama3.2",
    temperature = 0.8,
    num_predict = 1024,
)

In [10]:
messages = [
    ("system", "You are a helpful translator. Translate the user sentence to French."),
    ("human", "I love programming."),
]
llm.invoke(messages)

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


AIMessage(content='The translation of "I love programming" in French is :\n\nJ\'aime programmer.\n\nOr, in a more natural and idiomatic expression:\n\nJe suis passionné(e) de programmation.\n\nThis second option conveys a stronger sense of enthusiasm and dedication to the subject.', additional_kwargs={}, response_metadata={'model': 'llama3.2', 'created_at': '2025-01-28T00:23:47.712008Z', 'done': True, 'done_reason': 'stop', 'total_duration': 3991686333, 'load_duration': 58542458, 'prompt_eval_count': 42, 'prompt_eval_duration': 2471000000, 'eval_count': 55, 'eval_duration': 1460000000, 'message': Message(role='assistant', content='', images=None, tool_calls=None)}, id='run-88cd57bf-ced0-4423-9cf2-0c0b0a45a618-0', usage_metadata={'input_tokens': 42, 'output_tokens': 55, 'total_tokens': 97})

In [11]:
from pydantic import BaseModel

class Person(BaseModel):
    age: int
    name: str

messages = "Patricia is 25 years old."
llm.with_structured_output(Person).invoke(messages)

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


Person(age=25, name='Patricia')

In [12]:
question = "What is the meaning of life?"

In [14]:
from typing import Literal, Optional, Tuple, List
from langchain_core.pydantic_v1 import BaseModel, Field

class SubQuery(BaseModel):
    """Given a user question, break it down into distinct sub questions that \
    you need to answer in order to answer the original question."""

    questions: List[str] = Field(description="The list of sub questions")

sub_question_generator = llm.with_structured_output(SubQuery)
print(sub_question_generator.invoke(question))

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


questions=['What does it mean to be alive?', 'Is there a purpose to human existence?', 'Can technology help us find answers to these questions?']


In [15]:
# from langchain import hub
# from langchain_core.output_parsers import StrOutputParser

# # Prompt
# prompt = hub.pull("rlm/rag-prompt")

# # Post-processing
# def format_docs(docs):
#     return "\n\n".join(doc.page_content for doc in docs)

# # Chain
# rag_chain = prompt | llm | StrOutputParser()

# # Run
# docs = hybrid_retriever.get_relevant_documents(question)
# generation = rag_chain.invoke({"context": format_docs(docs), "question": question})
# print(generation)

In [16]:
from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Prompt
prompt = hub.pull("rlm/rag-prompt")

# Post-processing to format documents
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Chain
rag_chain = prompt | llm | StrOutputParser()

# Retrieve relevant documents
docs = hybrid_retriever.get_relevant_documents(question)
sources = [doc.metadata.get('video_id', 'No source found') for doc in docs]
unique_sources = list(set(sources))

# Generate answer
generation = rag_chain.invoke({
    "context": format_docs(docs),
    "question": question
})

# Display results
print("Answer:", generation)
print("\nSources:", *unique_sources, sep="\n- ")

  docs = hybrid_retriever.get_relevant_documents(question)
INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


Answer: The text doesn't provide a clear answer to what the meaning of life is, instead emphasizing that it's a question that has puzzled people for centuries and may not have a single definitive answer. It suggests that the meaning of life can be found through personal responsibility, right action, and conduct, rather than seeking external answers or justification. The author argues that humans have been conditioned to believe in grand significance, but that all such meanings are arbitrary.

Sources:
- sctMPouTWiQ
- dNKgdQFbb2k
- NX2ep5fCJZ8
- oRrbZp-iCqk
- mPasHnN8IVo
- y9Trdafp83U
- VallKm2I0bk


In [17]:
from langchain_core.prompts import ChatPromptTemplate

### Retrieval Grader

# Data model
class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )


# LLM with function call

retrieval_grader = llm.with_structured_output(GradeDocuments)

# Prompt
system = """You are a grader assessing relevance of a retrieved document to a user question. \n 
    It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

grade_prompt = ChatPromptTemplate.from_messages(
    [
     
        ("system", system),
        ("human", "Retrieved document: \n\n {document} \n\n User question: {question}"),
    ]
)

retrieval_grader = grade_prompt | retrieval_grader
docs = hybrid_retriever.get_relevant_documents(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


binary_score='yes'


In [18]:
### Hallucination Grader

# Data model
class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""

    binary_score: str = Field(
        description="Answer is grounded in the facts, 'yes' or 'no'"
    )


hallucination_grader = llm.with_structured_output(GradeHallucinations)

# Prompt
system = """You are a grader assessing whether an LLM generation is grounded in / supported by a set of retrieved facts. \n 
     Give a binary score 'yes' or 'no'. 'Yes' means that the answer is grounded in / supported by the set of facts."""
hallucination_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "Set of facts: \n\n {documents} \n\n LLM generation: {generation}"),
    ]
)

hallucination_grader = hallucination_prompt | hallucination_grader
hallucination_grader.invoke({"documents": format_docs(docs), "generation": generation})

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


GradeHallucinations(binary_score='no')

In [19]:
### Answer Grader

# Data model
class GradeAnswer(BaseModel):
    """Binary score to assess answer addresses question."""

    binary_score: str = Field(
        description="Answer addresses the question, 'yes' or 'no'"
    )


generation_grader = llm.with_structured_output(GradeAnswer)

# Prompt
system = """You are a grader assessing whether an answer addresses / resolves a question \n 
     Give a binary score 'yes' or 'no'. Yes' means that the answer resolves the question."""
answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        ("human", "User question: \n\n {question} \n\n LLM generation: {generation}"),
    ]
)

answer_grader = answer_prompt | generation_grader
answer_grader.invoke({"question": question, "generation": generation})

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


GradeAnswer(binary_score='no')

In [20]:
### Question Re-writer

# Prompt
system = """You a question re-writer that converts an input question to a better version that is optimized \n 
     for vectorstore retrieval. Look at the input and try to reason about the underlying semantic intent / meaning."""
re_write_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        (
            "human",
            "Here is the initial question: \n\n {question} \n Formulate an improved question.",
        ),
    ]
)

question_rewriter = re_write_prompt | llm | StrOutputParser()
question_rewriter.invoke({"question": question})

INFO:httpx:HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"


'Improved Question:\n\nWhat are the common philosophical, scientific, or spiritual perspectives on the nature and significance of human existence, purpose, and fulfillment?\n\nReasoning:\nThe original question "What is the meaning of life?" is quite broad and open-ended, making it challenging to retrieve relevant information from a vector store. By rephrasing the question, I\'ve tried to:\n\n1. **Make it more specific**: While still exploring the topic of human existence, this revised question focuses on "perspectives" rather than a singular answer.\n2. **Specify the context**: Adding "philosophical, scientific, or spiritual" helps narrow down the scope of possible answers and makes it easier for the vector store to identify relevant information.\n3. **Inquire about related concepts**: By asking about "nature," "significance," "purpose," and "fulfillment," I\'ve attempted to capture a range of potential meanings and aspects of human existence, which can lead to more informative and div

In [21]:
from typing import List

from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    sub_questions:  List[str]
    generation: str
    documents: List[str]

In [22]:
### Nodes

def decompose(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---QUERY DECOMPOSITION ---")
    question = state["question"]

    # Reranking
    sub_queries = sub_question_generator.invoke(question)
    return {"sub_questions": sub_queries.questions, "question": question}

def retrieve(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    sub_questions = state["sub_questions"]
    question = state["question"]

    # Retrieval
    documents = []
    for sub_question in sub_questions:
        docs = hybrid_retriever.get_relevant_documents(sub_question)
        documents.extend(docs)
    return {"documents": documents, "question": question}


def rerank(state):
    """
    Retrieve documents

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RERANK---")
    question = state["question"]
    documents = state["documents"]

    # Reranking
    documents = reranker.compress_documents(query=question, documents=documents)
    return {"documents": documents, "question": question}

In [23]:
def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    unique_sources = list(set([doc.metadata.get('video_id', 'No source found') for doc in documents]))
    unique_urls = [
        f"https://www.youtube.com/watch?v={source}" for source in unique_sources
    ]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation, "sources": unique_urls}


def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates documents key with only filtered relevant documents
    """

    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]

    # Score each doc
    filtered_docs = []
    for d in documents:
        score = retrieval_grader.invoke(
            {"question": question, "document": d.page_content}
        )
        grade = score.binary_score
        if grade == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            continue
    return {"documents": filtered_docs, "question": question}


def transform_query(state):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updates question key with a re-phrased question
    """

    print("---TRANSFORM QUERY---")
    question = state["question"]
    documents = state["documents"]

    # Re-write question
    better_question = question_rewriter.invoke({"question": question})
    return {"documents": documents, "question": better_question}

In [24]:
### Edges


def decide_to_generate(state):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Binary decision for next node to call
    """

    print("---ASSESS GRADED DOCUMENTS---")
    state["question"]
    filtered_documents = state["documents"]

    if not filtered_documents:
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print(
            "---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, TRANSFORM QUERY---"
        )
        return "transform_query"
    # We have relevant documents, so generate answer
    print("---DECISION: GENERATE---")
    return "generate"
    
def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.

    Args:
        state (dict): The current graph state

    Returns:
        str: Decision for next node to call
    """

    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]

    score = hallucination_grader.invoke(
        {"documents": documents, "generation": generation}
    )
    grade = score.binary_score

    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question, "generation": generation})
        grade = score.binary_score
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
        return "not useful"
    pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
    return "not supported"

In [25]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("decompose", decompose) #query decompostion
workflow.add_node("retrieve", retrieve)  # retrieve
workflow.add_node("rerank", rerank)  # rerank
workflow.add_node("grade_documents", grade_documents)  # grade documents
workflow.add_node("generate", generate)  # generatae
workflow.add_node("transform_query", transform_query)  # transform_query

# Build graph
workflow.add_edge(START, "decompose")
workflow.add_edge("decompose", "retrieve")
workflow.add_edge("retrieve", "rerank")
workflow.add_edge("rerank", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
workflow.add_edge("transform_query", "retrieve")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "transform_query",
    },
)

# Compile
app = workflow.compile()

In [None]:
from pprint import pprint

# Run
inputs = {"question": question}
for output in app.stream(inputs):
    for key, value in output.items():
        # Node
        pprint(f"Node '{key}':")
        # Optional: print full state at each node
        # pprint.pprint(value["keys"], indent=2, width=80, depth=None)
        print(value)
    pprint("\n---\n")

# Final generation
pprint(value["generation"])
pprint(value["sources"])

---QUERY DECOMPOSITION ---
