In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
import os

# Get the current research's directory
research_dir = os.path.dirname(os.path.abspath('__file__'))

# Move one directory back
parent_dir = os.path.dirname(research_dir)

# Change the current working directory to the parent directory
os.chdir(parent_dir)

# Print the current working directory to confirm
print(f"Current working directory: {os.getcwd()}")

Current working directory: /home/eiingeeel/projects/frankenst-ai


In [4]:
from services.llm import LLMServices

# Inicializa lo necesario (leer YAML, configurar modelos, etc.)
LLMServices.launch()

# Accede directamente al modelo configurado
model = LLMServices.model
embeddings = LLMServices.embeddings


In [6]:
from langchain_chroma import Chroma

# Define Vector DB
vectorstore = Chroma(
    collection_name="pokemon_series",
    embedding_function=embeddings,
    persist_directory="./.chroma_db",  # Where to save data locally, remove if not necessary
)

In [None]:
from frank.utils.rag.unstructured import MultiVectorDocumentIndexing

# Index phase
indexing = MultiVectorDocumentIndexing(llm=model, llm_multimodal=model, vectorstore=vectorstore)

indexing.load_pdf('artifacts/rag_docs/EP003 - Ash Catches a Pokémon.pdf')
indexing.split_pdf()
indexing.summarize_elements()
indexing.embed_store_documents()
retriever = indexing.get_retriever()

In [None]:
indexing.summaries

In [None]:
docs = retriever.invoke("In the third chapter of the Pokémon series, what is the name of the Pokémon that was flying in the daytime sky and subsequently caught?")
docs

[]

In [None]:
from frank.utils.rag.processing import parse_docs, show_base64_image

docs_dict = parse_docs(docs)
show_base64_image(docs_dict['images'][0])

In [None]:
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


def build_prompt(kwargs):
    docs_by_type = kwargs["context"]
    user_question = kwargs["question"]

    context_text = ""
    if len(docs_by_type["texts"]) > 0:
        for text_element in docs_by_type["texts"]:
            context_text += text_element.text

    # construct prompt with context (including images)
    prompt_template = f"""
    Answer the question based only on the following context, which can include text, tables, and the below image.
    Context: {context_text}
    Question: {user_question}
    """

    prompt_content = [{"type": "text", "text": prompt_template}]

    if len(docs_by_type["images"]) > 0:
        for image in docs_by_type["images"]:
            prompt_content.append(
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{image}"},
                }
            )

    return ChatPromptTemplate.from_messages(
        [
            HumanMessage(content=prompt_content),
        ]
    )


chain = (
    {
        "context": retriever | RunnableLambda(parse_docs),
        "question": RunnablePassthrough(),
    }
    | RunnableLambda(build_prompt)
    | model
    | StrOutputParser()
)

chain_with_sources = {
    "context": retriever | RunnableLambda(parse_docs),
    "question": RunnablePassthrough(),
} | RunnablePassthrough().assign(
    response=(
        RunnableLambda(build_prompt)
        | model
        | StrOutputParser()
    )
)

In [None]:
response = chain.invoke(
    "In the third chapter of the Pokémon series, what is the name of the Pokémon that was flying in the daytime sky and subsequently caught?"
)
print(response)

# CUSTOM AGENTIC-ADAPTATIVE RAG GRAPH IMPLEMENTATION


- Self-RAG: https://github.com/langchain-ai/langgraph/blob/main/examples/rag/langgraph_self_rag_pinecone_movies.ipynb https://langchain-ai.github.io/langgraph/tutorials/rag/langgraph_self_rag/ 

¿Tiene sentido en LangGrapg el siguiente Workflow? ¿En qué casos tendría sentido esta arquitectura? ¿Qué ventajas puede tener entre hacer solo un Rag Agentic o un RAG Adaptativo?

Una mezcla de RAG Agentic y Adaptativo. Es decir. Un nodo 'AgenticRAG'. Que decide contestar o ir a un ToolNode con function calling. Resulta que este ToolNode tiene dos tools que son dos retriever distintos. Uno un MultiVectorRetriever y otro un WebSearch.

Después con un conditional edge GradeDocuments evaluaría el contexto recuperado para saber si es útil o no para generar la respuesta. Si no lo es. Va a un nodo rewrite question. En caso de que lo sea. Va al nodo Answer.


El StateGraph tendría esta pinta:

class GraphState(TypedDict):
    query: str
    retrieved_docs: list[Document] = []
    answer: str = ""
    interations: int = 0


# LangRunnable el retriever.

In [None]:
from langchain.tools.retriever import create_retriever_tool

retriever_tool = create_retriever_tool(
    retriever,
    "multivector_retriever_pokemon",
    "Search and return information about Pokémon series.",
)

from langgraph.graph import MessagesState

response_model = model


def generate_query_or_respond(state: MessagesState):
    """Call the model to generate a response based on the current state. Given
    the question, it will decide to retrieve using the retriever tool, or simply respond to the user.
    """
    response = (
        response_model
        .bind_tools([retriever_tool]).invoke(state["messages"])
    )
    return {"messages": [response]}


from pydantic import BaseModel, Field
from typing import Literal

GRADE_PROMPT = (
    "You are a grader assessing relevance of a retrieved document to a user question. \n "
    "Here is the retrieved document: \n\n {context} \n\n"
    "Here is the user question: {question} \n"
    "If the document contains keyword(s) or semantic meaning related to the user question, grade it as relevant. \n"
    "Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."
)

#To structured output
class GradeDocuments(BaseModel):
    """Grade documents using a binary score for relevance check."""

    binary_score: str = Field(
        description="Relevance score: 'yes' if relevant, or 'no' if not relevant"
    )


