In [3]:
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Protocol, runtime_checkable


@dataclass
class Context:
    """The pipeline's work item: every step reads and writes data on it.

    Attributes:
        data: shared key/value store the steps populate (the demo steps write
            "user" and "greeting"). Defaults to a fresh empty dict per
            instance, so ``Context()`` works and instances never share a dict.
    """

    data: dict[str, Any] = field(default_factory=dict)


@runtime_checkable
class Step(Protocol):
    """Pipeline step interface.

    Every step implements ``run(ctx) -> ctx``: it receives the shared Context,
    may read or write ``ctx.data``, and returns the Context handed to the next
    step. Because the protocol is ``@runtime_checkable``, ``isinstance(obj,
    Step)`` only checks that ``obj`` has a ``run`` attribute, not its signature.
    """

    def run(self, ctx: Context) -> Context: ...


class Pipeline:
    """A pipeline: an ordered sequence of steps executed one after another.

    Each step's ``run`` receives the Context returned by the previous step;
    the last step's result is returned from ``run``.
    """

    def __init__(self, steps: Iterable[Step]):
        """
        Args:
            steps: the steps in execution order. Any iterable is accepted and
                copied into a list, so a one-shot iterator (e.g. a generator)
                still yields a pipeline that can be run more than once.
        """
        self.steps = list(steps)

    def run(self, ctx: Context) -> Context:
        """Run every step in order and return the final Context."""
        for step in self.steps:
            ctx = step.run(ctx)
        return ctx


# ---- 步骤 (steps) ----
class AddUser:
    """Demo step: store a fixed sample user record under ``ctx.data["user"]``."""

    def run(self, ctx: Context) -> Context:
        sample_user = dict(name="张三", age=18)
        ctx.data["user"] = sample_user
        return ctx


class ValidateAge:
    """Demo step: reject a user whose ``age`` is negative.

    Raises:
        ValueError: if ``ctx.data["user"]["age"] < 0``.
    """

    def run(self, ctx: Context) -> Context:
        user_age = ctx.data["user"]["age"]
        is_negative = user_age < 0
        if is_negative:
            raise ValueError("年龄不合法")
        return ctx


class MakeGreeting:
    """Demo step: build a greeting from the user's name into ``ctx.data["greeting"]``."""

    def run(self, ctx: Context) -> Context:
        user = ctx.data["user"]
        ctx.data["greeting"] = "你好，{}！".format(user["name"])
        return ctx


# Build the class-based pipeline and run it on an empty Context.
p = Pipeline(steps=[AddUser(), ValidateAge(), MakeGreeting()])
out = p.run(ctx=Context(data={}))
print(out.data)


{'user': {'name': '张三', 'age': 18}, 'greeting': '你好，张三！'}


In [4]:
# Type alias for a function-style step: a callable that takes a Context and
# returns a Context (the same contract as Step.run).
StepFn = Callable[[Context], Context]


class FnStep:
    """Adapter that lets a plain ``ctx -> ctx`` function act as a pipeline step."""

    def __init__(self, fn: StepFn):
        # The wrapped callable; run() delegates to it unchanged.
        self.fn = fn

    def run(self, ctx: Context) -> Context:
        wrapped = self.fn
        return wrapped(ctx)


def add_user(ctx: Context) -> Context:
    """Function-style step: store a fixed sample user record in ``ctx.data["user"]``."""
    profile = {"name": "李四", "age": 20}
    ctx.data["user"] = profile
    return ctx


def greeting(ctx: Context) -> Context:
    """Function-style step: set ``ctx.data["greeting"]`` to ``"Hi <user name>"``."""
    user_name = ctx.data["user"]["name"]
    ctx.data["greeting"] = "Hi {}".format(user_name)
    return ctx


# Same Pipeline, but every step is a plain function wrapped in FnStep.
p = Pipeline([FnStep(fn) for fn in (add_user, greeting)])
print(p.run(Context(data={})).data)


{'user': {'name': '李四', 'age': 20}, 'greeting': 'Hi 李四'}


In [5]:
# L2 可观察：日志、耗时、trace_id、埋点
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
import uuid
import time


@dataclass
class Context:
    """Pipeline context with observability data attached.

    Fields:
        data: the working dict that steps read and write.
        trace_id: a fresh uuid4 hex string per context, used to tag log lines.
        timeline: one dict per ``mark`` call (step, status, ms, error, ts).
        tags: optional string labels for instrumentation (e.g. tenant, platform).
    """

    data: dict[str, Any] = field(default_factory=dict)
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    timeline: list[dict[str, Any]] = field(default_factory=list)
    tags: dict[str, str] = field(default_factory=dict)

    def mark(self, step: str, status: str, ms: float, error: str | None = None) -> None:
        """Append one timeline entry for ``step``.

        ``ms`` is rounded to 3 decimal places; ``ts`` is the wall-clock time
        (``time.time()``) at the moment this method is called.
        """
        entry = dict(
            step=step,
            status=status,
            ms=round(ms, 3),
            error=error,
            ts=time.time(),
        )
        self.timeline.append(entry)


# 日志 (logging): a "pipeline" logger whose lines carry the current trace_id.
import logging

logger = logging.getLogger("pipeline")
logger.setLevel(logging.INFO)

# Re-running this cell would otherwise attach one more StreamHandler each time,
# printing every record N times. The handler is tagged by name, and any handler
# left over from a previous run of the cell is removed before adding a new one.
_PIPELINE_HANDLER_NAME = "pipeline-trace"
for _stale in [h for h in logger.handlers if h.get_name() == _PIPELINE_HANDLER_NAME]:
    logger.removeHandler(_stale)


def _default_trace_id(record: logging.LogRecord) -> bool:
    """Handler filter: give records logged without a trace_id a "-" placeholder.

    The formatter below references %(trace_id)s; without this filter a plain
    ``logger.info(...)`` call (not routed through TraceLoggerAdapter) would
    fail to format.
    """
    if not hasattr(record, "trace_id"):
        record.trace_id = "-"
    return True


handler = logging.StreamHandler()
handler.set_name(_PIPELINE_HANDLER_NAME)
formatter = logging.Formatter(
    fmt="%(asctime)s %(levelname)s trace_id=%(trace_id)s %(message)s"
)
handler.setFormatter(formatter)
handler.addFilter(_default_trace_id)
logger.addHandler(handler)


class TraceLoggerAdapter(logging.LoggerAdapter):
    """LoggerAdapter that stamps every record with the adapter's trace_id.

    By default the stock ``LoggerAdapter.process`` replaces any per-call
    ``extra``; this one merges instead. Per-call ``extra`` keys are kept, and
    ``trace_id`` is always taken from the adapter's own ``extra`` ("-" when
    absent).
    """

    def process(self, msg, kwargs):
        # Copy so the caller's `extra` dict is not mutated as a side effect;
        # `or {}` also covers an explicit extra=None.
        extra = dict(kwargs.get("extra") or {})
        # The adapter's own extra may be None (LoggerAdapter's `extra` is optional).
        extra["trace_id"] = (self.extra or {}).get("trace_id", "-")
        kwargs["extra"] = extra
        return msg, kwargs


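# 用法 (usage): bind the adapter to a new name rather than rebinding `logger`,
# because InstrumentedPipeline.run wraps the module-level `logger` itself:
# log = TraceLoggerAdapter(logger, {"trace_id": "123"})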

from typing import Protocol, Any
import time


@runtime_checkable
class Step(Protocol):
    """Pipeline step interface: ``run(ctx)`` takes a Context and returns one.

    Kept ``@runtime_checkable`` like the first definition of ``Step`` in this
    notebook. Without it, redefining the protocol here would make any later
    ``isinstance(obj, Step)`` raise TypeError.
    """

    def run(self, ctx: Context) -> Context: ...


from collections import defaultdict


class MetricsSink:
    """In-memory metrics store holding counters and lists of timing samples.

    Every metric is filed under a flat string key made of its name plus its
    labels sorted by label name, e.g.
    ``pipeline_step_total{status=ok,step=AddUser}``.
    """

    def __init__(self):
        self.counters = defaultdict(int)
        self.timings_ms: dict[str, list[float]] = defaultdict(list)

    def inc(self, name: str, value: int = 1, **labels: str) -> None:
        """Add ``value`` (default 1) to the counter for ``name`` + ``labels``."""
        self.counters[self._key(name, labels)] += value

    def timing(self, name: str, ms: float, **labels: str) -> None:
        """Append one duration sample (milliseconds) for ``name`` + ``labels``."""
        self.timings_ms[self._key(name, labels)].append(ms)

    def _key(self, name: str, labels: dict[str, str]) -> str:
        """Render ``name{k1=v1,k2=v2}``, or just ``name`` when there are no labels."""
        if labels:
            rendered = ",".join(
                "{}={}".format(label, val) for label, val in sorted(labels.items())
            )
            return "{}{{{}}}".format(name, rendered)
        return name


class InstrumentedPipeline:
    """Pipeline that records per-step timing, status, metrics and logs.

    For every step, ``run``:
      * measures its duration in milliseconds with ``time.perf_counter``,
      * appends a timeline entry via ``ctx.mark``,
      * if a metrics sink was given, increments ``pipeline_step_total`` and
        records ``pipeline_step_ms``, both labelled by ``status`` and ``step``,
      * logs through a TraceLoggerAdapter bound to ``ctx.trace_id``.

    A step that raises is recorded with status ``"error"`` (and logged with its
    traceback), then the exception is re-raised, so later steps do not run.
    """

    def __init__(
        self,
        steps: list[Step],
        metrics: MetricsSink | None = None,
        name: str = "pipeline",
    ):
        self.steps = steps
        self.metrics = metrics
        # NOTE(review): `name` is stored but not used by run(); presumably meant
        # as a pipeline-level label — confirm before relying on it.
        self.name = name

    def _record(
        self,
        ctx: Context,
        step_name: str,
        status: str,
        ms: float,
        error: str | None = None,
    ) -> None:
        """Write one step outcome to the context timeline and the metrics sink."""
        ctx.mark(step_name, status, ms, error=error)
        # `is not None` rather than truthiness: a sink that defines __len__ or
        # __bool__ must not be skipped silently.
        if self.metrics is not None:
            self.metrics.inc("pipeline_step_total", status=status, step=step_name)
            self.metrics.timing("pipeline_step_ms", ms, status=status, step=step_name)

    def run(self, ctx: Context) -> Context:
        """Run every step in order with instrumentation; return the final Context.

        Raises:
            Exception: whatever a step raises, re-raised after being recorded.
        """
        log = TraceLoggerAdapter(logger, {"trace_id": ctx.trace_id})

        for step in self.steps:
            step_name = step.__class__.__name__
            started = time.perf_counter()
            try:
                ctx = step.run(ctx)
            except Exception as exc:
                ms = (time.perf_counter() - started) * 1000
                self._record(ctx, step_name, "error", ms, error=str(exc))
                log.exception("step error name=%s ms=%.2f", step_name, ms)
                raise
            ms = (time.perf_counter() - started) * 1000
            self._record(ctx, step_name, "ok", ms)
            log.info("step end name=%s ms=%.2f", step_name, ms)

        return ctx


class AddUser:
    """Demo step: put a hard-coded user record into ``ctx.data["user"]``."""

    def run(self, ctx: Context) -> Context:
        record = {"name": "张三", "age": 18}
        ctx.data.update({"user": record})
        return ctx


class SlowStep:
    """Demo step that blocks for a fixed delay so its timing shows up in logs.

    Args:
        seconds: how long ``run`` sleeps. Defaults to 0.12, the delay the demo
            output (~120 ms) was recorded with.
    """

    def __init__(self, seconds: float = 0.12):
        self.seconds = seconds

    def run(self, ctx: Context) -> Context:
        """Sleep for ``self.seconds`` and return ``ctx`` unchanged."""
        time.sleep(self.seconds)
        return ctx


class Boom:
    """Demo step that fails on demand: raises when ``ctx.data["boom"]`` is truthy."""

    def run(self, ctx: Context) -> Context:
        should_fail = ctx.data.get("boom")
        if not should_fail:
            return ctx
        raise RuntimeError("模拟异常")


m = MetricsSink()
p = InstrumentedPipeline([AddUser(), SlowStep(), Boom()], metrics=m)

# Happy path: "boom" is False, so every step succeeds.
ctx = Context(data={"boom": False})
out = p.run(ctx)

print("timeline:", out.timeline)
print("metrics counters:")
for counter_key, count in m.counters.items():
    print(" ", counter_key, count)


2025-12-16 10:42:56,424 INFO trace_id=540d3df704a74cb695b8cb9c9aae571a step end name=AddUser ms=0.00
2025-12-16 10:42:56,546 INFO trace_id=540d3df704a74cb695b8cb9c9aae571a step end name=SlowStep ms=120.55
2025-12-16 10:42:56,546 INFO trace_id=540d3df704a74cb695b8cb9c9aae571a step end name=Boom ms=0.00


timeline: [{'step': 'AddUser', 'status': 'ok', 'ms': 0.001, 'error': None, 'ts': 1765852976.424784}, {'step': 'SlowStep', 'status': 'ok', 'ms': 120.552, 'error': None, 'ts': 1765852976.5461514}, {'step': 'Boom', 'status': 'ok', 'ms': 0.002, 'error': None, 'ts': 1765852976.5468924}]
metrics counters:
  pipeline_step_total{status=ok,step=AddUser} 1
  pipeline_step_total{status=ok,step=SlowStep} 1
  pipeline_step_total{status=ok,step=Boom} 1


In [1]:
class CounterStep:
    """Step that counts its own invocations and passes its input through unchanged.

    The counter lives on the instance, so reusing one instance across runs
    keeps accumulating.
    """

    def __init__(self):
        self.count = 0

    def run(self, x):
        self.count = self.count + 1
        current = self.count
        print("count =", current)
        return x


def run(steps, x=None):
    """Thread a value through ``steps`` in order and return the final value.

    Args:
        steps: objects with a ``run(x)`` method, applied in sequence.
        x: initial value handed to the first step (defaults to None).

    Returns:
        The value returned by the last step, or ``x`` unchanged if ``steps``
        is empty.
    """
    for step in steps:
        x = step.run(x)
    return x


# The same two CounterStep instances are reused by every run, so each one's
# count keeps climbing across runs (1, 1, 2, 2, 3, 3) instead of resetting.
steps = [CounterStep() for _ in range(2)]

for _round in range(3):
    run(steps)


count = 1
count = 1
count = 2
count = 2
count = 3
count = 3
