In [1]:
from config.secret_keys import OPENAI_API_KEY, TAVILY_API_KEY, USER_AGENT
import os

os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
os.environ["TAVILY_API_KEY"] = TAVILY_API_KEY
os.environ["USER_AGENT"] = USER_AGENT

---

In [2]:
from finpilot.writer import WriterProcess

In [3]:
writer_process = WriterProcess()



In [4]:
retrieve_node = writer_process.get_retrieve_node()
write_node = writer_process.get_write_node()
filter_documents_node = writer_process.get_filter_documents_node()
transform_query_node = writer_process.get_transform_query_node()
web_search_node = writer_process.get_web_search_node()
decide_write_or_rewrite_query, decide_to_retrieve_or_web_search, decide_to_regenerate_or_rewrite_query_or_end = writer_process.get_conditional_edge_func()

In [5]:
from typing_extensions import TypedDict
from typing import Annotated, List
from langgraph.graph.message import add_messages

class State(TypedDict):
    """
    Represents the state of graph.

    Args:
        question : question that user ask
        generation : LLM generation
        documents : list of documents
    """
    question : str
    generation : str
    messages : Annotated[list, add_messages]
    documents : List[str]

In [6]:
from langgraph.graph import StateGraph, START, END

workflow = StateGraph(State)

# Add Nodes
workflow.add_node("retriever", retrieve_node)
workflow.add_node("filter_documents", filter_documents_node)
workflow.add_node("writer", write_node)
workflow.add_node("transform_query", transform_query_node)
workflow.add_node("web_search", web_search_node)

# connects edges
workflow.add_edge(START, "retriever")
workflow.add_edge("retriever", "filter_documents")
workflow.add_conditional_edges(
    "filter_documents",
    decide_write_or_rewrite_query,
    {
        "writer" : "writer",
        "transform_query" : "transform_query"
    },
)
workflow.add_conditional_edges(
    "transform_query",
    decide_to_retrieve_or_web_search,
    {
        "retriever" : "retriever",
        "web_search" : "web_search"
    }
)
workflow.add_edge("web_search", "writer")
workflow.add_conditional_edges(
    "writer",
    decide_to_regenerate_or_rewrite_query_or_end,
    {
        "not supported" : "writer",
        "useful" : END,
        "not useful" : "transform_query"
    }
)

app = workflow.compile()