In [1]:
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")

In [40]:
import sys
import io
from dataclasses import dataclass

# TODO run code in venv (Docker)

class CodeExecutor:
    """A class to execute code and capture printed output."""
    def __init__(self, namespace: dict):
        self.namespace = namespace

    def execute(self, code: str) -> tuple[bool, str]:
        old_stdout = sys.stdout
        redirected_output = io.StringIO()
        sys.stdout = redirected_output

        try:
            exec(code, self.namespace)
            output = redirected_output.getvalue()
            return True, output
        except Exception as e:
            return False, f"{type(e).__name__}: {e}"
        finally:
            sys.stdout = old_stdout

@dataclass
class ExecuteResult:
    success: bool
    message: str | None
    final_state: dict

In [41]:
from enum import Enum
from typing import List
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

class PydanticActionNode(BaseModel):
    """Schema for action node."""
    action_id: int = Field(..., description="The sequential ID of this action step, starting from 1.")
    description: str = Field(..., description="A brief, natural language description of what this code does.")
    code: str = Field(..., description="A valid, executable snippet of Python code for this action.")

class PydanticActionGraph(BaseModel):
    """Schema for task graph."""
    task_nodes: List[PydanticActionNode] = Field(default=[] , description="List of task nodes for the process.")

class TaskStatus(str, Enum):
    """Enum for task status values."""
    SUCCESS = "success"
    FAILED = "failed" 
    PENDING = "pending"

class TaskType(str, Enum):
    """Enum for task type values."""
    DATA_LOADING = "data_loading"
    EXPLORATION = "exploration"
    FEATURE_ENGINEERING = "feature_engineering"
    MODEL_TRAINING = "model_training"
    EVALUATION = "evaluation"
    VISUALIZATION = "visualization"

class ActionNode:
    """Represents a single executable code snippet within a task."""
    def __init__(self, action_id: int, code: str, description: str):
        self.action_id = action_id
        self.description = description
        self.code = code
        self.status = TaskStatus.PENDING
        self.result = None

    def __repr__(self):
        return f"ActionNode(id={self.action_id}, description='{self.description}', status='{self.status.value}', code='{self.code[:30]}...')"

class ActionGraph:
    """Manages the sequence of actions for a single parent task."""
    def __init__(self):
        self.nodes: List[ActionNode] = []
        self.result: ExecuteResult | None = None

    def add_action(self, node: ActionNode):
        self.nodes.append(node)
        self.nodes.sort(key=lambda n: n.action_id)
        
    def __repr__(self):
        return f"ActionGraph with {len(self.nodes)} actions."
    
    def execute_action_graph(self, namespace: dict):
        executor = CodeExecutor(namespace=namespace)
        last_message = ''
        for current_action_node in self.nodes:
            exec_success, result = executor.execute(current_action_node.code)
            if not exec_success:
                final_state = executor.namespace.get("agent_state", {})
                current_action_node.status = TaskStatus.FAILED
                self.result = ExecuteResult(success=False, message=result, final_state=final_state)
                return
            current_action_node.status = TaskStatus.SUCCESS
            last_message = result
        
        final_state = executor.namespace.get("agent_state", {})
        self.result = ExecuteResult(success=True, message=last_message, final_state=final_state)

    def print_actions(self):
        for action in self.nodes:
            print(f"  - ID: {action.action_id}")
            print(f"    Description: {action.description}")
            print(f"    Code: {action.code}")
            print("--------------------")

In [42]:
from graphlib import TopologicalSorter, CycleError
from typing import TypedDict, List, Dict, Optional
from langchain_core.prompts import ChatPromptTemplate

class GraphState(TypedDict):
    """A comprehensive state for a data science pipeline."""
    run_id: str
    
    raw_data_path: str
    
    cleaned_data_path: Optional[str]
    model_path: Optional[str]
    
    evaluation_results: Optional[Dict[str, float]]
    
    visualization_paths: Optional[List[str]]

    previously_done: str