# grader_model = model
grader_model = model


def grade_documents(
    state: MessagesState,
) -> Literal["generate_answer", "rewrite_question"]:
    """Determine whether the retrieved documents are relevant to the question."""
    question = state["messages"][0].content
    context = state["messages"][-1].content

    prompt = GRADE_PROMPT.format(question=question, context=context)
    response = (
        grader_model
        .with_structured_output(GradeDocuments, method="json_schema").invoke(
            [{"role": "user", "content": prompt}]
        )
    )
    score = 'yes' # response.binary_score

    if score == "yes":
        return "generate_answer"
    else:
        return "rewrite_question"
    
REWRITE_PROMPT = (
    "Look at the input and try to reason about the underlying semantic intent / meaning.\n"
    "Here is the initial question:"
    "\n ------- \n"
    "{question}"
    "\n ------- \n"
    "Formulate an improved question:"
)


def rewrite_question(state: MessagesState):
    """Rewrite the original user question."""
    messages = state["messages"]
    question = messages[0].content
    prompt = REWRITE_PROMPT.format(question=question)
    response = response_model.invoke([{"role": "user", "content": prompt}])
    return {"messages": [{"role": "user", "content": response.content}]}

GENERATE_PROMPT = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer the question. "
    "If you don't know the answer, just say that you don't know. "
    "Use three sentences maximum and keep the answer concise.\n"
    "Question: {question} \n"
    "Context: {context}"
)


def generate_answer(state: MessagesState):
    """Generate an answer."""
    question = state["messages"][0].content
    context = state["messages"][-1].content
    prompt = GENERATE_PROMPT.format(question=question, context=context)
    response = response_model.invoke([{"role": "user", "content": prompt}])
    return {"messages": [response]}


from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition

workflow = StateGraph(MessagesState)

# Define the nodes we will cycle between
workflow.add_node(generate_query_or_respond)
workflow.add_node("retrieve", ToolNode([retriever_tool]))
workflow.add_node(rewrite_question)
workflow.add_node(generate_answer)

workflow.add_edge(START, "generate_query_or_respond")

# Decide whether to retrieve
workflow.add_conditional_edges(
    "generate_query_or_respond",
    # Assess LLM decision (call `retriever_tool` tool or respond to the user)
    tools_condition,
    {
        # Translate the condition outputs to nodes in our graph
        "tools": "retrieve",
        END: END,
    },
)

# Edges taken after the `action` node is called.
workflow.add_conditional_edges(
    "retrieve",
    # Assess agent decision
    grade_documents,
)
workflow.add_edge("generate_answer", END)
workflow.add_edge("rewrite_question", "generate_query_or_respond")

# Compile
graph = workflow.compile()

from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
user_input = "Who is sleeping while Caterpy and Pikachu get to know each other?"
message_input = {"messages": [{"role": "human", "content": user_input}]}

for chunk in graph.stream(
    {
        "messages": [
            {
                "role": "user",
                "content": "In the charpter 3 of the pokemon series, which is the pokemon that was flying in the day sky and then capture?",
            }
        ]
    }
):
    for node, update in chunk.items():
        print("Update from node", node)
        update["messages"][-1].pretty_print()
        print("\n\n")

In [None]:
user_input = "Who is sleeping while Caterpy and Pikachu get to know each other?"
message_input = {"messages": [{"role": "human", "content": user_input}]}
graph.invoke(message_input)

In [None]:
retriever.invoke("Who is sleeping while Caterpy and Pikachu get to know each other?")

# TEST MULTIVECTOR RETRIEVER AS A TOOL

# TEST STRUCTURED OUTPUT

In [None]:
from langchain.output_parsers import PydanticOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field

# 1. Definir el modelo estructurado
class Persona(BaseModel):
    nombre: str = Field(description="El nombre completo de la persona")
    edad: int = Field(description="Edad de la persona")
    ocupacion: str = Field(description="Trabajo u ocupación actual")

# 2. Crear el parser
parser = PydanticOutputParser(pydantic_object=Persona)

# 3. Crear el prompt con instrucciones de formato
system_prompt = (
    "Eres un extractor de datos personales."
    "siguiendo estas instrucciones:\n{format_instructions}"
)

prompt_template = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="messages"),
])

# 4. Conectar con Azure OpenAI (asegúrate de configurar bien este modelo)
llm = model

# 5. Encadenar todo
chain = prompt_template.partial(format_instructions=parser.get_format_instructions()) | llm | parser

# 6. Ejecutar con mensaje de entrada
messages = [{"role": "user", "content": "Hola, me llamo Ana Martínez, tengo 34 años y trabajo como ingeniera de software en México."}]


# 7. Invocar y mostrar salida
output = chain.invoke({"messages": messages})
print(output.model_dump())


In [None]:
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import Runnable

# Definir el modelo estructurado
class Persona(BaseModel):
    nombre: str = Field(description="El nombre completo de la persona")
    edad: int = Field(description="Edad de la persona")
    ocupacion: str = Field(description="Trabajo u ocupación actual")


# Crear el prompt con instrucciones de formato
system_prompt = (
    "Eres un extractor de datos personales."
)

prompt_template = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="messages"),
])

# Crear el modelo con salida estructurada
structured_llm: Runnable = model.with_structured_output(Persona, method="json_schema")

# Crear el pipeline completo
chain = prompt_template | structured_llm

# Ejemplo de uso
messages = [{"role": "user", "content": "Hola, me llamo Ana Martínez, tengo 34 años y trabajo como ingeniera de software en México."}]
output = chain.invoke({"messages": messages})

print(output.model_dump())
