In [1]:
import hashlib
import json
import re
import threading
import time
from collections import defaultdict
import logging
import colorlog
import os
import subprocess
import tempfile
import json
import sys
import shutil

In [2]:
def get_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    if not logger.handlers:
        handler = logging.StreamHandler()  # Log to console

        # Define color formatter
        formatter = colorlog.ColoredFormatter(
            "%(log_color)s%(asctime)s - %(levelname)s ==> %(message)s",
            log_colors={
                'DEBUG': 'bold_cyan',
                'INFO': 'bold_green',
                'WARNING': 'bold_yellow',
                'ERROR': 'bold_red',
                'CRITICAL': 'bold_red'
            }
        )
        
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger

LOGGER = get_logger()

In [3]:
class PythonEnvironmentManager:
    def __init__(self, venv_path: str, create_env: bool = False):
        self.venv_path = venv_path
        if create_env:
            self.create_virtualenv()

        self.python_bin = os.path.join(venv_path, "bin", "python") if os.name != "nt" else os.path.join(venv_path, "Scripts", "python.exe")

        if not os.path.exists(self.python_bin):
            raise FileNotFoundError(f"Python executable not found in virtualenv: {self.python_bin}")
        LOGGER.info(f"Using Python from virtual environment: {self.python_bin}")

    def create_virtualenv(self):
        if os.path.exists(self.venv_path):
            LOGGER.warning(f"Virtual environment already exists at {self.venv_path}. Skipping creation.")
            return

        try:
            subprocess.check_call([sys.executable, "-m", "venv", self.venv_path])
            LOGGER.info(f"Created virtual environment at {self.venv_path}")
        except subprocess.CalledProcessError as e:
            LOGGER.critical(f"Failed to create virtual environment: {e}")
            raise

    def install_dependencies(self, packages):
        if isinstance(packages, str):
            packages = [packages]

        try:
            subprocess.check_call([self.python_bin, "-m", "pip", "install"] + packages)
            LOGGER.info(f"Installed dependencies: {packages}")
        except subprocess.CalledProcessError as e:
            LOGGER.critical(f"Failed to install dependencies: {e}")
            raise

    def execute_python_code(self, function_body: str, arguments: dict) -> dict:
        # Wrap the function in a script with a fixed "result" dict output
        code = f"""
import json
{function_body}

if __name__ == "__main__":
    args = {json.dumps(arguments)}
    result = function(**args)
    print(json.dumps(result))
"""

        with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
            script_path = f.name
            f.write(code)

        try:
            process = subprocess.run(
                [self.python_bin, script_path],
                capture_output=True,
                text=True,
                timeout=30
            )

            if process.returncode != 0:
                LOGGER.critical(f"Error in subprocess:\n{process.stderr}")
                raise RuntimeError(f"Python code failed: {process.stderr}")

            output = process.stdout.strip()
            LOGGER.debug(f"Subprocess output: {output}")
            result = json.loads(output)
            result = {str(k): str(v) for k, v in result.items()}
            return result

        except Exception as e:
            LOGGER.critical(f"Exception during execution: {e}")
            raise
        finally:
            os.unlink(script_path)