class TaskNode:
    """Represents a single node in the task graph."""
    def __init__(self, task_id: str, instruction: str, dependencies: list[str], task_type: TaskType, output: str):
        if not isinstance(task_id, str) or not task_id:
            raise ValueError("task_id must be a non-empty string.")
        
        self.task_id = task_id
        self.instruction = instruction
        self.dependencies = dependencies
        self.status = TaskStatus.PENDING
        self.task_type = task_type
        self.output = output
        self.result: str | None = None
        self.action_graph = ActionGraph()

    def __repr__(self):
        """Provides a string representation for the task node."""
        return (f"TaskNode(id='{self.task_id}', status='{self.status.value}', "
                f"instruction='{self.instruction[:30]}...', deps={self.dependencies})")

    def generate_action_graph(self, llm: ChatOpenAI, agent_state: GraphState, tool_sets=[], additional_instruction: str = "" ) -> ActionGraph:
        structured_llm = llm.with_structured_output(PydanticActionGraph)
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a data science code generator. Given a task instruction, break it down into a sequence of executable Python code snippets. Assume pandas is imported as pd and the data is in a DataFrame named 'df'."),
            ("human", "Generate the action steps for this task: {instruction}. {additional_instruction}. Here is the Agent State where you can find information needed: {agent_state}")
        ])
        chain = prompt | structured_llm
        
        pydantic_action_graph: PydanticActionGraph = chain.invoke({"instruction": self.instruction, "additional_instruction":additional_instruction, "agent_state":agent_state}) # type: ignore
        
        action_graph = ActionGraph()
        for p_node in pydantic_action_graph.task_nodes:
            action_node = ActionNode(
                action_id=p_node.action_id,
                description=p_node.description,
                code=p_node.code
            )
            action_graph.add_action(action_node)
        return action_graph

    def refine_and_update_action_graph(self, llm: ChatOpenAI, agent_state: GraphState):
        refined_graph: ActionGraph = self.generate_action_graph(llm=llm, agent_state=agent_state, additional_instruction=f"A few errors were encountered when running the actions one-by-one. You need to debug the code using the error message: {self.action_graph.result}")
        self.action_graph.nodes = refined_graph.nodes
        self.action_graph.result = None

    def iterate_refining(self, llm: ChatOpenAI, agent_state: GraphState):
        """Generate and refine the ActionGraph within a trial limit."""
        current_state: GraphState = agent_state
        
        max_retries = 3
        action_graph: ActionGraph = self.generate_action_graph(llm=llm, agent_state=current_state)
        self.action_graph = action_graph
        for _ in range(max_retries):
            namespace = {"agent_state": current_state}
            self.action_graph.execute_action_graph(namespace)
            if self.action_graph.result is None:
                raise Exception("Action Graph Result is None")
            if self.action_graph.result.success is False:
                self.refine_and_update_action_graph(llm=llm, agent_state=current_state)
            else:
                self.status = TaskStatus.SUCCESS
                break
        if self.action_graph.result is not None and self.action_graph.result.success is False:
            self.status = TaskStatus.FAILED
            self.result = str(self.action_graph.result)

    def run_action_graph(self, llm: ChatOpenAI, agent_state: GraphState) -> GraphState:
        current_state: GraphState = agent_state
        namespace = {"agent_state": current_state}
        self.action_graph.execute_action_graph(namespace)
        if self.action_graph.result and self.action_graph.result.final_state:
            return self.action_graph.result.final_state # type: ignore
        else:
            raise RuntimeError(f'Failed to run ActionGraph {self.task_id}: {self.action_graph.result}')

