In [1]:
import sys
import os

sys.path.append(os.path.abspath("../.."))

from tiny_graph.graph.executable import Graph
from tiny_graph.constants import START, END

## Testing execution plan conversion


In [None]:
from rich import print as rprint

rprint(simple_graph._convert_execution_plan())

## Testing execution


In [16]:
import time
from datetime import datetime

egraph = Graph()


@egraph.node()
def a():
    time.sleep(1)
    print("a \n", datetime.now())


@egraph.node()
def b():
    time.sleep(1)
    print("b \n", datetime.now())


@egraph.node()
def c():
    time.sleep(1)
    print("c \n", datetime.now())


@egraph.node()
def d():
    time.sleep(1)
    print("d \n", datetime.now())


@egraph.node()
def e():
    time.sleep(1)
    print("e \n", datetime.now())


@egraph.node()
def f():
    time.sleep(1)
    print("f", datetime.now())


@egraph.node()
def g():
    time.sleep(1)
    print("g \n", datetime.now())


egraph.add_edge(START, "a")
egraph.add_edge("a", "b")
egraph.add_edge("b", "c")
egraph.add_edge("b", "d")
egraph.add_edge("d", "e")
egraph.add_edge("c", "f")
egraph.add_edge("f", "g")
egraph.add_edge("e", "g")
egraph.add_edge("g", END)

egraph.compile()
egraph.visualize()

In [None]:
egraph.execute()

In [10]:
egraph = Graph()


@egraph.node()
def a():
    time.sleep(1)
    print("a", datetime.now())


@egraph.node()
def b():
    time.sleep(1)
    print("b", datetime.now())


@egraph.node()
def c():
    time.sleep(1)
    print("c", datetime.now())


@egraph.node()
def d():
    time.sleep(1)
    print("d", datetime.now())


@egraph.node()
def e():
    time.sleep(1)
    print("e", datetime.now())


@egraph.node()
def f():
    time.sleep(1)
    print("f", datetime.now())


@egraph.node()
def g():
    time.sleep(1)
    time.sleep(1)
    print("g", datetime.now())


egraph.add_edge(START, "a")
egraph.add_edge("a", "b")
egraph.add_edge("b", "c")
egraph.add_edge("c", "d")
egraph.add_edge("d", "e")
egraph.add_edge("e", "f")
egraph.add_edge("f", "g")
egraph.add_edge("g", END)

egraph.compile()
egraph.visualize()

In [None]:
egraph.execute()

In [None]:
from rich import print as rprint

# Create a test graph with a mix of parallel and sequential paths
test_graph = Graph()


@test_graph.node()
def start_task():
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] Starting workflow \n")
    time.sleep(1)


@test_graph.node()
def parallel_task_1():
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] Starting parallel task 1 \n")
    time.sleep(4)
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] Finished parallel task 1 \n")


@test_graph.node()
def parallel_task_2():
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] Starting parallel task 2 \n")
    time.sleep(4)
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] Finished parallel task 2 \n")


@test_graph.node()
def final_task():
    print(f"[{datetime.now().strftime('%H:%M:%S.%f')}] Running final task \n")
    time.sleep(1)


# Create a workflow with parallel execution
test_graph.add_edge(START, "start_task")
test_graph.add_edge("start_task", "parallel_task_1")
test_graph.add_edge("start_task", "parallel_task_2")
test_graph.add_edge("parallel_task_1", "final_task")
test_graph.add_edge("parallel_task_2", "final_task")
test_graph.add_edge("final_task", END)

# Compile and execute
test_graph.compile()
print("\nExecution Plan:")
rprint(test_graph.execution_plan)
print("\nStarting execution:")
start_time = time.time()
test_graph.execute()
end_time = time.time()
print(f"\nTotal execution time: {end_time - start_time:.2f} seconds")

## Testing state


In [None]:
from pydantic import BaseModel


class State(BaseModel):
    a: int
    b: int


s = State(a=1, b=2)
basic_graph = Graph(state=s)


@basic_graph.node()
def slow_task():
    time.sleep(3)  # Task that takes too long


