In [1]:
from pathlib import Path
from typing import Any, Dict
import random
from TELF.pipeline.blocks.base_block import AnimalBlock
from TELF.pipeline.blocks.data_bundle import DataBundle, SAVE_DIR_BUNDLE_KEY, RESULTS_DEFAULT
from TELF.pipeline import RepeatLoopBlock, BlockManager

# Define three dummy blocks


In [2]:
class LoadDataBlock(AnimalBlock):
    """
    Pretend to load an initial list of numbers under 'data'.
    """
    def __init__(self, **kw: Any):
        super().__init__(needs=(), provides=("data",), **kw)

    def run(self, bundle: DataBundle) -> DataBundle:
        initial = [random.randint(0, 100) for _ in range(3)]
        bundle[f"{self.tag}.{self.provides[0]}"] = initial
        print(f"[LoadDataBlock] loaded data = {initial}")
        return bundle

class TrainModelBlock(AnimalBlock):
    """
    Compute a “model” as the mean of bundle['data'].
    Also appends two random numbers to data for the next step.
    Provides 'model' and updated 'data'.
    """
    def __init__(self, **kw: Any):
        super().__init__(needs=("data",), provides=("model","data"), **kw)

    def run(self, bundle: DataBundle) -> DataBundle:
        data = bundle["data"]
        new_data = data + [random.randint(0, 100) for _ in range(2)]
        model = sum(new_data) / len(new_data)
        bundle[f"{self.tag}.{self.provides[0]}"] = model
        # append two new random values
        bundle[f"{self.tag}.{self.provides[1]}"] = new_data
        print(f"[TrainModelBlock] trained model = {model:.2f}, data → {new_data}")
        return bundle

class RemoveFirstDataBlock(AnimalBlock):
    """
    Remove the first element from bundle['data'].
    Needs 'data'; provides new 'data'.
    """
    def __init__(self, **kw: Any):
        super().__init__(needs=("data",), provides=("data",), **kw)

    def run(self, bundle: DataBundle) -> DataBundle:
        data = bundle["data"]
        if not data:
            new_data = []
        else:
            new_data = data[1:]
        bundle[f"{self.tag}.{self.provides[0]}"] = new_data
        print(f"[RemoveFirstDataBlock] dropped first → data now = {new_data}")
        return bundle


# EXAMPLE 1 - Sequential
## 1. Create a RepeatLoopBlock that repeats TrainModelBlock + RemoveFirstDataBlock

In [3]:
repeat_sequential_training = RepeatLoopBlock(
    subblocks=[TrainModelBlock(), RemoveFirstDataBlock()],
    n_iter=3,
    clone=False,              # carry state forward between iterations
    redirect_save_dir=True,   # writes into ./TrainLoop/iter_00/, iter_01/, …
    tag="TrainLoop",
)

[TrainModelBlock] needs → (data)   provides → (model, data)
[RemoveFirstDataBlock] needs → (data)   provides → (data)
[TrainLoop] needs → (data)   provides → (results)


## 2. Build and run the top-level pipeline

In [4]:
pipeline = BlockManager(
    blocks=[
        LoadDataBlock(),
        repeat_sequential_training,
    ],
    databundle=DataBundle({
       SAVE_DIR_BUNDLE_KEY: Path('example_results') / 'sequential_loop',
    }),
    verbose=True,
    capture_output=None,     # print directly to console
)
final_bundle = pipeline()

[LoadDataBlock] needs → (∅)   provides → (data)


Block (tag)                   │ Needs (✓/✗) │ Provides
──────────────────────────────────────────────────────
LoadDataBlock (LoadDataBlock) │             │ ['data']
RepeatLoopBlock (TrainLoop)   │ data        │ ['results']

▶  [1/2] LoadDataBlock …
[LoadDataBlock] loaded data = [90, 73, 50]
✓  [1/2] LoadDataBlock finished in 0.00s
▶  [2/2] TrainLoop …

▶  [1/3] starting iter=0


Block (tag)                                 │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────────────
TrainModelBlock (TrainModelBlock)           │ data        │ ['model', 'data']
RemoveFirstDataBlock (RemoveFirstDataBlock) │ data        │ ['data']

▶  [1/2] TrainModelBlock …
✓  [1/2] TrainModelBlock finished in 0.00s
▶  [2/2] RemoveFirstDataBlock …
✓  [2/2] RemoveFirstDataBlock finished in 0.00s
✓  [1/3] finished iter=0

▶  [2/3] starting iter=1


Block (tag)                                 │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────────────
TrainModelBlock (TrainModelBlock)           │ data        │ ['model', 'data']
RemoveFirstDataBlock (RemoveFirstDataBlock) │ data        │ ['data']