class TaskGraph:
    """Manages the entire Directed Acyclic Graph (DAG) of tasks."""
    def __init__(self):
        self.nodes: dict[str, TaskNode] = {}

    def add_task(self, task: TaskNode, replace: bool = False):
        """Adds a TaskNode to the graph.
        If a task with the same id exists:
          - if replace is True, overwrite the existing TaskNode,
          - otherwise raise ValueError.
        """
        if not isinstance(task.task_id, str) or not task.task_id:
            raise ValueError("task_id must be a non-empty string.")
        if task.task_id in self.nodes:
            if replace:
                self.nodes[task.task_id] = task
                print(f"Replaced existing Task with id '{task.task_id}'.")
                return
            raise ValueError(f"Task with id '{task.task_id}' already exists. Pass replace=True to overwrite.")
        self.nodes[task.task_id] = task

    def get_execution_order(self) -> list[str]:
        """
        Determines the execution order of tasks using topological sort.
        This is crucial for executing tasks in the correct sequence based on their dependencies.
        """
        graph_representation = {
            task_id: node.dependencies for task_id, node in self.nodes.items()
        }
        
        try:
            ts = TopologicalSorter(graph_representation)
            return list(ts.static_order())
        except CycleError as e:
            print(f"Error: A cycle was detected in the task graph. Cannot determine execution order. Details: {e}")
            return []

    def print_graph(self, verbose: bool = False):
        """Prints a summary of all tasks and their dependencies."""
        if not self.nodes:
            print("Graph is empty.")
            return
        
        if verbose:
            try:
                with open("action-graph-codes", "w", encoding="utf-8") as f:
                    for task_id, node in self.nodes.items():
                        print(f"  - ID: {task_id}, Status: {node.status.value}")
                        print(f"    Instruction: {node.instruction}")
                        print(f"    Dependencies: {node.dependencies or 'None'}")
                        f.write(f"--- Task {task_id} ---\n")
                        f.write(f"Instruction: {node.instruction}\n")
                        # if node has an action_graph, write each action's code
                        if getattr(node, 'action_graph', None) and node.action_graph.nodes:
                            for action in node.action_graph.nodes:
                                f.write(f"# Action {action.action_id}: {action.description}\n")
                                f.write(action.code + "\n\n")
                        else:
                            f.write("# No action graph available\n\n")
                print("Saved action graphs codes to 'action-graph-codes'")
            except Exception as e:
                print(f"Failed to write action graphs to file: {e}")
        
        else:
            print("--- Task Graph ---")
            for task_id, node in self.nodes.items():
                print(f"  - ID: {task_id}, Status: {node.status.value}")
                print(f"    Instruction: {node.instruction}")
                print(f"    Dependencies: {node.dependencies or 'None'}")
            print("--------------------")

    def run_workflow(self, llm: ChatOpenAI, agent_state: GraphState, stop_on_failure: bool = True) -> GraphState:
        """
        Run tasks in topological order. For each TaskNode:
          - evaluate optional `condition` (simple eval with limited globals),
          - execute via TaskNode.iterate_refining(llm, agent_state),
          - update agent_state from action_graph result if present.
        Returns final agent_state dict.
        """
        # ensure order
        order = self.get_execution_order()
        if not order:
            print("No execution order (empty graph or cycle detected).")
            return agent_state

        current_state = agent_state

        for tid in order:
            if tid not in self.nodes:
                print(f"Skipping unknown task id {tid}")
                continue

            node = self.nodes[tid]
            print(f"== Running task {tid}")

            # evaluate optional condition in a minimal sandbox
            # if getattr(node, "condition", None):
            #     cond_expr = node.condition
            #     try:
            #         # limited eval: only agent_state provided
            #         cond_ok = bool(eval(cond_expr, {"__builtins__": {}}, {"agent_state": agent_state}))
            #     except Exception as e:
            #         print(f"Condition eval error for task {tid}: {e} -> skipping")
            #         cond_ok = False
            #     if not cond_ok:
            #         print(f"Condition false for task {tid}; marking as skipped.")
            #         node.status = TaskStatus.SUCCESS
            #         continue

            # prepare namespace for execution (TaskNode expects GraphState)
            
            try:
                current_state = node.run_action_graph(llm=llm, agent_state=current_state)
            except Exception as e:
                print(f"Exception when running task {tid}: {type(e).__name__}: {e}")
                node.status = TaskStatus.FAILED
                if stop_on_failure:
                    break
                else:
                    continue

            if node.status == TaskStatus.FAILED and stop_on_failure:
                print("Stopping workflow due to failure.")
                break

        return current_state

In [43]:
from langchain_core.prompts import ChatPromptTemplate
from typing import List, Any, Set, Dict

class PydanticTaskNode(BaseModel):
    """Schema for task node."""
    task_id: str = Field(..., description="Unique id for the task node in number, e.g. 1, 2, 3 etc")
    dependencies: List[str] = Field(..., description="A list of unique ids of nodes must be completed before this.")
    instruction: str = Field(..., description="A concise instruction for the task node.")
    task_type: TaskType = Field(description="Current status of the task")
    output: str = Field(..., description="description of what data or model is produced.")

    # produces: List[str] = Field(default_factory=list, description="Named artifacts produced by this task (e.g. cleaned.csv, model.pkl).")
    # consumes: List[str] = Field(default_factory=list, description="Named artifacts consumed by this task.")
    # condition: Optional[str] = Field(None, description="Optional condition expression (evaluated at runtime) to decide whether to run this task.")
    # parallelizable: bool = Field(True, description="Whether this task can be run in parallel with other independent tasks.")
    # retry_policy: Optional[Dict[str, Any]] = Field(None, description="Retry policy, e.g. {'max_retries': 3, 'backoff': 'exponential'}")
    
class PydanticTaskGraph(BaseModel):
    """Schema for task graph."""
    task_nodes: List[PydanticTaskNode] = Field(..., description="List of task nodes for the process.")

In [None]:
from langgraph.graph import StateGraph, START, END

# --- SETUP PHASE ---

# 1. Instantiate your LLM
# llm = ChatOpenAI(...) 

