# CPU graph

An example of how to run several CPU-intensive nodes in parallel and aggregate their results.

## Example worker

The code for the example worker is [here](./example_workers/cpu_worker/main.py).
It uses an encryption function as its CPU-intensive task.
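As a rough illustration only, here is a toy stand-in with the same shape as the worker's `encrypt` task. It takes a `plaintext` and a `work_factor` and returns a `ciphertext` together with the `time_taken`, which are the names the graph below uses. The body is our own assumption and is not the worker's actual code. It uses `hashlib.scrypt` with `work_factor` as scrypt's cost parameter `n`, so CPU time grows with the work factor. It is not secure encryption.

```python
import hashlib
import os
import time
from typing import NamedTuple


class EncryptOutput(NamedTuple):
    ciphertext: str
    time_taken: float


def encrypt(plaintext: str, work_factor: int) -> EncryptOutput:
    """Toy CPU-intensive task (hypothetical; NOT the example worker's code, NOT secure).

    `work_factor` is passed to scrypt as its N parameter, which must be a
    power of two greater than 1; scrypt's running time grows linearly with N.
    """
    start = time.perf_counter()
    data = plaintext.encode()
    salt = os.urandom(16)
    # The expensive part: scrypt key derivation (dklen must be at least 1).
    key = hashlib.scrypt(
        b"example-password", salt=salt, n=work_factor, r=8, p=1, dklen=max(len(data), 1)
    )
    # XOR the plaintext with the derived key and prepend the salt.
    body = bytes(b ^ k for b, k in zip(data, key))
    ciphertext = (salt + body).hex()
    return EncryptOutput(ciphertext=ciphertext, time_taken=time.perf_counter() - start)
```

For example, `encrypt("plaintext+0", work_factor=2**14)` returns a hex string and the elapsed seconds. Doubling `work_factor` should roughly double `time_taken`.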

## Generating stubs

Since this worker uses the Tierkreis Python library, we can automatically generate a stub file for it with the following commands.
The stub file provides type hints that we use later when building the graph.

```python
# Generate the stub file.
!cd ./example_workers/cpu_worker && uv run main.py --stubs-path ../../cpu_worker_stubs.py
# Format and lint the generated file.
!uv run ruff format cpu_worker_stubs.py
!uv run ruff check --fix cpu_worker_stubs.py
```

We also generate a stub file for the Tierkreis builtins worker and place it in this directory.

```python
# Generate the stub file.
!cd ../tierkreis/tierkreis/controller/builtins/ && uv run main.py --stubs-path ../../../../examples/tkr_builtins.py
# Format and lint the generated file.
!uv run ruff format tkr_builtins.py
!uv run ruff check --fix tkr_builtins.py
```

## Writing a graph

We can import these stub files to help create our graph.

```python
from typing import NamedTuple

from tierkreis.controller.data.core import EmptyModel, TKRRef
from tierkreis.controller.data.graph import GraphBuilder

# Stubs generated in the previous step.
from cpu_worker_stubs import encrypt, encryptOutput
from tkr_builtins import mean


def map_body() -> GraphBuilder[TKRRef[str], encryptOutput]:
    """Graph that encrypts a single plaintext; it is run once per list element."""
    g = GraphBuilder(TKRRef[str])
    result = g.fn(encrypt(plaintext=g.inputs, work_factor=g.const(2**14)))
    return g.outputs(result)


class GraphOutputs(NamedTuple):
    average_time_taken: TKRRef[float]
    ciphertexts: TKRRef[list[str]]


def graph() -> GraphBuilder[EmptyModel, GraphOutputs]:
    g = GraphBuilder()
    # Unfold a constant list of 20 plaintexts so each element is processed separately.
    plaintexts_list = g.const([f"plaintext+{n}" for n in range(20)])
    plaintexts = g.unfold_list(plaintexts_list)
    # Apply map_body to every plaintext; these nodes are independent of each other.
    results = g.map(g.graph_const(map_body()), plaintexts)

    # Fold the ciphertexts back into a single list.
    ciphertexts = results.map(lambda x: x.ciphertext)
    ciphertexts_list = g.fold_list(ciphertexts)

    # Fold the per-encryption timings into a list and average them.
    times = results.map(lambda x: x.time_taken)
    times_list = g.fold_list(times)
    av = g.fn(mean(values=times_list))

    out = GraphOutputs(ciphertexts=ciphertexts_list, average_time_taken=av)
    return g.outputs(out)
```
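To make the data flow of `graph()` concrete, here is a plain-Python analogue of the unfold → map → fold → mean pattern. It uses no Tierkreis API. The per-element task `fake_encrypt` (iterated SHA-256) and the function `cpu_graph` are hypothetical. A `ThreadPoolExecutor` is used only to keep the sketch self-contained and to show that results come back in input order. Because `fake_encrypt` mostly holds the GIL, it does not demonstrate a speedup. In the Tierkreis graph, the parallelism comes from the executor running the worker tasks.

```python
import hashlib
import statistics
import time
from concurrent.futures import ThreadPoolExecutor
from typing import NamedTuple


class EncryptOutput(NamedTuple):
    ciphertext: str
    time_taken: float


def fake_encrypt(plaintext: str, rounds: int) -> EncryptOutput:
    """Hypothetical per-element task: iterated SHA-256 (not encryption)."""
    start = time.perf_counter()
    digest = plaintext.encode()
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return EncryptOutput(digest.hex(), time.perf_counter() - start)


def cpu_graph(n: int = 20, rounds: int = 10_000) -> tuple[float, list[str]]:
    """Return (mean time per task, ciphertexts), mirroring the graph's two outputs."""
    # "unfold": a list of independent inputs.
    plaintexts = [f"plaintext+{i}" for i in range(n)]
    # "map": one task per element; Executor.map yields results in input order.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(lambda p: fake_encrypt(p, rounds), plaintexts))
    # "fold": gather each output field back into a list.
    ciphertexts = [r.ciphertext for r in results]
    times = [r.time_taken for r in results]
    # "mean": aggregate the per-task timings.
    return statistics.mean(times), ciphertexts
```

Keeping the two folds separate mirrors the graph, where `ciphertexts` and `times` are built independently from the same mapped `results`.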

## Running the graph

In order to run a graph we need to choose a storage backend and an executor.
In this example we use a simple file storage backend (`ControllerFileStorage`) and the UV executor (`UvExecutor`).
For the UV executor, the registry path should be a folder containing all the workers we use.

We then pass the storage, the executor and the graph to the `run_graph` function.
The final argument holds any graph inputs; this graph takes none, so we pass an empty dictionary.

```python
import json
import time
from pathlib import Path
from uuid import UUID

from tierkreis.controller import run_graph
from tierkreis.controller.data.location import Loc
from tierkreis.controller.executor.uv_executor import UvExecutor
from tierkreis.controller.storage.filestorage import ControllerFileStorage

# Store the workflow's state on the local file system under a fixed workflow ID.
storage = ControllerFileStorage(UUID(int=2048), "cpu_graph", do_cleanup=True)
# The registry path is the folder containing the workers used by the graph.
executor = UvExecutor(
    registry_path=Path("./example_workers"), logs_path=storage.logs_path
)

# perf_counter is a monotonic clock intended for measuring durations.
start = time.perf_counter()
# The graph has no inputs, so the inputs dictionary is empty.
run_graph(storage, executor, graph().data, {})
total_time = time.perf_counter() - start

# Outputs are stored as JSON; read them back from the root location.
av = json.loads(storage.read_output(Loc(), "average_time_taken"))
ciphertexts = json.loads(storage.read_output(Loc(), "ciphertexts"))
print(
    f"Encrypted {len(ciphertexts)} plaintexts in {total_time:.3g}s "
    f"with mean encryption time {av:.3g}s"
)
```

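We should see that the total wall-clock time for the workflow is much less than 20 times the mean encryption time. Given enough CPU cores, it approaches the time of a single encryption plus overhead. This indicates that the encryptions ran in parallel rather than one after another.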
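To put a number on this comparison, here is a small hypothetical helper, `parallel_speedup` (not part of Tierkreis). A fully sequential run would take roughly `n_tasks * mean_task_time`, so its ratio to the observed wall-clock time estimates how many encryptions effectively ran at once.

```python
def parallel_speedup(n_tasks: int, mean_task_time: float, total_time: float) -> float:
    """Estimate effective concurrency as (sequential estimate) / (observed wall-clock time).

    `total_time` also includes controller and worker start-up overhead, so the
    result tends to understate how many tasks actually ran concurrently.
    """
    if total_time <= 0:
        raise ValueError("total_time must be positive")
    return n_tasks * mean_task_time / total_time
```

With the variables from the cell above, this would be `parallel_speedup(len(ciphertexts), av, total_time)`. For instance, 20 tasks averaging 2 s that finish in 5 s give a speedup of 8.0.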