In [632]:
class GraphNode:
    def __init__(self, nodeName, systemInstructions, userPrompt, pythonCode, outputSchema, **kwargs):
        
        self.nodeName = nodeName # str: Name of the node
        self.systemInstructions = systemInstructions # str: Instructions for the system or the system prompt
        self.userPrompt = userPrompt # str: User prompt or question can include reference to other nodes
        self.pythonCode = pythonCode # str: Python code to be executed
        self.outputSchema = outputSchema # Dict: Schema for the output of the node. Dictionary to a dictionary with string keys and values
        self.kwargs = kwargs # Additional keyword arguments for flexibility

        self._validate() # Validate the node's properties
        self.id = self.hash() # str: Unique identifier for the node

        self._compiled = False # bool: Flag to indicate if the node has been compiled
        self._parents = [] # List: Parent nodes
        self._children = [] # List: Child nodes

        self._inputs = {} # It is a mutable mapping of input names to their values where the keys are the output keys of the parent nodes' outputs
        self._outputs = {} # It is a mutable mapping of output names to their values where the keys are the output keys of the current node's outputs

        self.status = "pending" # str: Status of the node, can be "pending", "running" or "completed" or "waiting"

        if nodeName=="inputs":
            self.status = "running"
            self._outputs = {**kwargs}
            self.status = "completed"
    
    def _validate(self):
        # Validate the node's properties
        if not isinstance(self.nodeName, str):
            LOGGER.error("nodeName must be a string. Location: GraphNode._validate")
            raise ValueError("nodeName must be a string")
        if not isinstance(self.systemInstructions, str):
            LOGGER.error("systemInstructions must be a string. Location: GraphNode._validate")
            raise ValueError("systemInstructions must be a string")
        if not isinstance(self.userPrompt, str):
            LOGGER.error("userPrompt must be a string. Location: GraphNode._validate")
            raise ValueError("userPrompt must be a string")
        if not isinstance(self.pythonCode, dict):
            LOGGER.error("pythonCode must be a dict format with a argument and the function_body as the value. Location: GraphNode._validate")
            raise ValueError("pythonCode must be a dict format with a argument and the function_body as the value")
        
        if self.pythonCode != {}:
            if not isinstance(self.pythonCode.get("argument"), dict):
                LOGGER.error("pythonCode must have an 'argument' key with a dict value of named arguments. Location: GraphNode._validate")
                raise ValueError("pythonCode must have an 'argument' key with a dict value of named arguments")
            if not all(isinstance(k, str) and isinstance(v, str) for k, v in self.pythonCode["argument"].items()):
                LOGGER.error("pythonCode['argument'] must be a dictionary with string keys and values. Location: GraphNode._validate")
                raise ValueError("pythonCode['argument'] must be a dictionary with string keys and values")
            if not isinstance(self.pythonCode.get("function_body"), str):
                LOGGER.error("pythonCode must have a 'function_body' key with a string value. Location: GraphNode._validate")
                raise ValueError("pythonCode must have a 'function_body' key with a string value")
        
        if not isinstance(self.outputSchema, dict):
            LOGGER.error("outputSchema must be a dictionary. Location: GraphNode._validate")
            raise ValueError("outputSchema must be a dictionary")
        if not all(isinstance(k, str) and isinstance(v, str) for k, v in self.outputSchema.items()):
            LOGGER.error("outputSchema must be a dictionary with string keys and values. Location: GraphNode._validate")
            raise ValueError("outputSchema must be a dictionary with string keys and values")
        if not isinstance(self.kwargs, dict):
            LOGGER.error("kwargs must be a dictionary. Location: GraphNode._validate")
            raise ValueError("kwargs must be a dictionary")
        for key, value in self.kwargs.items():
            if not isinstance(key, str):
                LOGGER.error(f"Key '{key}' in kwargs must be a string. Location: GraphNode._validate")
                raise ValueError(f"Key '{key}' in kwargs must be a string")
            if not isinstance(value, str):
                LOGGER.error(f"Value '{value}' in kwargs must be a string. Location: GraphNode._validate")
                raise ValueError(f"Value '{value}' in kwargs must be a string")
            
    
    def hash(self):
        # Generate a hash for the node based on its properties
        node_string = f"{self.nodeName}{self.systemInstructions}{self.userPrompt}{self.pythonCode}{str(self.outputSchema)}{str(self.kwargs)}"
        return hashlib.sha256(node_string.encode()).hexdigest()
    
    def resolve_parent_nodes(self, nodePool):
        self._parents = []
        pattern = r'@\[(\w+)\.(\w+)\]'
        
        references = re.findall(pattern, self.systemInstructions)
        references += re.findall(pattern, self.userPrompt)
        references += re.findall(pattern, self.pythonCode.get("function_body", ""))
        references += re.findall(pattern, str(self.pythonCode.get("argument", {})))
        
        for node_name, output_key in references:
            self._parents.append([node_name, output_key])
            nodePool[node_name]._children.append(self.nodeName)
            # Remove duplicates
            nodePool[node_name]._children = list(set(nodePool[node_name]._children))
            
        return self._parents
    
    def resolve_references(self, input_str, nodePool):
        # Replace references in the input string with actual values from the node pool
        for node_name, output_key in self._parents:
            if node_name in nodePool and output_key in nodePool[node_name].outputSchema:
                try:
                    # Parent key
                    parent_key = nodePool[node_name]._outputs[output_key]
                    # Replace the reference with the actual value from the node pool
                    input_str = input_str.replace(f"@[{node_name}.{output_key}]", parent_key)
                    # Add the current node as a child of the referenced node
                    if f"@[{node_name}.{output_key}]" in input_str:
                        self._inputs[f"@[{node_name}.{output_key}]"] = parent_key
                except Exception as e:
                    LOGGER.error(f"Error replacing reference @{node_name}.{output_key}: {e}. Location: GraphNode.resolve_references")
                    # If there's an error, keep the original string
        return input_str
    

    def get_current_state(self, nodePool):
        nodeName = self.nodeName
        systemInstructions = self.resolve_references(self.systemInstructions, nodePool)
        userPrompt = self.resolve_references(self.userPrompt, nodePool)
        pythonCode = self.resolve_references(self.pythonCode.get("function_body", ""), nodePool)
        argument = self.pythonCode.get("argument", {})
        for key, value in argument.items():
            argument[key] = self.resolve_references(value, nodePool)
        outputs = self._outputs

        state = {
            "nodeName": nodeName,
            "systemInstructions": systemInstructions,
            "userPrompt": userPrompt,
            "pythonCode": {
                "function_body": pythonCode,
                "argument": argument
            },
            "outputs": outputs
        }

        return state

    def check_parent_status(self, nodePool):
        # Check if all parent nodes are completed
        for parent in self._parents:
            node_name, output_key = parent
            if node_name in nodePool:
                parent_node = nodePool[node_name]
                if parent_node.status != "completed":
                    LOGGER.warning(f"Parent node {node_name} of {self.nodeName} is not completed. Location: GraphNode.check_parent_status")
                    return False
            else:
                LOGGER.error(f"Parent node {node_name} not found in node pool. Location: GraphNode.check_parent_status")
                return False
        return True
    
    def _validate_output(self, result):
        if result is None:
            LOGGER.error("Result is None. Location: GraphNode._validate_output")
            raise ValueError("Result is None.")
        # Validate the output against the output schema
        for key, value in self.outputSchema.items():
            if key not in result:
                LOGGER.error(f"Output key '{key}' not found in result. Location: GraphNode._validate_output")
                raise ValueError(f"Output key '{key}' not found in result.")
            if not isinstance(result[key], type(value)):
                raise ValueError(f"Output key '{key}' has incorrect type. Expected {type(value)}, got {type(result[key])}.")
        return True


    def execute(self, nodePool, python_env_manager: PythonEnvironmentManager):
        if not self.check_parent_status(nodePool):
            LOGGER.warning(f"Parent nodes are not completed for {self.nodeName}. Location: GraphNode.execute")
            return None
        self.status = "running"
        # Execute the Python code with the resolved arguments
        state = self.get_current_state(nodePool)
        systemInstructions = state["systemInstructions"]
        userPrompt = state["userPrompt"]
        pythonFunctionBody = state["pythonCode"]["function_body"]
        pythonCodeArgument = state["pythonCode"]["argument"]

        if pythonFunctionBody != "" and python_env_manager is not None:
            # Prepare the arguments for output
            result = python_env_manager.execute_python_code(pythonFunctionBody, pythonCodeArgument)
            # Check if the result matches the output schema
            if not self._validate_output(result):
                LOGGER.error(f"Output does not match the schema for {self.nodeName}. Location: GraphNode.execute")
                return None
            # Store the result in _outputs
            self._outputs = result
        else:
            # If no Python code is provided, use the system instructions and user prompt (TODO: Implement this LLM logic)
            self._outputs = {
                k: f"LLM/{userPrompt}" for k, v in self.outputSchema.items()
            }
            
        self.status = "completed"
        return self._outputs

    def to_dict(self):
        # Convert the node to a dictionary representation
        key_value_pairs = {
            "nodeName": self.nodeName,
            "systemInstructions": self.systemInstructions,
            "userPrompt": self.userPrompt,
            "pythonCode": self.pythonCode,
            "outputSchema": self.outputSchema,
            "kwargs": self.kwargs,
            "id": self.id,
            "_compiled": self._compiled,
            "_parents": self._parents,
            "_children": self._children,
            "_inputs": self._inputs,
            "_outputs": self._outputs,
            "status": self.status
        }

        return key_value_pairs