# 2. Create your TaskNode objects based on your plan
# This plan could also come from an initial LLM call
task_a = TaskNode(task_id="A", instruction="Load data from 'data.csv'", dependencies=[], ...)
task_b = TaskNode(task_id="B", instruction="Clean the data, remove nulls", dependencies=["A"], ...)
task_c = TaskNode(task_id="C", instruction="Generate a summary plot", dependencies=["B"], ...)
task_d = TaskNode(task_id="D", instruction="Analyze the cleaned data", dependencies=["B"], ...)

all_tasks = {"A": task_a, "B": task_b, "C": task_c, "D": task_d}

# --- GRAPH ASSEMBLY ---

workflow = StateGraph(MainGraphState)

# 3. Add a node for each task using the generic executor
for task_id in all_tasks.keys():
    # Use functools.partial to create a unique function for each node
    # that has the task_id "baked in".
    node_function = partial(task_executor_node, task_id=task_id)
    workflow.add_node(task_id, node_function)

# 4. Add the central router
workflow.add_node("router", task_router)

# 5. Define the workflow logic
workflow.set_entry_point("router")

# After any task node finishes, it goes back to the router to decide what's next
for task_id in all_tasks.keys():
    workflow.add_edge(task_id, "router")

# The router decides which task node to run. This is a conditional edge.
workflow.add_conditional_edges(
    "router",
    # The router's output (a task_id or list of task_ids) directly maps to the node to run
    lambda x: x,
    # The mapping is just an identity function because the router's output is the node name
    {task_id: task_id for task_id in all_tasks.keys()}
)


# --- COMPILE AND RUN ---

# 6. Compile the graph
main_graph = workflow.compile()

# 7. Invoke with the initial state
initial_state = {"tasks": all_tasks}
final_state = main_graph.invoke(initial_state)

print("\n--- FINAL TASK STATUS ---")
for task_id, task in final_state['tasks'].items():
    print(f"Task '{task_id}': {task.status.value}")

In [9]:
def select_task_node(task_graph: TaskGraph) -> TaskNode | None:
    """
    Select a task node with PENDING status from the task graph.
    Returns the first pending task found, or None if no pending tasks exist.
    """
    for task in task_graph.nodes.values():
        if task == TaskStatus.PENDING:
            return task
    
    return None

def is_graph_finished(task_graph: TaskGraph) -> bool:
    """
    Checks if there is any pending nodes in graph.
    Returns boolean value to indicate the graph condition.
    """
    for task in task_graph.nodes.values():
        if task.status == TaskStatus.PENDING:
            return False
    
    return True
  

In [44]:
class MasterAgent:
    def __init__(self, model:str, tools=[], max_retries:int = 3):

        self.max_retries = max_retries
        self.llm = ChatOpenAI(
            model=model,
            temperature=0,
            max_completion_tokens=None,
            timeout=None,
            max_retries=2,
          )
        
        self.tools = tools
        self.task_graph: TaskGraph = TaskGraph()
    
    def _generate_task_graph(self, human_input:str):
        """Generates a task graph"""
        structured_llm = self.llm.with_structured_output(PydanticTaskGraph)

        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", "You are a data science planner. Given a project description, decompose it into a sequence of data science tasks. Respond with a complete graph of all tasks required. The agents are allowed to access the data in directory '/data' and store/retrieve models in directory '/models'. All the agents will be provided the global AgentState which will store the messages from previous agent"),
                ("human", "{input}"),
            ]
        )
        
        chain = prompt | structured_llm
        
        pydantic_response: PydanticTaskGraph = chain.invoke({"input": human_input}) # type: ignore
        
        for pydantic_node in pydantic_response.task_nodes:
            node_data = pydantic_node.model_dump()
            node = TaskNode(
                task_id=node_data["task_id"], 
                instruction=node_data["instruction"], 
                dependencies=node_data["dependencies"], 
                task_type=node_data["task_type"],
                output=node_data["output"]
            )
            self.task_graph.add_task(node)

        print(f'Task Graph created with {len(pydantic_response.task_nodes)} Nodes.')

    def _refine_and_update_task_graph(self, add_instructions:str):
        raise RuntimeError(f'failed to run Task Graph: {add_instructions}')

    def initialize_and_populate_task_graph(self, human_input:str):
        state_dict = {'run_id':'1', 'raw_data_path':'data/data.csv', 'previously_done':''}
        test_state: GraphState = state_dict # type: ignore
        self._generate_task_graph(human_input=human_input)
        for node in self.task_graph.nodes.values():
            node.iterate_refining(llm=self.llm, agent_state=test_state)
            if node.status is TaskStatus.FAILED:
                print(f'Iterate refining for Task Node {node.task_id} failed to run in limit {self.max_retries}: {node.action_graph.result}')
                self._refine_and_update_task_graph(node.result or '')

    def process_requirement(self, human_input:str, run_id:str) -> GraphState:
        state_dict = {'run_id':run_id, 'raw_data_path':'data/data.csv', 'previously_done':''}
        state: GraphState = state_dict # type: ignore

        for node in self.task_graph.nodes.values():
            node.iterate_refining(llm=self.llm, agent_state=state)
            if node.status is TaskStatus.FAILED:
                raise RuntimeError(f'Error during iteration: {node.action_graph.result}')

        final_state = self.task_graph.run_workflow(llm=self.llm, agent_state=state)
        return final_state


