Skip to content

Sunk011/MultiTaskFlow

 
 

Repository files navigation

MultiTaskFlow 多任务流管理工具

MultiTaskFlow 是一个轻量级的多任务流管理工具,用于按顺序执行和监控一系列任务。它可以帮助您管理数据处理、模型训练、评估等一系列需要顺序执行的任务,并提供实时状态更新和执行结果跟踪。

功能特点

  • 基于YAML配置文件定义任务流
  • 支持Python脚本和Shell命令的执行
  • 提供任务状态实时监控
  • 自动执行失败任务的重试逻辑
  • 支持任务之间的依赖关系
  • 完整的日志记录和任务执行历史
  • 进程PID跟踪与管理
  • 优雅的信号处理和任务终止
  • 支持静默模式,可跳过消息通知

安装方法

要求

  • Python 3.7+
  • PyYAML
  • 其他依赖库(如有)

配置消息推送令牌和静默模式

消息推送令牌

在使用消息推送功能前,需要配置 MSG_PUSH_TOKEN 环境变量。以下是配置方法:

1. 永久配置(推荐)

~/.bashrc~/.zshrc 文件中添加:

# MultiTaskFlow 消息推送配置
export MSG_PUSH_TOKEN="your_pushplus_token_here"

然后重新加载配置:

source ~/.bashrc  # 或 source ~/.zshrc
2. 临时配置

在运行命令前设置:

MSG_PUSH_TOKEN=your_token python your_script.py
3. 开发模式配置

在项目根目录创建 .env 文件:

echo "MSG_PUSH_TOKEN=your_token" > .env

静默模式配置

如果您不希望收到消息通知,可以启用静默模式:

1. 永久配置静默模式

~/.bashrc~/.zshrc 文件中添加:

# MultiTaskFlow 静默模式配置
export MTF_SILENT_MODE=true

然后重新加载配置:

source ~/.bashrc  # 或 source ~/.zshrc
2. 临时配置静默模式

在运行命令前设置:

MTF_SILENT_MODE=true taskflow your_tasks.yaml
3. 开发模式配置静默模式

在项目根目录创建或编辑 .env 文件:

echo "MTF_SILENT_MODE=true" >> .env

获取 Token

  1. 访问 PushPlus 官网
  2. 注册并登录
  3. 在个人中心获取您的 token

(方法1)从PyPI安装

# 使用pip直接安装
pip install multitaskflow

(方法2)从源码安装

# 克隆仓库
git clone https://github.com/Polaris-F/MultiTaskFlow.git
cd MultiTaskFlow

# 方法1: 使用pip直接安装
pip install .

# 方法2: 开发模式安装
pip install -e .

(方法3)构建离线包方法

如果您想构建wheel包或源码分发包,可以使用以下命令:

# 安装构建工具
pip install build

# 构建分发包
python -m build

# 构建的包会在dist/目录下生成

使用方法

如果需要使用 消息接收功能,请访问 https://www.pushplus.plus/ 获取您的token

1. 创建任务配置文件

创建一个YAML格式的任务配置文件,定义您要执行的任务序列:

# tasks.yaml 示例
- name: "任务1-数据准备"
  command: "python scripts/prepare_data.py --input data/raw --output data/processed"
  status: "pending"

- name: "任务2-模型训练"
  command: "python scripts/train_model.py --data data/processed --epochs 10"
  status: "pending"

- name: "任务3-结果评估"
  command: "python scripts/evaluate.py --model-path models/latest.pt"
  status: "pending"

2. (方法一)使用Python API (推荐使用方法二、三)

在您的Python代码中使用MultiTaskFlow:

from multitaskflow import TaskFlow

# 创建任务流管理器
task_manager = TaskFlow("path/to/your/tasks.yaml")

# 启动任务执行
task_manager.run()

# 您也可以动态添加任务
task_manager.add_task_by_config(
    name="额外任务", 
    command="echo '这是一个动态添加的任务'"
)

2. (方法二)使用命令行工具【使用场景:不需要后台运行,可实时查看输出】

安装后,您可以直接使用taskflow命令行工具:

# 使用配置文件运行任务流
taskflow path/to/your/tasks.yaml

# 使用默认配置
# 如果不提供配置文件路径,将在examples/tasks.yaml创建示例配置
taskflow

# 查看帮助
taskflow --help

2. (方法三)使用sh脚本工具【使用场景:需要后台运行,通过log查看输出】

首先taskflowPro.sh修改脚本中 TASK_CONFIG为任务流yaml路径

chmod +x taskflowPro.sh
./taskflowPro.sh start  # 开始运行
./taskflowPro.sh stop   # 结束运行

效果展示