@basic_graph.node()
def normal_task():
    pass


basic_graph.add_edge(START, "slow_task")
basic_graph.add_edge("slow_task", "normal_task")
basic_graph.add_edge("normal_task", END)
basic_graph.compile()

basic_graph.state_schema

In [2]:
from pydantic import BaseModel
from tiny_graph.models.base import GraphState
from typing import Dict
import time
from tiny_graph.buffer.factory import History, Incremental, LastValue


# Define a state model with different buffer types
class TestState(GraphState):
    counter: Incremental[int]  # Will accumulate values
    metrics: History[Dict[str, float]]  # Will keep history of all updates
    current_status: LastValue[str]  # Will only keep last value


# Initialize the graph with state
state = TestState(counter=0, metrics={}, current_status="")

test_graph = Graph(state=state)


# Test Incremental Buffer
@test_graph.node()
def add_to_counter(state):
    time.sleep(0.5)
    return {"counter": 5}


@test_graph.node()
def add_more_to_counter(state):
    time.sleep(0.5)
    return {"counter": 3}


# Test History Buffer
@test_graph.node()
def add_metrics(state):
    time.sleep(0.5)
    return {"metrics": {"accuracy": 0.95, "loss": 0.1}}


@test_graph.node()
def update_metrics(state):
    time.sleep(0.5)
    return {"metrics": {"loss": 0.05, "precision": 0.88}}


# Test LastValue Buffer
@test_graph.node()
def set_status_running(state):
    time.sleep(0.5)
    return {"current_status": "running"}


@test_graph.node()
def set_status_complete(state):
    time.sleep(0.5)
    return {"current_status": "complete"}


# Create the workflow
test_graph.add_edge(START, "add_to_counter")
test_graph.add_edge("add_to_counter", "add_more_to_counter")
test_graph.add_edge("add_more_to_counter", "add_metrics")
test_graph.add_edge("add_metrics", "update_metrics")
test_graph.add_edge("update_metrics", "set_status_running")
test_graph.add_edge("set_status_running", "set_status_complete")
test_graph.add_edge("set_status_complete", END)

# Compile and execute
test_graph.compile()
test_graph.execute()

# Print final state
print("\nFinal State:")
print(f"Counter: {test_graph.state.counter}")  # Should be 8 (5 + 3)
print(
    f"Metrics History: {test_graph.state.metrics}"
)  # Should contain both metric updates
print(f"Current Status: {test_graph.state.current_status}")  # Should be "complete"

Incremental
History
LastValue

Final State:
Counter: 0
Metrics History: {}
Current Status: 


In [None]:
isinstance(test_graph.state, (BaseModel, GraphState))

In [None]:
test_graph.state_schema

In [3]:
test_graph.buffers

{'counter': <tiny_graph.buffer.incremental.IncrementalBuffer at 0x108dc5bd0>,
 'metrics': <tiny_graph.buffer.history.HistoryBuffer at 0x108d7aad0>,
 'current_status': <tiny_graph.buffer.last_value.LastValueBuffer at 0x108d7aa50>}

In [4]:
test_graph.state_schema

{'counter': tiny_graph.buffer.factory.Incremental[int],
 'metrics': tiny_graph.buffer.factory.History[Dict],
 'current_status': tiny_graph.buffer.factory.LastValue[str]}

In [5]:
# printing all buffers
for buffer in test_graph.buffers.values():
    print(buffer.field_name)
    print(buffer.value)

counter
8
metrics
[{'accuracy': 0.95, 'loss': 0.1}, {'loss': 0.05, 'precision': 0.88}]
current_status
complete


In [None]:
from tiny_graph.buffer.factory import BufferTypeMarker

issubclass(test_graph.state_schema["counter"], BufferTypeMarker)

In [None]:
test_graph.state_schema["counter"].__bases__[0].__name__

In [None]:
test_graph.state_schema["metrics"].__get_pydantic_core_schema__()

In [None]:
test_graph.state_schema

In [None]:
test_graph.state_schema["metrics"]._inner_type

In [None]:
get_args(test_graph.state_schema["counter"])