Roal = Routing + Orchestration + Analytics + Logistics
代表框架的核心能力:数据路由、流程编排、分析处理、物流式流转。
Roal 是一个纯数据流转与任务编排框架,采用 Python 语言实现。
核心信条:框架不关心数据从哪来、到哪去、长什么样,只负责在任务之间传递数据、编排执行顺序、管理任务生命周期。
开发者拥有绝对自由:
- 数据源可以是文件、数据库、消息队列、HTTP 请求、硬件传感器、随机数生成器
- 数据格式可以是字符串、JSON、Protobuf、Avro、二进制字节流、Pandas DataFrame、NumPy 数组
- 处理逻辑可以是任何 Python 代码
框架职责边界:开发者写好 Task,框架负责串联执行。
Task 是框架中最基础的抽象,代表一个独立的处理单元。
开发者视角:Task 就是一个可调用对象(函数、lambda、类的 __call__ 方法),接收一个输入,产出一个输出。输入输出的类型完全由开发者决定。
框架视角:Task 是一个黑盒,框架只知道它有输入和输出,不关心内部实现。框架负责在合适的时机调用它,并把输出传递给下一个 Task。
定义方式:
- 普通函数
- 异步函数(async def)
- 生成器函数(yield)
- 实现了
__call__的类实例 - 使用
@task装饰器增强的任意可调用对象
Pipeline 是多个 Task 的有序组合,定义了数据从产生到消费的完整路径。
构成元素:
- 节点:每个 Task 是一个节点
- 边:表示数据流向,从上游 Task 的输出指向下游 Task 的输入
- 路由规则:定义数据如何从上游分发到下游
支持的拓扑结构:
- 串联:A → B → C
- 分流:A 的输出同时发给 B 和 C(广播)
- 合并:B 和 C 的输出都发给 D
- 条件分支:根据数据内容决定发送给哪个下游
- 扇出扇入:一对多再聚合
- 哈希路由:根据数据字段的哈希值选择下游
TaskContext 是框架暴露给开发者的服务接口,让 Task 在执行时能够获取框架提供的能力。
提供的服务:
- 配置获取:获取 Task 的配置参数,支持类型转换和默认值
- 状态存储:可选的 KV 存储,用于保存跨数据记录的 Task 状态
- 指标上报:向框架上报自定义指标(计数器、直方图、仪表)
- 日志记录:自动携带流水线 ID、Task 名称、数据 ID 的结构化日志
- 元数据访问:获取当前 Pipeline 的名称、运行 ID、启动时间等信息
- 数据属性:获取当前数据的元数据(来源、时间戳、序列号)
pip install roal# 安装 Redis 支持
pip install roal[redis]
# 安装异步文件 I/O 支持
pip install roal[aio]
# 安装 Kafka 支持
pip install roal[kafka]
# 安装 PostgreSQL 支持
pip install roal[postgres]
# 安装所有可选依赖
pip install roal[redis,aio,kafka,postgres]from roal import task, TaskContext
import json
@task(name="parse_json", workers=4)
async def parse_json(data: bytes, ctx: TaskContext) -> dict:
return json.loads(data)
@task
async def filter_errors(record: dict, ctx: TaskContext) -> dict:
if record.get("level") == "error":
return record
return None # 过滤非错误记录
@task
async def send_alert(error: dict, ctx: TaskContext) -> None:
print(f"Alert: {error['message']}")from roal import Pipeline
pipeline = Pipeline("log_processor")
pipeline.source(read_logs) \ # 假设 read_logs 是一个数据源 Task
.then(parse_json) \
.fork() \
.to(filter_errors, send_alert) \
.to(extract_fields, store_to_db) \
.merge() \
.then(generate_stats) \
.sink(write_output)
# 运行 Pipeline
pipeline.run()- 数据血缘追踪:框架为每一条数据记录分配全局唯一的 ID,并记录它经过的每一个 Task。
- 可观测性:自动采集吞吐量、延迟、队列深度等指标,支持 Prometheus 集成。
- 断点续传:框架定期保存每个 Task 的消费位点,支持从故障中恢复。
- 动态配置:支持运行时修改 Task 配置,无需重启 Pipeline。
- 测试支持:提供内存数据源和数据汇,简化单元测试。
- 实时日志分析:实时监控应用日志,错误日志发送告警,访问日志统计 QPS。
- 数据 ETL 管道:从业务库抽取数据,清洗转换后写入数仓,同时更新缓存。
- 物联网数据处理:接收设备传感器数据,异常检测,持久化,下发指令。
- AI 推理流水线:接收图片,预处理,模型推理,后处理,返回结果。
- 事件驱动工作流:用户操作触发一系列后续处理。
- 核心依赖:asyncio, uvloop, structlog, prometheus_client, PyYAML
- Python 版本:3.9+
- 操作系统:Linux / macOS / Windows
- 嵌入式部署:作为 Python 库直接嵌入现有应用。
- 独立服务:提供可选的 HTTP API 服务层。
- 容器化部署:提供官方 Docker 镜像。
- Kubernetes 部署:支持 K8s 部署,利用 Horizontal Pod Autoscaler 实现自动扩缩容。
| 特性 | Roal | Apache Flink | Apache Airflow | Prefect | Bytewax |
|---|---|---|---|---|---|
| 语言 | Python | Java/Python | Python | Python | Python |
| 编程模型 | 纯函数式 Task | 富算子 | DAG 定义 | 函数式 Flow | 数据流 |
| 部署复杂度 | 极低,pip install | 高,需要集群 | 中,需要调度器 | 低 | 低 |
| 学习曲线 | 几分钟 | 数周 | 数天 | 数小时 | 数小时 |
| 实时处理 | 原生支持 | 原生支持 | 分钟级调度 | 支持 | 原生支持 |
| 状态管理 | 可选,开发者控制 | 强制,框架管理 | 有限(XCom) | 支持 | 支持 |
| Python 生态集成 | 无缝 | 通过 PyFlink | 一般 | 良好 | 良好 |
| 适用场景 | 通用数据编排 | 大规模流计算 | 定时任务调度 | 数据工程 | 流处理 |
- 简单至上:API 应该让开发者在 5 分钟内理解并开始使用
- 零侵入:不强制继承基类,普通函数即可成为 Task
- 显式优于隐式:框架行为可预测,避免魔法
- 渐进式复杂度:简单场景只需几行代码,复杂场景提供完整扩展能力
- Pythonic:遵循 Python 社区惯例,利用语言特性简化设计
- 测试友好:提供一流测试支持,鼓励 TDD
roal/
├── pyproject.toml # 项目配置和依赖
├── README.md
├── docs/ # 文档
│ ├── getting-started.md
│ ├── core-concepts.md
│ ├── advanced/
│ └── api-reference/
├── roal/ # 核心源码
│ ├── __init__.py
│ ├── api/ # 公开 API
│ │ ├── __init__.py
│ │ ├── task.py # Task 协议和装饰器
│ │ ├── pipeline.py # Pipeline 类
│ │ └── context.py # TaskContext 类
│ ├── core/ # 内部实现
│ │ ├── dag.py # DAG 构建和拓扑排序
│ │ ├── router.py # 路由策略实现
│ │ └── executor.py # 执行引擎
│ ├── runtime/ # 运行时
│ │ ├── worker.py # Worker 协程管理
│ │ ├── queue.py # 异步队列封装
│ │ └── backpressure.py # 背压控制
│ ├── state/ # 状态管理
│ │ ├── base.py # StateStore 抽象
│ │ ├── memory.py # 内存存储
│ │ ├── redis.py # Redis 存储
│ │ └── file.py # 文件存储
│ ├── checkpoint/ # 断点续传
│ │ ├── manager.py
│ │ └── stores.py
│ ├── monitoring/ # 监控指标
│ │ ├── metrics.py
│ │ ├── tracing.py
│ │ └── prometheus.py
│ └── testing/ # 测试工具
│ ├── harness.py
│ └── fixtures.py
├── examples/ # 示例代码
│ ├── wordcount/
│ ├── log_analysis/
│ ├── etl_pipeline/
│ └── iot_processing/
└── tests/ # 测试
├── unit/
├── integration/
└── performance/
| 版本 | 里程碑 | 核心功能 |
|---|---|---|
| 0.1.0 | MVP | 基础 Task 和 Pipeline,支持串联和分支,asyncio 执行器 |
| 0.2.0 | 路由增强 | 条件路由、哈希路由、广播路由,Worker 并发 |
| 0.3.0 | 状态管理 | StateStore 抽象,内存/Redis 实现 |
| 0.4.0 | 可观测性 | 指标收集,Prometheus 集成,结构化日志 |
| 0.5.0 | 容错能力 | 断点续传,重试策略,死信队列 |
| 0.6.0 | 动态能力 | 运行时配置更新,动态扩缩容 |
| 0.7.0 | 进程扩展 | 多进程支持,CPU 密集型任务隔离 |
| 0.8.0 | API 服务 | HTTP API,Web UI 基础 |
| 0.9.0 | 生态集成 | 官方连接器(Kafka、Redis、PostgreSQL) |
| 1.0.0 | 稳定版 | API 稳定,生产就绪,完整文档 |
Roal 采用开放治理模式,欢迎社区贡献:
- 代码贡献:Fork → Feature Branch → Pull Request
- 文档改进:直接提交 PR 到 docs 目录
- Bug 报告:使用 GitHub Issues,附带最小复现示例
- 功能建议:先在 Discussions 中讨论,达成共识后再实现
开发环境搭建:
git clone https://github.com/RestRegular/roal.git
cd roal
python -m venv venv
source venv/bin/activate
pip install -e ".[dev]"
pytestRoal —— 让数据流转像呼吸一样自然