您可以运行我们提供的演示脚本,查看任务管理和消息接收的实际效果。演示脚本模拟了一个完整的深度学习工作流,包括数据预处理、模型训练、模型评估和数据归档等步骤。

运行演示脚本

# 安装完成后,直接运行示例脚本
python -m multitaskflow.examples.demo

# 或使用命令行工具
taskflow examples/tasks.yaml

演示内容

演示脚本将依次执行以下任务:

  1. 数据预处理 - 模拟数据集加载、清洗和处理过程
  2. 模型训练-阶段1 - 模拟第一阶段模型训练过程
  3. 模型评估-阶段1 - 模拟对第一阶段训练模型的评估
  4. 模型训练-阶段2 - 模拟基于第一阶段模型继续训练
  5. 模型评估-阶段2 - 模拟对第二阶段训练模型的评估
  6. 数据归档 - 模拟模型和结果数据的归档过程

每个任务都会显示详细的执行进度和模拟输出,让您直观了解MultiTaskFlow的任务管理能力。所有演示任务都是模拟执行,不会创建实际文件或占用大量资源。

期望效果

运行示例后,您将看到:

  • 任务管理器启动和初始化过程
  • 任务状态的实时更新(等待中→执行中→完成/失败)
  • 每个任务的详细输出和进度信息
  • 任务完成后的状态汇总

通过观察演示效果,您可以了解MultiTaskFlow如何帮助管理复杂的多步骤工作流程,以及它如何提供清晰的任务执行状态和结果反馈。

运行效果截图

任务管理和执行效果

实际运行时在控制台中会看到详细的输出,显示任务状态和进度信息

高级功能(TODO)

任务配置选项

任务配置文件支持以下选项:

- name: "示例任务"
  command: "python script.py"
  status: "pending"  # pending, running, completed, failed
  retry: 3  # 失败后重试次数 (TODO)
  timeout: 3600  # 任务超时时间(秒)(TODO)
  depends_on: ["前置任务名称"]  # 依赖的任务 (TODO)

静默模式

MultiTaskFlow 支持静默模式,在此模式下不会发送任何消息通知。这对于以下场景非常有用:

  • 生产环境部署:在生产环境中运行时,可能不需要消息通知
  • 调试阶段:开发和调试过程中避免频繁接收通知
  • 批量任务:执行大量批处理任务时,只关注最终结果而非每个任务
  • CI/CD 流程:在自动化构建流水线中使用,避免触发过多通知

启用静默模式

静默模式通过环境变量 MTF_SILENT_MODE 控制:

# 启用静默模式
export MTF_SILENT_MODE=true

# 临时启用
MTF_SILENT_MODE=true taskflow tasks.yaml

支持的值:

  • 设为 true, 1, yes, on 表示启用静默模式
  • 不设置或设为其他值表示禁用静默模式

静默模式的工作原理

当启用静默模式时:

  1. 所有任务执行完成后不会发送消息通知
  2. 任务管理器完成时不会发送总结报告
  3. 所有操作和结果仍会记录在日志文件中
  4. 控制台输出不受影响,仍然会显示所有信息

注意,静默模式只影响消息通知行为,不会改变任务的实际执行过程。

自定义通知

您可以配置系统在任务状态变更时发送通知:

from multitaskflow import TaskFlow, Msg_push

# 创建消息推送实例
notifier = Msg_push(
    webhook_url="your_webhook_url",
    channel="your_channel"
)

# 创建带通知功能的任务流管理器
task_manager = TaskFlow(
    "tasks.yaml",
    msg_push=notifier
)

自定义与扩展

MultiTaskFlow设计为可扩展的,您可以:

  • 自定义任务状态处理逻辑
  • 添加新的任务类型
  • 扩展监控和报告功能

自定义任务处理器示例

from multitaskflow import TaskFlow

class CustomTaskFlow(TaskFlow):
    def process_task_output(self, task, output):
        # 自定义输出处理逻辑
        print(f"处理任务 {task.name} 的输出: {output}")
        # 继续处理...
        super().process_task_output(task, output)

常见问题(FAQ)

Q: XXXX?

贡献指南

欢迎贡献代码、报告问题或提出新功能建议!

  1. Fork 这个仓库
  2. 创建您的特性分支 (git checkout -b feature/amazing-feature)
  3. 提交您的更改 (git commit -m 'Add some amazing feature')
  4. 推送到分支 (git push origin feature/amazing-feature)
  5. 打开一个 Pull Request

版本历史

  • 1.0.0 - 2024-03-15
    • 首次发布
    • 基本任务管理功能
    • 命令行工具支持

许可证

本项目采用MIT许可证 - 详情请查看 LICENSE 文件

作者与致谢

  • 主要开发者: Polaris
  • 感谢所有贡献者和使用者的宝贵反馈

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Python 84.2%
  • Shell 15.8%