master_agent = MasterAgent(model="gpt-5-mini-2025-08-07")

In [45]:
master_agent.initialize_and_populate_task_graph(human_input="I need to clean up this data. Make the task short with maximum 2 Nodes and each node should have instruction to generate only small ActionGraph with at most 3 Nodes.")
master_agent.task_graph.print_graph(verbose=True)
print('=== Processing Requirement with Optimized TaskGraph')

final_state = master_agent.process_requirement(human_input='', run_id='1')

Task Graph created with 2 Nodes.
  - ID: 1, Status: success
    Instruction: Generate a small ActionGraph (at most 3 Nodes) that: (a) load raw data from /data, (b) run quick integrity checks (missing values, types, duplicates), (c) produce a brief summary report and save an interim cleaned CSV. Return only the ActionGraph with ≤3 nodes.
    Dependencies: None
  - ID: 2, Status: pending
    Instruction: Generate a small ActionGraph (at most 3 Nodes) that: (a) apply detailed cleaning rules to interim CSV (impute/drop, correct types, normalize), (b) run validation checks/unit tests on cleaned data, (c) save final cleaned dataset. Return only the ActionGraph with ≤3 nodes.
    Dependencies: ['1']
Saved action graphs codes to 'action-graph-codes'
== Running task 1
== Running task 2


In [46]:
final_state

{'run_id': '1', 'raw_data_path': 'data/data.csv', 'previously_done': ''}

In [10]:
import pandas as pd
import numpy as np
import io
from contextlib import redirect_stdout

# Data for the DataFrame
data = {
    'passenger_id': [1, 2, 3, 4, 5, 6],
    'name': ['Owen Harris', 'Florence Briggs', 'Laina Heikkinen', 'Jacques Heath', 'William Allen', 'James Moran'],
    'age': [22, 38, 26, 35, 35, np.nan],
    'cabin': ['C85', np.nan, 'C123', 'E46', 'D', np.nan],
    'fare': [7.25, 71.28, 7.92, 53.1, 8.05, 8.46]
}
df = pd.DataFrame(data)
info_buffer = io.StringIO()

with redirect_stdout(info_buffer):
    df.info()

df_info_string = info_buffer.getvalue()
context = {
    "df_info": df_info_string
}

In [None]:
state_dict = {'run_id':'1', 'raw_data_path':'data.csv', 'previously_done':'', 'next_step':'clean data'}
state: GraphState = state_dict # type: ignore

my_task = TaskNode(
    task_id="2",
    instruction=f"Perform data cleaning: fill missing age values with the median age and drop the 'cabin' column. Save the cleaned data as cleaned_data.csv. You will have access to the universal AgentState {state} which has structure {GraphState}. Save the path to result in the AgentState for others to access.",
    dependencies=["1"],
    task_type=TaskType.EXPLORATION,
    output="cleaned dataframe"
)

my_task.iterate_refining(llm=llm, agent_state=state)

Action Graph Generated with 7 steps.
Error when running action graph
ExecuteResult(success=False, message="NameError: name 'pd' is not defined", final_state={'run_id': '1', 'raw_data_path': 'data.csv', 'previously_done': '', 'next_step': 'clean data'})
Action Graph Generated with 5 steps.
  - ID: 1
    Description: Import required libraries and initialize the AgentState (fixes the NameError for 'pd').
    Code: import pandas as pd
import os

# Initialize or load the AgentState provided in the prompt
agent_state = {'run_id': '1', 'raw_data_path': 'data.csv', 'previously_done': '', 'next_step': 'clean data'}

print('Agent state initialized:', agent_state)
--------------------
  - ID: 2
    Description: Load the raw CSV from the path in AgentState into a DataFrame named df.
    Code: raw_path = agent_state.get('raw_data_path')
if not raw_path:
    raise ValueError('raw_data_path not found in agent_state')

# Read the CSV into df
df = pd.read_csv(raw_path)
print(f'Loaded data from {raw_pat