In [1]:
import os
from dotenv import load_dotenv
from typing import Annotated, Literal, TypedDict, List, Dict, Any
from typing_extensions import NotRequired

# Load environment variables from a .env file.
# GROWBAL_ENV_FILE overrides the location so the notebook is not tied to one
# machine's home directory; the original absolute path remains the default.
ENV_FILE = os.getenv('GROWBAL_ENV_FILE', '/home/mohammed/Desktop/tech_projects/growbal/envs/1.env')
load_dotenv(ENV_FILE)
openai_api_key = os.getenv('OPENAI_API_KEY')
# SerpAPIWrapper (used below) is the serpapi.com client, whose own environment
# variable is SERPAPI_API_KEY; SERPER_API_KEY is the serper.dev convention (a
# different service). Prefer SERPAPI_API_KEY and fall back to SERPER_API_KEY so
# existing .env files keep working.
# NOTE(review): confirm which service the stored key belongs to.
serp_api_key = os.getenv('SERPAPI_API_KEY') or os.getenv('SERPER_API_KEY')
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
# Removed SQLite checkpointing for now
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.utilities import SerpAPIWrapper, ArxivAPIWrapper
from langchain_community.document_loaders import WikipediaLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_postgres import PGVector  # or use FAISS if local
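# pip: python-dotenv, langgraph, langchain-openai, langchain-community, langchain-text-splitters, langchain-postgres,
#      google-search-results (SerpAPIWrapper), arxiv (ArxivAPIWrapper), wikipedia (WikipediaLoader)
#      (+ langgraph-checkpoint-sqlite only if SQLite checkpointing is re-enabled)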

# ---------- State ----------
class ChecklistItem(TypedDict):
    key: str
    prompt: str
    required: bool
    status: Literal["PENDING","ASKED","ANSWERED","VERIFIED","BLOCKED"]
    value: NotRequired[str]

class State(TypedDict):
    """Shared LangGraph state for the provider-intake workflow.

    Keys without a reducer are replaced by whatever a node returns for them.
    ``messages`` is reduced with ``add_messages``, so returned messages are
    appended, or merged into an existing message that has the same id.
    """

    messages: Annotated[List[Any], add_messages]  # chat and tool-call traffic
    provider_profile: Dict[str, Any]  # parse_and_index reads provider_profile["id"]
    checklist: List[ChecklistItem]  # nodes return a full replacement list
    research_notes: List[Dict[str, Any]]  # parse_and_index reads each note's "content"
    vector_store_id: NotRequired[str]  # collection name written by parse_and_index
    # Only AskMore ("NEEDS_INFO") and SaveAndClose ("FINALIZE_SAVE") set this, and
    # the documented initial input does not include it, so it is NotRequired.
    status: NotRequired[Literal["ON_TRACK","NEEDS_INFO","ESCALATE","FINALIZE_SAVE","RESTART","ABORT"]]

# ---------- Models & Tools ----------
# Plain chat model; no node in the code shown here calls it yet. Bind tools
# explicitly where they are needed, e.g. `llm.bind_tools([search_web, ...])`.
# Binding an empty list (`.bind_tools([])`) bound nothing useful and can put an
# empty `tools` array on every request, which the OpenAI API rejects.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, api_key=openai_api_key)
serp = SerpAPIWrapper(serpapi_api_key=serp_api_key)  # SerpAPI (serpapi.com) client
arxiv = ArxivAPIWrapper()
def wiki_search(query: str, k: int = 3):
    """Load up to ``k`` Wikipedia pages matching ``query``.

    Uses WikipediaLoader rather than a plain API wrapper because it returns
    LangChain Document objects, whose ``page_content`` search_wikipedia reads.
    """
    loader = WikipediaLoader(query=query, load_max_docs=k)
    return loader.load()

def search_web(query: str) -> str:
    """Search the web using SERP API.
    
    Args:
        query: The search query to run
        
    Returns:
        Search results as a string
    """
    # ToolNode wraps this function as a tool named "search_web". If the tools are
    # bound to a model, the docstring above becomes the tool description, so keep it accurate.
    results = serp.run(query)
    return results

def search_arxiv(query: str) -> str:
    """Search academic papers on ArXiv.
    
    Args:
        query: The search query for ArXiv
        
    Returns:
        Relevant paper information as a string
    """
    # ToolNode registers this as "search_arxiv"; the docstring doubles as the tool description.
    paper_summary = arxiv.run(query)
    return paper_summary

def search_wikipedia(query: str) -> List[str]:
    """Search Wikipedia articles.
    
    Args:
        query: The search query for Wikipedia
        
    Returns:
        List of relevant Wikipedia article contents
    """
    # ToolNode registers this as "search_wikipedia"; the docstring doubles as the tool description.
    contents: List[str] = []
    for document in wiki_search(query):
        contents.append(document.page_content)
    return contents

# Short-name -> callable registry of the research tools.
# NOTE: ToolNode names each tool after its *function* (search_web,
# search_arxiv, search_wikipedia). These dict keys are not the names a
# model or planner must use in tool calls.
tools = dict(serp=search_web, arxiv=search_arxiv, wikipedia=search_wikipedia)

tool_node = ToolNode(tools=[*tools.values()])

# ---------- Nodes ----------
def intake_and_clarify(state: State):
    """Ask a few crisp clarifying questions for checklist items that are still PENDING.

    Picks up to three PENDING items (required items first, otherwise in
    checklist order). It asks their ``prompt`` text in one assistant message
    and marks exactly those items ASKED, so later nodes know what was asked.

    Returns:
        A partial state update with the replacement ``checklist`` and the
        question message, or ``{}`` when no item is PENDING.
    """
    max_questions = 3  # "1-3 high-signal questions", not a form dump
    checklist = state.get("checklist", [])

    pending = [idx for idx, item in enumerate(checklist) if item["status"] == "PENDING"]
    # sorted() is stable, so checklist order is kept within the required/optional groups.
    chosen = sorted(pending, key=lambda idx: not checklist[idx]["required"])[:max_questions]
    if not chosen:
        return {}

    chosen_set = set(chosen)
    updated = [
        {**item, "status": "ASKED"} if idx in chosen_set else item
        for idx, item in enumerate(checklist)
    ]
    questions = "\n".join(
        f"{number}. {checklist[idx]['prompt']}" for number, idx in enumerate(chosen, start=1)
    )
    return {"checklist": updated, "messages": [{"role": "assistant", "content": questions}]}

def research_planner(state: State):
    """Decide which tools to call, as one assistant message carrying tool calls.

    ``add_messages`` turns the returned dict into an ``AIMessage``. The ToolExec
    node (a ``ToolNode``) then runs each entry of its ``tool_calls``.

    Returns:
        ``{"messages": [assistant_message]}`` with one ``search_web`` call per
        planned query.
    """
    import uuid  # local import: only used for unique tool-call ids

    queries: List[str] = []  # TODO: craft from checklist gaps + provider niche
    # Until that exists, fall back to the single placeholder query.
    queries = queries or ["best practices for UAE tax"]

    tool_calls = [
        {
            # Unique ids keep calls from successive research loops distinguishable.
            "id": f"call_{uuid.uuid4().hex}",
            # ToolNode registers tools under their function names, so this must be
            # "search_web" (not the "serp" key of the tools dict).
            "name": "search_web",
            # Argument names must match the tool's signature: search_web(query).
            "args": {"query": query},
        }
        for query in queries
    ]
    # "content" is mandatory: LangChain's dict-to-message conversion rejects
    # dicts that have "role" but no "content".
    return {"messages": [{"role": "assistant", "content": "", "tool_calls": tool_calls}]}

def parse_and_index(state: State):
    """Chunk, embed, and upsert research text into a per-provider PGVector collection.

    Text comes from two sources:

    * ``state["research_notes"]``: dicts whose ``"content"`` is a string.
    * Successful tool results in ``state["messages"]``: messages with
      ``type == "tool"``, which is what the ToolExec node appends. No node
      writes ``research_notes``, so without this the search results would
      never reach the vector store.

    New tool results are appended to ``research_notes`` as one note per
    message, deduplicated by message id. Each note is marked
    ``"indexed": True`` once embedded, so looping back through this node does
    not insert the same chunks again.

    Returns:
        ``{}`` when there is no new text. Otherwise a partial state update with
        the updated ``research_notes`` and ``vector_store_id`` (the collection name).

    Raises:
        KeyError: there is text to index but ``provider_profile`` has no ``"id"``.
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=150)

    # Copy the notes so marking them "indexed" never mutates the incoming state.
    notes = [dict(note) for note in state.get("research_notes") or []]
    seen_message_ids = {note["message_id"] for note in notes if note.get("message_id")}

    for msg in state.get("messages") or []:
        if getattr(msg, "type", None) != "tool":
            continue
        # Skip failed tool calls: their content is an error report, not research.
        if getattr(msg, "status", "success") == "error":
            continue
        msg_id = getattr(msg, "id", None)
        if msg_id and msg_id in seen_message_ids:
            continue
        content = msg.content if isinstance(msg.content, str) else str(msg.content)
        notes.append({"message_id": msg_id, "source": getattr(msg, "name", None), "content": content})
        if msg_id:
            seen_message_ids.add(msg_id)

    pending = [note for note in notes if not note.get("indexed") and isinstance(note.get("content"), str)]
    chunks = [chunk for note in pending for chunk in splitter.split_text(note["content"])]
    if not chunks:
        return {}

    profile = state.get("provider_profile") or {}
    if "id" not in profile:
        raise KeyError("provider_profile['id'] is required to name the vector-store collection")

    vs = PGVector.from_texts(
        texts=chunks,
        embedding=OpenAIEmbeddings(),  # swap for Voyage/Cohere/etc.
        # f-string so non-string ids (e.g. ints) work; "provider_" + id raised TypeError for them.
        collection_name=f"provider_{profile['id']}",
        # NOTE(review): no `connection` is passed; langchain_postgres.PGVector
        # appears to require one. Confirm how the database is configured.
    )
    for note in pending:
        note["indexed"] = True
    return {"research_notes": notes, "vector_store_id": vs.collection_name}

def summarize_and_update_checklist(state: State):
    """Advance open checklist items and report that answers were drafted.

    Placeholder for retrieval-backed answering. Every item still PENDING or
    ASKED is copied with status "ANSWERED"; no ``value`` is filled in yet. All
    other items pass through unchanged.
    """
    # TODO: answer from evidence, e.g.
    #   docs = vs.as_retriever(search_kwargs={"k": 6}).invoke(question)
    # then set value and ANSWERED/VERIFIED based on what was retrieved.
    open_statuses = ("PENDING", "ASKED")
    updated = [
        {**item, "status": "ANSWERED"} if item["status"] in open_statuses else item
        for item in state["checklist"]
    ]
    drafted = {"role": "assistant", "content": "Drafted answers & notes."}
    return {"checklist": updated, "messages": [drafted]}

def status_router(state: State):
    """Choose the node that runs after SummarizeAndUpdate, based on the checklist.

    Returns one of the labels mapped in ``add_conditional_edges``:

    * "AskMore": some required item is still PENDING or ASKED.
    * "SaveAndClose": every required item is ANSWERED or VERIFIED (optional
      items are ignored; an empty checklist also lands here).
    * "ResearchPlanner": anything else, e.g. a required item is BLOCKED.
    """
    checklist = state["checklist"]
    open_statuses = ("PENDING", "ASKED")
    done_statuses = ("ANSWERED", "VERIFIED")

    if any(item["required"] and item["status"] in open_statuses for item in checklist):
        return "AskMore"

    # "No required item is unfinished" (De Morgan of "every item is optional or done").
    if not any(item["required"] and item["status"] not in done_statuses for item in checklist):
        return "SaveAndClose"

    # TODO: detect conflicts / tool failures and route to an escalation path.
    # NOTE(review): a required BLOCKED item ends up here. SummarizeAndUpdate never
    # changes BLOCKED items, so the ResearchPlanner loop repeats until LangGraph's
    # recursion limit raises GraphRecursionError. This needs an escalate/END route.
    return "ResearchPlanner"

def ask_more(state: State):
    """Ask only for the missing high-signal items, not a full form.

    Currently emits a fixed placeholder message and flags the run NEEDS_INFO.
    """
    follow_up = {"role": "assistant", "content": "I still need just these 2 details: ..."}
    return {"messages": [follow_up], "status": "NEEDS_INFO"}

def save_and_close(state: State):
    """Finish the intake: flag the run FINALIZE_SAVE and invite supporting documents.

    NOTE(review): nothing is persisted here yet, even though the message tells the
    user the profile was saved. The provider summary still needs to be written
    somewhere.
    """
    closing_message = {
        "role": "assistant",
        "content": "Saved your profile. Upload any supporting documents now (optional).",
    }
    return {"status": "FINALIZE_SAVE", "messages": [closing_message]}

# ---------- Graph ----------
graph = StateGraph(State)

# Nodes, registered in pipeline order.
_NODES = (
    ("IntakeAndClarify", intake_and_clarify),
    ("ResearchPlanner", research_planner),
    ("ToolExec", tool_node),  # runs the tool calls on the latest AI message (serp/wiki/arxiv)
    ("ParseAndIndex", parse_and_index),
    ("SummarizeAndUpdate", summarize_and_update_checklist),
    ("AskMore", ask_more),
    ("SaveAndClose", save_and_close),
)
for _node_name, _node_action in _NODES:
    graph.add_node(_node_name, _node_action)

# Linear path: intake -> plan -> run tools -> index -> summarize.
_PIPELINE = (START, "IntakeAndClarify", "ResearchPlanner", "ToolExec", "ParseAndIndex", "SummarizeAndUpdate")
for _src, _dst in zip(_PIPELINE, _PIPELINE[1:]):
    graph.add_edge(_src, _dst)

# status_router returns the name of the next node, so each label maps to itself.
_ROUTES = ("AskMore", "SaveAndClose", "ResearchPlanner")
graph.add_conditional_edges("SummarizeAndUpdate", status_router, {label: label for label in _ROUTES})

# NOTE(review): AskMore feeds straight back into ResearchPlanner without waiting
# for the user's reply. Confirm this is intended; ending the run here would let
# the next user turn resume the flow.
graph.add_edge("AskMore", "ResearchPlanner")
graph.add_edge("SaveAndClose", END)

app = graph.compile()
# invoke with: app.invoke({"messages": [], "provider_profile": {"id": "demo"}, "checklist": [], "research_notes": []})
# (ParseAndIndex reads provider_profile["id"] to name the collection once it has text to store.)


In [None]:
# Visualize the workflow graph
from IPython.display import Image, display

# The renderers are tried in order, and the first one that works is displayed:
#   1. Graphviz PNG. Needs pygraphviz plus the Graphviz system libraries.
#      Install it deliberately (e.g. `%pip install pygraphviz`) rather than
#      pip-installing from inside this cell.
#   2. Mermaid PNG. Rendered through the mermaid.ink web service by default,
#      so it needs network access.
# If both fail, every error is reported and the Mermaid source is printed instead.
# Only the compiled `app` is drawn; an uncompiled StateGraph has no get_graph().
_drawable_graph = app.get_graph()
_render_errors = []
for _renderer_name, _render in (
    ("graphviz", _drawable_graph.draw_png),
    ("mermaid", _drawable_graph.draw_mermaid_png),
):
    try:
        display(Image(_render()))
        break
    except Exception as exc:  # best-effort: a missing renderer must not break the notebook
        _render_errors.append(f"{_renderer_name}: {exc!r}")
else:
    print("Graph visualization failed:", "; ".join(_render_errors))
    print(_drawable_graph.draw_mermaid())