In [4]:
python_env_manager = PythonEnvironmentManager(venv_path="runner_envs/venv", create_env=True)

[1;32m2025-05-17 22:45:21,313 - INFO ==> Using Python from virtual environment: runner_envs/venv/bin/python[0m


In [10]:
python_env_manager.execute_python_code(
    function_body="""
def function(arg1):
    return {"caps":arg1.upper()}
""",
    arguments={"arg1": "afadfae"}
)

[1;36m2025-05-17 22:47:38,053 - DEBUG ==> Subprocess output: {"caps": "AFADFAE"}[0m


{'caps': 'AFADFAE'}

In [11]:
class Graph:
    def __init__(self, timeout=10, venv_path=None, create_env=False, python_packages=[], save_dir=None):
        self.nodePool = {
        } # Dictionary to hold all nodes
        self.save_dir = save_dir
        self.graph_id = None
        self.conditions = defaultdict(threading.Condition)
        self.lock = threading.Lock()
        self.timeout = timeout # Timeout for node execution

        self.venv_path = venv_path
        self.python_packages = python_packages
        self.create_env = create_env

        if self.venv_path is not None:
            self.python_env_manager = PythonEnvironmentManager(self.venv_path, self.create_env)
            if python_packages:
                self.python_env_manager.install_dependencies(python_packages)
        else:
            self.python_env_manager = None

    def addInput(self, inputFields):
        # Add input fields to the graph
        if not isinstance(inputFields, dict):
            raise ValueError("Input fields must be a dictionary")
        
        for key, value in inputFields.items():
            if not isinstance(key, str):
                LOGGER.error(f"Key '{key}' in input fields must be a string")
                raise ValueError(f"Key '{key}' in input fields must be a string")
            if not isinstance(value, str):
                LOGGER.error(f"Value '{value}' in input fields must be a string")
                raise ValueError(f"Value '{value}' in input fields must be a string")
        
        new_node = GraphNode("inputs", "Input node", "Input node", {}, {**inputFields}, **inputFields)
        self.nodePool["inputs"] = new_node
        return new_node

    def addNode(self, nodeName, systemInstructions, userPrompt, pythonCode, outputSchema, **kwargs):
        # Add a new node to the graph
        if nodeName in self.nodePool:
            raise ValueError(f"Node with name {nodeName} already exists. Location: Graph.addNode")
        
        new_node = GraphNode(nodeName, systemInstructions, userPrompt, pythonCode, outputSchema, **kwargs)
        self.nodePool[nodeName] = new_node
        return new_node
    
    def getNode(self, nodeName):
        # Retrieve a node from the graph
        if nodeName not in self.nodePool:
            raise ValueError(f"Node with name {nodeName} does not exist. Location: Graph.getNode")
        
        return self.nodePool[nodeName]
    
    def compile(self):
        # Compile the graph by checking dependencies and setting parent-child relationships
        for node in self.nodePool.values():
            self.nodePool[node.nodeName].resolve_parent_nodes(self.nodePool)
            node._compiled = True

        self.check_circular_dependency()

        # Gnerate a unique graph ID from the node hashes
        node_hashes = [node.hash() for node in self.nodePool.values()]
        self.graph_id = hashlib.sha256("".join(node_hashes).encode()).hexdigest()
        LOGGER.info(f"Graph compiled with ID: {self.graph_id}. Location: Graph.compile")
        self.save_graph()
        
    def check_circular_dependency(self):
        # Check for circular dependencies in the graph
        visited = set()
        stack = set()
        def visit(node):
            if node in stack:
                raise ValueError(f"Circular dependency detected: {node}. Location: Graph.check_circular_dependency")
            if node not in visited:
                visited.add(node)
                stack.add(node)
                for child in self.nodePool[node]._children:
                    visit(child)
                stack.remove(node)
        for node in self.nodePool:
            if node not in visited:
                visit(node)
        return True
     
    
    def print_graph(self):
        # Print the graph in a readable format
        for node in self.nodePool.values():
            print(node.to_dict())
        print("\n")

    def _execute_node_with_condition(self, node, timeout, involved_nodes):
        start_time = time.time()
        condition = self.conditions[node.nodeName]

        with condition:
            while not node.check_parent_status(self.nodePool):
                remaining = timeout - (time.time() - start_time)
                if remaining <= 0:
                    LOGGER.warning(f"Timeout reached for node {node.nodeName}. Location: Graph._execute_node_with_condition")
                    raise TimeoutError(f"Node {node.nodeName} timed out.")
                condition.wait(timeout=remaining)

        with self.lock:
            involved_nodes.add(node.nodeName)
            LOGGER.info(f"Running node: {node.nodeName}. Location: Graph._execute_node_with_condition")
            result = node.execute(self.nodePool, self.python_env_manager)
            LOGGER.info(f"Completed node: {node.nodeName} with result: {result} . Location: Graph._execute_node_with_condition")

        # Notify all waiting threads
        for child in node._children:
            if self.nodePool[child].check_parent_status(self.nodePool):
                with self.lock:
                    LOGGER.debug(f"Notifying child node: {child} . Location: Graph._execute_node_with_condition")
                    with self.conditions[child]:
                        self.conditions[child].notify_all()

    def _traverse_nodes(self, start_node):
        visited = set()
        queue = [start_node]
        while queue:
            current = queue.pop(0)
            if current in visited:
                continue
            visited.add(current)
            node = self.nodePool[current]
            queue.extend(node._children)
        return list(visited)
    
    def execute_from_node(self, start_node):
        # Execute the graph from a specific starting node
        if start_node not in self.nodePool:
            LOGGER.error(f"Node with name {start_node} does not exist. Location: Graph.execute_from_node")
            raise ValueError(f"Node with name {start_node} does not exist.")
        
        visited = self._traverse_nodes(start_node)

        # Check if all nodes are compiled
        for node_name in visited:
            node = self.nodePool[node_name]
            if not node._compiled:
                LOGGER.error(f"Node {node_name} is not compiled. Location: Graph.execute_from_node")
                raise ValueError(f"Node {node_name} is not compiled.")
            
        # Check if all nodes are completed
        for node_name in visited:
            node = self.nodePool[node_name]
            if node.status == "running" or node.status == "waiting":
                LOGGER.error(f"Node {node_name} is already running or waiting. Location: Graph.execute_from_node")  
                raise ValueError(f"Node {node_name} is already running or waiting.")
            node.status = "pending"
            LOGGER.info(f"Node {node_name} status set to pending.")


        threads = []
        involved_nodes = set()

        try:
            for node_name in visited:
                node = self.nodePool[node_name]
                if node.status == "pending":
                    thread = threading.Thread(target=self._execute_node_with_condition, args=(node, self.timeout, involved_nodes))
                    threads.append(thread)
                    thread.start()
                    LOGGER.info(f"Thread started for node: {node_name} . Location: Graph.execute_from_node")
            
            for thread in threads:
                thread.join()
        except TimeoutError as e:
            LOGGER.critical(f"Timeout error: {e}")
            for node_name in involved_nodes:
                node = self.nodePool[node_name]
                if node.status == "running":
                    node.status = "completed"
                    LOGGER.warning(f"Node {node_name} status set to completed due to timeout. Location: Graph.execute_from_node")
            for node_name in involved_nodes:
                node = self.nodePool[node_name]
                if node.status == "running":
                    node.status = "completed"
                    LOGGER.warning(f"Node {node_name} status set to completed due to timeout. Location: Graph.execute_from_node")
            LOGGER.error(f"Error during execution: {e}")
            
        finally:
            for node_name in visited:
                node = self.nodePool[node_name]
                if node.status == "running":
                    node.status = "completed"
                    LOGGER.info(f"Node {node_name} status set to completed. Location: Graph.execute_from_node")
            LOGGER.info("Execution completed for all nodes. Location: Graph.execute_from_node")

        self.save_graph()


    def save_graph(self):
        # Save the graph to a JSON file
        if not self.save_dir:
            LOGGER.error("Save directory is not set. Location: Graph.save_graph")
            raise ValueError("Save directory is not set. Location: Graph.save_graph")
        file_path = os.path.join(self.save_dir, f"graph.json")
        with open(file_path, "w") as f:
            nodes = {node_name: node.to_dict() for node_name, node in self.nodePool.items()}
            json.dump({"nodes": nodes, "venv_path": self.venv_path, "python_packages": self.python_packages}, f, indent=4)
        LOGGER.info(f"Graph saved to {file_path}. Location: Graph.save_graph")

    def load_graph(self):
        # Load the graph from a JSON file
        file_path = os.path.join(self.save_dir, f"graph.json")
        try:
            with open(file_path, "r") as f:
                data = json.load(f)
                for node_name, node_data in data["nodes"].items():
                    node = GraphNode(
                        node_data["nodeName"],
                        node_data["systemInstructions"],
                        node_data["userPrompt"],
                        node_data["pythonCode"],
                        node_data["outputSchema"],
                        **node_data["kwargs"]
                    )
                    # Set the node's properties
                    node._compiled = node_data["_compiled"]
                    node._parents = node_data["_parents"]
                    node._children = node_data["_children"]
                    node._inputs = node_data["_inputs"]
                    node._outputs = node_data["_outputs"]
                    node.status = node_data["status"]
                    
                    # Add the node to the node pool
                    self.nodePool[node_name] = node

                self.venv_path = data.get("venv_path", None)
                self.python_packages = data.get("python_packages", [])
                self.create_env = data.get("create_env", False)
                
                if self.venv_path is not None:
                    self.python_env_manager = PythonEnvironmentManager(self.venv_path, self.create_env)
                    if self.python_packages:
                        self.python_env_manager.install_dependencies(self.python_packages)
                
                LOGGER.info(f"Graph loaded from {file_path}. Location: Graph.load_graph")
        except FileNotFoundError:
            LOGGER.error(f"Graph file not found: {file_path}. Location: Graph.load_graph")
            raise
        except json.JSONDecodeError:
            LOGGER.error(f"Error decoding JSON from file: {file_path}. Location: Graph.load_graph")
            raise
        except Exception as e:
            LOGGER.error(f"Error loading graph: {e}. Location: Graph.load_graph")
            raise
        self.compile()
        LOGGER.info(f"Graph compiled after loading from {file_path}. Location: Graph.load_graph")

In [641]:
inputs = {
    "input1": "10",
    "input2": "12",
    "input3": "41"
}

node1 = GraphNode(
    nodeName="Node1",
    systemInstructions="This is a test system instruction for Node1.",
    userPrompt="Calculate the sum of @[inputs.input1] and @[inputs.input2] and @[inputs.input1]^2",
    pythonCode={
        "argument": {
            "arg1": "@[inputs.input1]",
            "arg2": "@[inputs.input2]"
        },
        "function_body": "def function(arg1, arg2):\n    return {'output1': int(arg1) + int(arg2), 'output2': int(arg1)**2}"
    },
    outputSchema={
        "output1": "This is the output1 of Node1 , sum of input1 and input2",
        "output2": "This is the output2 of Node1 , square of input1"
    }
)

node2 = GraphNode(
    nodeName="Node2",
    systemInstructions="This is a test system instruction for Node2.",
    userPrompt="Calculate the sum of @[Node1.output1] and @[inputs.input3]. And also the sum of @[Node1.output1] * @[inputs.input3]",
    pythonCode={
        "argument": {
            "arg1": "@[Node1.output1]",
            "arg2": "@[inputs.input3]"
        },
        "function_body": "import time; \ndef function(arg1, arg2):\n    time.sleep(5)\n    return {'output1': int(arg1) + int(arg2), 'output2': int(arg1) * int(arg2)}"
    },
    outputSchema={
        "output1": "This is the output1 of Node2 , sum of Node1.output1 and input3",
        "output2": "This is the output2 of Node2 , product of Node1.output1 and input3"
    }
)

node3 = GraphNode(
    nodeName="Node3",
    systemInstructions="This is a test system instruction for Node3.",
    userPrompt="Write summary of @[Node1.output1] = @[inputs.input1] + @[inputs.input2] and @[Node2.output1] = @[Node1.output1] + @[inputs.input3]",
    pythonCode={
        "argument": {},
        "function_body": ""
    },
    outputSchema={
        "output1": "This is the output1 of Node3",
    }
)

In [642]:
graph = Graph(timeout=10, venv_path="./runner_envs/venv", create_env=True, python_packages=["numpy"], save_dir="./saved_graphs")


[1;32m2025-05-15 17:26:32,838 - INFO ==> Using Python from virtual environment: ./runner_envs/venv/bin/python[0m
You should consider upgrading via the '/Users/debasmitroy/Desktop/programming/ddruk-lz-0/agent-dag-builder/runner_envs/venv/bin/python -m pip install --upgrade pip' command.
[1;32m2025-05-15 17:26:33,198 - INFO ==> Installed dependencies: ['numpy'][0m




In [602]:

graph.addInput(inputs)

graph.addNode(
    nodeName=node1.nodeName,
    systemInstructions=node1.systemInstructions,
    userPrompt=node1.userPrompt,
    pythonCode=node1.pythonCode,
    outputSchema=node1.outputSchema
)

graph.addNode(  
    nodeName=node2.nodeName,
    systemInstructions=node2.systemInstructions,
    userPrompt=node2.userPrompt,
    pythonCode=node2.pythonCode,
    outputSchema=node2.outputSchema
)

graph.addNode(
    nodeName=node3.nodeName,
    systemInstructions=node3.systemInstructions,
    userPrompt=node3.userPrompt,
    pythonCode=node3.pythonCode,
    outputSchema=node3.outputSchema
)

<__main__.GraphNode at 0x1180d1640>

In [603]:
graph.compile()

[1;32m2025-05-15 16:23:13,371 - INFO ==> Graph compiled with ID: a657b9dfa40e3ebcac5041a8e0926fa4ab3fbb8c155aa3db60d7c88a321e7418. Location: Graph.compile[0m
[1;32m2025-05-15 16:23:13,373 - INFO ==> Graph saved to ./saved_graphs/graph.json. Location: Graph.save_graph[0m


In [604]:
graph.print_graph()

{'nodeName': 'inputs', 'systemInstructions': 'Input node', 'userPrompt': 'Input node', 'pythonCode': {}, 'outputSchema': {'input1': '10', 'input2': '12', 'input3': '41'}, 'kwargs': {'input1': '10', 'input2': '12', 'input3': '41'}, 'id': 'cd473f364a683944ed5158877169fa68b50c0b106d523e8f81e48f13759838a8', '_compiled': True, '_parents': [], '_children': ['Node3', 'Node1', 'Node2'], '_inputs': {}, '_outputs': {'input1': '10', 'input2': '12', 'input3': '41'}, 'status': 'completed'}
{'nodeName': 'Node1', 'systemInstructions': 'This is a test system instruction for Node1.', 'userPrompt': 'Calculate the sum of @[inputs.input1] and @[inputs.input2] and @[inputs.input1]^2', 'pythonCode': {'argument': {'arg1': '@[inputs.input1]', 'arg2': '@[inputs.input2]'}, 'function_body': "def function(arg1, arg2):\n    return {'output1': int(arg1) + int(arg2), 'output2': int(arg1)**2}"}, 'outputSchema': {'output1': 'This is the output1 of Node1 , sum of input1 and input2', 'output2': 'This is the output2 of N

In [605]:
graph.execute_from_node("Node1")

[1;32m2025-05-15 16:23:15,391 - INFO ==> Node Node3 status set to pending.[0m
[1;32m2025-05-15 16:23:15,392 - INFO ==> Node Node1 status set to pending.[0m
[1;32m2025-05-15 16:23:15,393 - INFO ==> Node Node2 status set to pending.[0m
[1;32m2025-05-15 16:23:15,393 - INFO ==> Thread started for node: Node3 . Location: Graph.execute_from_node[0m
[1;32m2025-05-15 16:23:15,394 - INFO ==> Running node: Node1. Location: Graph._execute_node_with_condition[0m
[1;32m2025-05-15 16:23:15,394 - INFO ==> Thread started for node: Node1 . Location: Graph.execute_from_node[0m
[1;32m2025-05-15 16:23:15,395 - INFO ==> Thread started for node: Node2 . Location: Graph.execute_from_node[0m
[1;36m2025-05-15 16:23:15,419 - DEBUG ==> Subprocess output: {"output1": 22, "output2": 100}[0m
[1;32m2025-05-15 16:23:15,420 - INFO ==> Completed node: Node1 with result: {'output1': '22', 'output2': '100'} . Location: Graph._execute_node_with_condition[0m
[1;36m2025-05-15 16:23:15,421 - DEBUG ==> Noti

In [539]:
graph.save_graph()

[1;32m2025-05-15 14:26:07,494 - INFO ==> Graph saved to ./saved_graphs/graph_a657b9dfa40e3ebcac5041a8e0926fa4ab3fbb8c155aa3db60d7c88a321e7418.json. Location: Graph.save_graph[0m


In [540]:
graph.print_graph()

{'nodeName': 'inputs', 'systemInstructions': 'Input node', 'userPrompt': 'Input node', 'pythonCode': {}, 'outputSchema': {'input1': '10', 'input2': '12', 'input3': '41'}, 'kwargs': {'input1': '10', 'input2': '12', 'input3': '41'}, 'id': 'cd473f364a683944ed5158877169fa68b50c0b106d523e8f81e48f13759838a8', '_compiled': True, '_parents': [], '_children': ['Node3', 'Node1', 'Node2'], '_inputs': {}, '_outputs': {'input1': '10', 'input2': '12', 'input3': '41'}, 'status': 'completed'}
{'nodeName': 'Node1', 'systemInstructions': 'This is a test system instruction for Node1.', 'userPrompt': 'Calculate the sum of @[inputs.input1] and @[inputs.input2] and @[inputs.input1]^2', 'pythonCode': {'argument': {'arg1': '10', 'arg2': '12'}, 'function_body': "def function(arg1, arg2):\n    return {'output1': int(arg1) + int(arg2), 'output2': int(arg1)**2}"}, 'outputSchema': {'output1': 'This is the output1 of Node1 , sum of input1 and input2', 'output2': 'This is the output2 of Node1 , square of input1'}, '

## Loading Graph

In [541]:
loaded_graph = Graph(timeout=10, venv_path="./runner_envs/venv", create_env=True, python_packages=["numpy"], save_dir="./saved_graphs")


[1;32m2025-05-15 14:26:08,580 - INFO ==> Using Python from virtual environment: ./runner_envs/venv/bin/python[0m
You should consider upgrading via the '/Users/debasmitroy/Desktop/programming/ddruk-lz-0/agent-dag-builder/runner_envs/venv/bin/python -m pip install --upgrade pip' command.
[1;32m2025-05-15 14:26:08,905 - INFO ==> Installed dependencies: ['numpy'][0m




In [542]:
loaded_graph.load_graph(graph_id="a657b9dfa40e3ebcac5041a8e0926fa4ab3fbb8c155aa3db60d7c88a321e7418")

[1;32m2025-05-15 14:26:09,906 - INFO ==> Graph loaded from ./saved_graphs/graph_a657b9dfa40e3ebcac5041a8e0926fa4ab3fbb8c155aa3db60d7c88a321e7418.json. Location: Graph.load_graph[0m
[1;32m2025-05-15 14:26:09,907 - INFO ==> Graph compiled with ID: c10870e23ea1d1e7253ca8e586fb4a6a5fcfc497876ded26f3d141dc61047a18. Location: Graph.compile[0m
[1;32m2025-05-15 14:26:09,908 - INFO ==> Graph saved to ./saved_graphs/graph_c10870e23ea1d1e7253ca8e586fb4a6a5fcfc497876ded26f3d141dc61047a18.json. Location: Graph.save_graph[0m
[1;32m2025-05-15 14:26:09,909 - INFO ==> Graph compiled after loading from ./saved_graphs/graph_a657b9dfa40e3ebcac5041a8e0926fa4ab3fbb8c155aa3db60d7c88a321e7418.json. Location: Graph.load_graph[0m


In [543]:
loaded_graph.nodePool["Node1"].status

'completed'

In [546]:
loaded_graph.execute_from_node("Node1")

[1;32m2025-05-15 14:48:01,030 - INFO ==> Node Node3 status set to pending.[0m
[1;32m2025-05-15 14:48:01,031 - INFO ==> Node Node1 status set to pending.[0m
[1;32m2025-05-15 14:48:01,032 - INFO ==> Node Node2 status set to pending.[0m
[1;32m2025-05-15 14:48:01,032 - INFO ==> Thread started for node: Node3 . Location: Graph.execute_from_node[0m
[1;32m2025-05-15 14:48:01,033 - INFO ==> Running node: Node1. Location: Graph._execute_node_with_condition[0m
[1;32m2025-05-15 14:48:01,033 - INFO ==> Thread started for node: Node1 . Location: Graph.execute_from_node[0m
[1;32m2025-05-15 14:48:01,035 - INFO ==> Thread started for node: Node2 . Location: Graph.execute_from_node[0m
[1;36m2025-05-15 14:48:01,072 - DEBUG ==> Subprocess output: {"output1": 22, "output2": 100}[0m
[1;32m2025-05-15 14:48:01,073 - INFO ==> Completed node: Node1 with result: {'output1': '22', 'output2': '100'} . Location: Graph._execute_node_with_condition[0m
[1;36m2025-05-15 14:48:01,074 - DEBUG ==> Noti

In [545]:
loaded_graph.nodePool["Node1"].status

'completed'

## Graph Session Manager

In [643]:
class GraphSessionManager:
    def __init__(self, session_root_dir, timeout=10):
        self.timeout = timeout

        self.session_keys = os.listdir(session_root_dir)
        self.session_metadata = {}
        
        for session_key in self.session_keys:
            self.session_metadata[session_key] = {
                'session_id': session_key,
                'graph': None,
                'created_at': time.time(),
                'updated_at': time.time(),
            }
            self.session_metadata[session_key]['graph'] = self.load_graph_into_session(session_key)
            
    def load_graph_into_session(self, session_key):
        graph = Graph(timeout=self.timeout, venv_path=None, create_env=None, python_packages=None, save_dir=f"./saved_graphs/{session_key}/")
        print(f"Loading graph from {session_key}")
        graph.load_graph()
        return graph
    
    def create_session(self, session_id, venv_path, create_env, python_packages):
        if session_id in self.session_metadata:
            raise ValueError(f"Session with ID {session_id} already exists.")
        
        ############# Create a new graph and save it to the session directory
        graph = Graph(timeout=self.timeout, venv_path=venv_path, create_env=create_env, python_packages=python_packages, save_dir=f"./saved_graphs/{session_id}/")
        os.makedirs(os.path.join("./saved_graphs/", session_id), exist_ok=True)
        graph.compile()
        
        self.session_metadata[session_id] = {
            'session_id': session_id,
            'graph': graph,
            'created_at': time.time(),
            'updated_at': time.time(),
        }
        return graph
    
    def add_input_to_session(self, session_id, inputFields):
        if session_id not in self.session_metadata:
            raise ValueError(f"Session with ID {session_id} does not exist.")
        
        graph = self.session_metadata[session_id]['graph']
        new_node = graph.addInput(inputFields)
        graph.compile()
        return new_node
    
    def add_node_to_session(self, session_id, nodeName, systemInstructions, userPrompt, pythonCode, outputSchema, **kwargs):
        if session_id not in self.session_metadata:
            raise ValueError(f"Session with ID {session_id} does not exist.")
        
        graph = self.session_metadata[session_id]['graph']
        new_node = graph.addNode(nodeName, systemInstructions, userPrompt, pythonCode, outputSchema, **kwargs)
        graph.compile()
        return new_node
    
    def execute_session(self, session_id, start_node):
        if session_id not in self.session_metadata:
            raise ValueError(f"Session with ID {session_id} does not exist.")
        
        graph = self.session_metadata[session_id]['graph']
        graph.execute_from_node(start_node)
        return graph
    
    def get_session_graph(self, session_id):
        if session_id not in self.session_metadata:
            raise ValueError(f"Session with ID {session_id} does not exist.")
        
        return self.session_metadata[session_id]['graph']
    
    def delete_session(self, session_id):
        if session_id not in self.session_metadata:
            raise ValueError(f"Session with ID {session_id} does not exist.")
        
        del self.session_metadata[session_id]
        LOGGER.info(f"Session {session_id} deleted.")
        session_dir = os.path.join("./saved_graphs/", session_id)
        if os.path.exists(session_dir):
            shutil.rmtree(session_dir)
            LOGGER.info(f"Session directory {session_dir} deleted.")
        else:
            LOGGER.warning(f"Session directory {session_dir} does not exist.")
        return True
    
    def list_sessions(self):
        return list(self.session_metadata.keys())


In [644]:
session_manager = GraphSessionManager(session_root_dir="./saved_graphs/")

[1;32m2025-05-15 17:26:38,458 - INFO ==> Using Python from virtual environment: ./runner_envs/venv/bin/python[0m


Loading graph from sample_session


You should consider upgrading via the '/Users/debasmitroy/Desktop/programming/ddruk-lz-0/agent-dag-builder/runner_envs/venv/bin/python -m pip install --upgrade pip' command.
[1;32m2025-05-15 17:26:38,698 - INFO ==> Installed dependencies: ['numpy'][0m
[1;32m2025-05-15 17:26:38,699 - INFO ==> Graph loaded from ./saved_graphs/sample_session/graph.json. Location: Graph.load_graph[0m
[1;32m2025-05-15 17:26:38,699 - INFO ==> Graph compiled with ID: c10870e23ea1d1e7253ca8e586fb4a6a5fcfc497876ded26f3d141dc61047a18. Location: Graph.compile[0m
[1;32m2025-05-15 17:26:38,700 - INFO ==> Graph saved to ./saved_graphs/sample_session/graph.json. Location: Graph.save_graph[0m
[1;32m2025-05-15 17:26:38,701 - INFO ==> Graph compiled after loading from ./saved_graphs/sample_session/graph.json. Location: Graph.load_graph[0m
[1;31m2025-05-15 17:26:38,701 - ERROR ==> Error loading graph: [Errno 20] Not a directory: './saved_graphs/graph.json/graph.json'. Location: Graph.load_graph[0m


Loading graph from graph.json


NotADirectoryError: [Errno 20] Not a directory: './saved_graphs/graph.json/graph.json'

In [620]:
session_manager.list_sessions()

[]

In [621]:
sample_graph = session_manager.create_session(session_id="sample_session", 
                                              venv_path="./runner_envs/venv", 
                                              create_env=True, 
                                              python_packages=["numpy"]
                                              )

[1;32m2025-05-15 17:03:58,015 - INFO ==> Using Python from virtual environment: ./runner_envs/venv/bin/python[0m
You should consider upgrading via the '/Users/debasmitroy/Desktop/programming/ddruk-lz-0/agent-dag-builder/runner_envs/venv/bin/python -m pip install --upgrade pip' command.
[1;32m2025-05-15 17:03:58,343 - INFO ==> Installed dependencies: ['numpy'][0m
[1;32m2025-05-15 17:03:58,344 - INFO ==> Graph compiled with ID: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855. Location: Graph.compile[0m
[1;32m2025-05-15 17:03:58,344 - INFO ==> Graph saved to ./saved_graphs/sample_session/graph.json. Location: Graph.save_graph[0m




In [622]:
session_manager.add_input_to_session(
    session_id="sample_session",
    inputFields=inputs
)

session_manager.add_node_to_session(
    session_id="sample_session",
    nodeName=node1.nodeName,
    systemInstructions=node1.systemInstructions,
    userPrompt=node1.userPrompt,
    pythonCode=node1.pythonCode,
    outputSchema=node1.outputSchema
)
session_manager.add_node_to_session(
    session_id="sample_session",
    nodeName=node2.nodeName,
    systemInstructions=node2.systemInstructions,
    userPrompt=node2.userPrompt,
    pythonCode=node2.pythonCode,
    outputSchema=node2.outputSchema
)
session_manager.add_node_to_session(
    session_id="sample_session",
    nodeName=node3.nodeName,
    systemInstructions=node3.systemInstructions,
    userPrompt=node3.userPrompt,
    pythonCode=node3.pythonCode,
    outputSchema=node3.outputSchema
)

[1;32m2025-05-15 17:04:05,325 - INFO ==> Graph compiled with ID: 69437e07646e7665bd8f52a097bdcf7c4781965b07b3265a85bff1c9c9710111. Location: Graph.compile[0m
[1;32m2025-05-15 17:04:05,326 - INFO ==> Graph saved to ./saved_graphs/sample_session/graph.json. Location: Graph.save_graph[0m
[1;32m2025-05-15 17:04:05,326 - INFO ==> Graph compiled with ID: b59535467f2679f21c6d7a0e24ffc6a90652f5a6fd7d970b1cb71bfe0aa65450. Location: Graph.compile[0m
[1;32m2025-05-15 17:04:05,327 - INFO ==> Graph saved to ./saved_graphs/sample_session/graph.json. Location: Graph.save_graph[0m
[1;32m2025-05-15 17:04:05,327 - INFO ==> Graph compiled with ID: 2fd39d19cb1b3c8253ece592983efe2fb5f515a17b7550963ae60b6e278519f3. Location: Graph.compile[0m
[1;32m2025-05-15 17:04:05,327 - INFO ==> Graph saved to ./saved_graphs/sample_session/graph.json. Location: Graph.save_graph[0m
[1;32m2025-05-15 17:04:05,328 - INFO ==> Graph compiled with ID: c10870e23ea1d1e7253ca8e586fb4a6a5fcfc497876ded26f3d141dc61047a18

<__main__.GraphNode at 0x1181aedc0>

In [624]:
session_manager.execute_session(
    session_id="sample_session",
    start_node="Node2"
)

[1;32m2025-05-15 17:04:29,350 - INFO ==> Node Node3 status set to pending.[0m
[1;32m2025-05-15 17:04:29,351 - INFO ==> Node Node2 status set to pending.[0m
[1;32m2025-05-15 17:04:29,353 - INFO ==> Thread started for node: Node3 . Location: Graph.execute_from_node[0m
[1;32m2025-05-15 17:04:29,354 - INFO ==> Running node: Node2. Location: Graph._execute_node_with_condition[0m
[1;32m2025-05-15 17:04:29,355 - INFO ==> Thread started for node: Node2 . Location: Graph.execute_from_node[0m
[1;36m2025-05-15 17:04:34,388 - DEBUG ==> Subprocess output: {"output1": 63, "output2": 902}[0m
[1;32m2025-05-15 17:04:34,389 - INFO ==> Completed node: Node2 with result: {'output1': '63', 'output2': '902'} . Location: Graph._execute_node_with_condition[0m
[1;36m2025-05-15 17:04:34,389 - DEBUG ==> Notifying child node: Node3 . Location: Graph._execute_node_with_condition[0m
[1;32m2025-05-15 17:04:34,389 - INFO ==> Running node: Node3. Location: Graph._execute_node_with_condition[0m
[1;32

<__main__.Graph at 0x11837fc70>