In [12]:
import sys
from datetime import datetime

# Put the parent directory at the front of the import path (once), so the
# `src` package imported right below can be resolved from this notebook.
if sys.path.count("..") == 0:
    sys.path.insert(0, "..")

from src.settings import get_settings
from src.utils.logger import create_logger

settings = get_settings()
logger = create_logger(path=settings.paths.logs_dir)
# `settings.env` holds the API endpoint and key passed to the LLM client, so it
# is excluded from the dump: this message ends up in the saved notebook output
# and in whatever sink the logger writes to.
logger.debug(
    f"settings loaded as \n{settings.model_dump_json(indent=2, exclude={'env'})}"
)

[37m[2m10:36 AM[0m[37m[0m | [37m[2m__main__[0m[37m[0m | [37m[22mDEBUG    [0m | [37m[22msettings loaded as 
{
  "models": {
    "hf": {
      "chat": "EssentialAI/rnj-1-instruct:together",
      "embedding_snowflake": "Snowflake/snowflake-arctic-embed-l-v2.0",
      "embedding_specter": "allenai/specter2_base",
      "encoder": "m3rg-iitd/matscibert",
      "reranker": "Qwen/Qwen3-Reranker-0.6B",
      "router": "openai/gpt-oss-20b:together"
    },
    "nebius": {
      "reasoning": "zai-org/GLM-4.5-Air",
      "tool_user": "Qwen/Qwen3-30B-A3B-Instruct-2507",
      "chat": "meta-llama/Meta-Llama-3.1-8B-Instruct-fast",
      "embedding_baai_bge": "BAAI/bge-multilingual-gemma2",
      "router": "openai/gpt-oss-20b"
    }
  },
  "paths": {
    "base_dir": "/home/rudy/code/lattice",
    "data_dir": "/home/rudy/code/lattice/data",
    "logs_dir": "/home/rudy/code/lattice/logs",
    "agents_dir": "/home/rudy/code/lattice/agents",
    "skills_dir": "/home/rudy/code/lattice/skill

In [8]:
from langchain_openai import ChatOpenAI
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    HumanMessage,
    SystemMessage,
)

from langgraph.graph import StateGraph, END, START
from langgraph.graph.message import add_messages
from typing import TypedDict, List, Literal, Tuple, Annotated, Dict, Optional

from pydantic import BaseModel, Field
from pydantic.json_schema import SkipJsonSchema


In [None]:
from src.prompts.system import PRIMER_SYSTEM_PROMPT

# Version label for this notebook's agent. chat_node passes the same literal
# as the `version` argument when formatting PRIMER_SYSTEM_PROMPT.
version = "0.0.1-alpha"


In [9]:
# Shared chat-model client used by chat_node and router_node. It targets the
# model configured as settings.models.hf.router and takes its endpoint and
# API key from settings.env.
llm = ChatOpenAI(
    model=settings.models.hf.router,
    base_url=settings.env.HF_API_ENDPOINT,
    api_key=settings.env.HF_API_KEY,
)

In [None]:
class ResearchSubQuestion(BaseModel):
    # One fine-grained question nested under a ResearchQuestion.
    # Kept as `#` comments on purpose: pydantic copies a class docstring into
    # the model's JSON schema, which would change what the LLM is shown.

    # Required free-text field; the description is part of the schema.
    subquestion_text: Annotated[
        str,
        Field(
            description="A secondary, granular question that breaks down the main research question into actionable parts."
        ),
    ]


class ResearchQuestion(BaseModel):
    # A primary research question together with its optional sub-questions.
    # Kept as `#` comments on purpose: pydantic copies a class docstring into
    # the model's JSON schema, which would change what the LLM is shown.

    # Required: the text of the primary question.
    question_text: Annotated[
        str,
        Field(
            description="A high-level, focused research question (RQ) addressing a specific physical phenomenon or material property."
        ),
    ]

    # Optional: defaults to None when no sub-questions are supplied.
    research_subquestions: Annotated[
        Optional[List[ResearchSubQuestion]],
        Field(
            description="A list of 2-3 granular sub-questions that help in answering the primary RQ.",
        ),
    ] = None


class ResearchPlan(BaseModel):
    # Structured research plan produced by the planner.
    # Kept as `#` comments on purpose: pydantic copies a class docstring into
    # the model's JSON schema, which would change what the LLM is shown.

    # Filled in locally at construction time; SkipJsonSchema keeps it out of
    # the generated JSON schema.
    created_at: SkipJsonSchema[datetime] = Field(default_factory=datetime.now)

    # Required short overview of the plan.
    summary: str = Field(
        description="A concise summary of the research goals. Max 4-5 sentences focusing on the 'what', 'why' and 'how'.",
    )

    # Optional; None when no keywords were given.
    keywords: Optional[List[str]] = Field(
        description="List of relevant scientific terms, experimental methods etc.",
        default=None,
    )

    # Required list of primary questions.
    research_questions: List[ResearchQuestion] = Field(
        description="A set of 3-5 primary research questions that form the backbone of the literature survey.",
    )

    # Approval flag; False unless set otherwise.
    is_approved_by_user: bool = Field(
        description="Set to True if the research_plan is complete and approved by the user explicitly. Always set to False if the research_plan is being created for the first time.",
        default=False,
    )

    def to_markdown(self) -> str:
        """Render this plan as a Markdown document.

        Returns:
            A string containing a title line with the creation timestamp, a
            summary section, a keywords section (only when keywords are
            present) and one heading per research question followed by its
            sub-questions as bullet items.
        """
        timestamp = self.created_at.strftime("%Y-%m-%d %H:%M:%S")

        # Collect the fragments in order and join once at the end.
        parts = [
            f"# RESEARCH PLAN\n {timestamp} by Lattice.\n",
            f"## Summary\n{self.summary}\n\n",
        ]

        if self.keywords:
            quoted = ", ".join(f"`{keyword}`" for keyword in self.keywords)
            parts.append("## Keywords\n")
            parts.append(quoted + "\n\n")

        parts.append("## Research Questions\n")
        for rq_number, question in enumerate(self.research_questions, start=1):
            parts.append(f"### RQ{rq_number}: {question.question_text}\n")
            # None and an empty list both mean "no bullets".
            sub_questions = question.research_subquestions or []
            parts.extend(
                f"  - **Sub-{sub_number}**: {sub.subquestion_text}\n"
                for sub_number, sub in enumerate(sub_questions, start=1)
            )
            # Blank line after every question block, with or without bullets.
            parts.append("\n")

        return "".join(parts)


class PlannerResponse(BaseModel):
    # Structured reply of the planner: a conversational message plus,
    # optionally, a research plan.
    # Kept as `#` comments on purpose: pydantic copies a class docstring into
    # the model's JSON schema, which would change what the LLM is shown.

    # Required conversational text.
    message: str = Field(
        description="The response or message from the agent. The response or message MUST NOT be the research_plan."
    )

    # The first positional argument of Field() is the *default value*, so the
    # text must be passed as `description=`; otherwise the field silently
    # defaults to that string and carries no description in the schema.
    # The default is None, meaning "no plan produced in this turn".
    research_plan: Optional[ResearchPlan] = Field(
        default=None,
        description="This will contain ONLY the research_plan created by the agent. This is to be created when the agent is sure of the plan to be made and has asked all relevant clarifications to the user.",
    )

In [None]:
class ResearchPlanValidationRoute(BaseModel):
    # Structured verdict requested from the LLM by router_node.
    # Kept as `#` comments on purpose: pydantic copies a class docstring into
    # the model's JSON schema, which would change what the LLM is shown.

    # Both flags fall back to False when the model omits them.
    step: bool = Field(
        default=False,
        description="Defines whether the research_plan exists or not.",
    )
    is_approved_by_user: bool = Field(
        default=False,
        description="Defines whether the research_plan is approved by the user or not.",
    )

In [None]:
class AgentState(TypedDict):
    """State dictionary read by chat_node.

    Keys:
        messages: Conversation history; annotated with the `add_messages`
            reducer.
        research_plan: The current plan, or None.
        is_plan_approved_by_user: Approval flag for the plan.
    """

    messages: Annotated[list[BaseMessage], add_messages]
    research_plan: Optional[ResearchPlan]
    is_plan_approved_by_user: bool


class RouterState(TypedDict):
    """State dictionary used in router_node's signature.

    Keys:
        decision: The routing verdict. The key is spelled to match the
            ``"decision"`` key that router_node returns (it was previously
            misspelled, so the two could never line up).
    """

    # NOTE(review): router_node stores the structured LLM result here rather
    # than a plain string -- confirm whether `str` is the intended type.
    decision: str

In [None]:
def chat_node(state: AgentState) -> AgentState | List | Dict:
    """Run one planner turn and return the resulting state update.

    The conversation in ``state["messages"]`` is sent to the LLM behind the
    formatted primer system prompt, with the reply constrained to the
    ``PlannerResponse`` schema.

    Args:
        state: Current state; only ``state["messages"]`` is read.

    Returns:
        A partial state update containing the raw model message under
        ``"messages"``, ``"is_plan_approved_by_user"`` set to ``False`` and,
        when the reply was parsed and carries a plan, that plan under
        ``"research_plan"``.
    """
    structured_llm = llm.with_structured_output(PlannerResponse, include_raw=True)

    # `datetime` is the class (imported with `from datetime import datetime`),
    # so `.now()` is called on it directly; `datetime.datetime` does not exist.
    system_prompt = PRIMER_SYSTEM_PROMPT.format(
        date=datetime.now().strftime("%B %Y"), version="0.0.1-alpha"
    )

    response = structured_llm.invoke([SystemMessage(system_prompt)] + state["messages"])
    raw_message = response["raw"]
    # With include_raw=True the parsed object sits next to the raw message;
    # it is checked for truthiness below before being used.
    parsed_response = response["parsed"]

    update = {"messages": [raw_message], "is_plan_approved_by_user": False}
    if parsed_response and parsed_response.research_plan:
        # Take the plan from the parsed response (the bare name `research_plan`
        # was never defined in this scope).
        update["research_plan"] = parsed_response.research_plan

    # Hand the update back to the caller; without this the node returned None.
    return update

In [None]:
def router_node(state: RouterState) -> RouterState | List | Dict:
    """Ask the LLM for a routing verdict on the conversation.

    The model output is constrained to ``ResearchPlanValidationRoute`` and is
    returned unchanged under the ``"decision"`` key.

    Args:
        state: Must provide the conversation under ``state["messages"]``.
            NOTE(review): ``RouterState`` as declared does not list a
            ``messages`` key -- confirm which schema this node really receives.

    Returns:
        ``{"decision": <structured LLM result>}``.
    """
    route_llm = llm.with_structured_output(ResearchPlanValidationRoute)

    # The instruction goes first, followed by the full conversation history.
    instruction = SystemMessage(
        content="Decide if a research plan has been implemented based on conversation history."
    )
    conversation = [instruction] + state["messages"]

    return {"decision": route_llm.invoke(conversation)}