▶  [1/2] TrainModelBlock …
✓  [1/2] TrainModelBlock finished in 0.00s
▶  [2/2] RemoveFirstDataBlock …
✓  [2/2] RemoveFirstDataBlock finished in 0.00s
✓  [2/3] finished iter=1

▶  [3/3] starting iter=2


Block (tag)                                 │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────────────
TrainModelBlock (TrainModelBlock)           │ data        │ ['model', 'data']
RemoveFirstDataBlock (RemoveFirstDataBlock) │ data        │ ['data']

▶  [1/2] TrainModelBlock …
✓  [1/2] TrainModelBlock finished in 0.00s
▶  [2/2] RemoveFirstDataBlock …
✓  [2/2] RemoveFirstDataBlock finished in 0.00s
✓  [3/3] finished iter=2
✓  [2/2] TrainLoop finished in 0.01s


## 3. Inspect the repeat results

In [5]:
results = final_bundle[f"{repeat_sequential_training.tag}.{RESULTS_DEFAULT}"]
print("\n=== RepeatLoop Results ===")
for idx, sub_dict in enumerate(results):
    model = sub_dict["model"]
    data = sub_dict["data"]
    print(f"Iteration {idx:02d} → model = {model:.2f}, data = {data}")


=== RepeatLoop Results ===
Iteration 00 → model = 66.80, data = [73, 50, 25, 96]
Iteration 01 → model = 56.50, data = [50, 25, 96, 82, 13]
Iteration 02 → model = 62.43, data = [25, 96, 82, 13, 74, 97]


# EXAMPLE 2 - Parallel

## 1. Create a RepeatLoopBlock that runs 4 iterations in parallel, each cloning the base DataBundle so they don't interfere.

In [6]:
repeat_parallel_training = RepeatLoopBlock(
    subblocks=[TrainModelBlock(), RemoveFirstDataBlock()],
    n_iter=4,
    clone=True,              # each iteration gets its own, independent copy
    parallel=True,           # run all 4 iterations concurrently
    max_workers=4,           # up to 4 threads
    redirect_save_dir=True,  # writes into ./ParallelTrainLoop/iter_00/, etc.
    tag="ParallelTrainLoop",
)

[TrainModelBlock] needs → (data)   provides → (model, data)
[RemoveFirstDataBlock] needs → (data)   provides → (data)
[ParallelTrainLoop] needs → (data)   provides → (results)


## 2. Build and run the top-level pipeline

In [7]:
pipeline = BlockManager(
    blocks=[
        LoadDataBlock(),
        repeat_parallel_training,
    ],
    databundle=DataBundle({
       SAVE_DIR_BUNDLE_KEY: Path('example_results') / 'parallel_loop',
    }),
    verbose=True,
    capture_output=None,     # print directly to console
)
final_bundle = pipeline()


[LoadDataBlock] needs → (∅)   provides → (data)


Block (tag)                         │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────
LoadDataBlock (LoadDataBlock)       │             │ ['data']
RepeatLoopBlock (ParallelTrainLoop) │ data        │ ['results']

▶  [1/2] LoadDataBlock …
[LoadDataBlock] loaded data = [80, 51, 45]
✓  [1/2] LoadDataBlock finished in 0.00s
▶  [2/2] ParallelTrainLoop …


Block (tag)                                 │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────────────
TrainModelBlock (TrainModelBlock)           │ data        │ ['model', 'data']
RemoveFirstDataBlock (RemoveFirstDataBlock) │ data        │ ['data']



Block (tag)                                 │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────────────
TrainModelBlock (TrainModelBlock)           │ data        │ ['model', 'data']
RemoveFirstDataBlock (RemoveFirstDataBlock) │ data        │ ['data']

Block (tag)                                 │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────────────
TrainModelBlock (TrainModelBlock)           │ data        │ ['model', 'data']
RemoveFirstDataBlock (RemoveFirstDataBlock) │ data        │ ['data']

Block (tag)                                 │ Needs (✓/✗) │ Provides
────────────────────────────────────────────────────────────────────
TrainModelBlock (TrainModelBlock)           │ data        │ ['model', 'data']
RemoveFirstDataBlock (RemoveFirstDataBlock) │ data        │ ['data']

▶  [1/2] TrainModelBlock …
▶  [1/2] TrainModelBlock …▶  [1/2] TrainModelBlock …



[TrainModelBlock] trained model = 63.40, data → [80, 51, 45, 82, 59]✓  [1/2] TrainModelBlock finished in 0.00s



## 3. Inspect the parallel-repeat results

In [8]:
results = final_bundle[f"{repeat_parallel_training.tag}.{RESULTS_DEFAULT}"]
print("\n=== ParallelRepeatLoop Results ===")
for idx, sub_dict in enumerate(results):
    model = sub_dict["model"]
    data = sub_dict["data"]
    print(f"Iteration {idx:02d} → model = {model:.2f}, data = {data}")