In [1]:
from neopipe.task import FunctionSyncTask, ClassSyncTask
from neopipe.result import Result, Ok, Err
import inspect
from typing import get_origin

In [2]:
@FunctionSyncTask.decorator()
def start(_: Result[int, str]) -> Result[int, str]:
    """Ignore the incoming result and return a constant ``Ok(10)``.

    Args:
        _: Incoming result; never read.

    Returns:
        ``Ok`` wrapping the integer 10.
    """
    initial_value = 10
    return Ok(initial_value)

@FunctionSyncTask.decorator()
def increment(x: Result[int, str]) -> Result[int, str]:
    """Return ``Ok`` holding the unwrapped input plus one.

    Args:
        x: Incoming result; its value is obtained with ``unwrap()``.

    Returns:
        ``Ok`` wrapping the incremented value.
    """
    current = x.unwrap()
    return Ok(current + 1)

@FunctionSyncTask.decorator()
def fail_if_even(x: Result[int, str]) -> Result[int, str]:
    """Reject even values and pass odd values through unchanged.

    Args:
        x: Incoming result whose unwrapped value is tested for parity.

    Returns:
        ``Err("Even number not allowed")`` when the unwrapped value is even,
        otherwise ``Ok`` wrapping that same integer.
    """
    value = x.unwrap()
    if value % 2 == 0:
        return Err("Even number not allowed")
    # Wrap the unwrapped integer, not the incoming Result: Ok(x) would yield a
    # nested Ok(Ok(n)), which contradicts the declared Result[int, str] and
    # breaks any downstream task that unwraps once and expects an int.
    return Ok(value)

# ---------------------------
# ClassSyncTask example
# ---------------------------

class MultiplyTask(ClassSyncTask[int, str]):
    """Class-based task that multiplies an incoming value by a fixed factor."""

    def __init__(self, multiplier: int):
        """Initialise the base task and remember the factor.

        Args:
            multiplier: Factor applied to every unwrapped input in ``execute``.
        """
        super().__init__()
        self.multiplier = multiplier

    def execute(self, value: Result) -> Result[int, str]:
        """Return ``Ok`` holding the unwrapped input times ``self.multiplier``.

        Args:
            value: Incoming result; its value is obtained with ``unwrap()``.
        """
        unwrapped = value.unwrap()
        product = unwrapped * self.multiplier
        return Ok(product)

In [3]:
# Task instance that triples whatever value it is given.
multiply_task = MultiplyTask(multiplier=3)

In [4]:
# Call the task instance directly with a successful result wrapping 5;
# the cell displays the returned result.
multiply_task(Ok(5))

2025-04-16 21:50:31 - neopipe.task - INFO - [MultiplyTask] Attempt 1 - Task ID: 4d383711-39ef-4d57-abb6-a97153fd217a
2025-04-16 21:50:31 - neopipe.task - INFO - [MultiplyTask] Success on attempt 1


Ok(15)

In [9]:
# Signature of the callable exposed as `increment.execute`.
sig = inspect.signature(increment.execute)
params = [*sig.parameters.values()]
# Leave out any `self` entry so only caller-supplied parameters remain.
non_self_params = list(filter(lambda param: param.name != "self", params))

In [19]:
# Annotation declared on the first non-self parameter of `increment.execute`.
non_self_params[0].annotation

neopipe.result.Result[~T, ~E]

In [None]:
# True when the annotation is a subscripted generic whose origin is the
# `Result` class itself (a bare, unsubscripted `Result` has origin None).
get_origin(non_self_params[0].annotation) is Result

In [5]:
# Parameters of `increment.execute`, excluding any entry named `self`.
next_sig = inspect.signature(increment.execute)
next_params = [
    param
    for name, param in next_sig.parameters.items()
    if name != "self"
]

In [6]:
# Annotation on the first remaining parameter; indexing raises IndexError
# if `next_params` is empty.
expected_input_type = next_params[0].annotation

In [7]:
# Display the stored annotation.
expected_input_type

neopipe.result.Result[~T, ~E]

In [6]:
# Signature of `execute` looked up on the class rather than an instance,
# so `self` is still listed as the first parameter.
sig_class = inspect.signature(MultiplyTask.execute)
sig_class

<Signature (self, value: neopipe.result.Result) -> neopipe.result.Result[int, str]>

In [7]:
# Signature of `increment.execute`. The recorded output shows generic
# `Result[~T, ~E] -> Result[~U, ~E]` annotations and an `input_result`
# parameter, not the `x: Result[int, str]` written on `increment`.
# NOTE(review): presumably `execute` is defined by the decorator's task
# wrapper — confirm in neopipe.task.
inspect.signature(increment.execute)

<Signature (input_result: neopipe.result.Result[~T, ~E]) -> neopipe.result.Result[~U, ~E]>