Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

在RunBefore中如果有错误,则任务阻塞 #58

Open
AHHH32 opened this issue May 20, 2024 · 3 comments
Open

在RunBefore中如果有错误,则任务阻塞 #58

AHHH32 opened this issue May 20, 2024 · 3 comments

Comments

@AHHH32
Copy link

AHHH32 commented May 20, 2024

我试图在RunBefore函数中获取TaskInstance相关参数,但返回结果taskIns1为空指针,然后程序就会卡在fmt.Println("taskIns1.2", taskIns1.TaskID),没有显示的报错,附完整代码

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/shiningrush/fastflow"
	mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
	"github.com/shiningrush/fastflow/pkg/entity"
	"github.com/shiningrush/fastflow/pkg/entity/run"
	"github.com/shiningrush/fastflow/pkg/mod"
	mongoStore "github.com/shiningrush/fastflow/store/mongo"
)

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}

func (a *PrintAction) RunBefore(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("-------- Run action before")
	ctx1 := ctx.Context()
	taskIns1, _ := ctx1.Value("running-task").(*entity.TaskInstance)
	fmt.Println("taskIns1.1", taskIns1)
	fmt.Println("taskIns1.2", taskIns1.TaskID)
	return nil
}

func (a *PrintAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action after")
	return nil
}

func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action start: ", time.Now())
	taskIns, _ := entity.CtxRunningTaskIns(ctx.Context())
	fmt.Println(taskIns.TaskID)

	return nil
}

func main() {
	// Register action
	fastflow.RegisterAction([]run.Action{
		&PrintAction{},
	})

	// init keeper, it used to e
	keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
		Key: "worker-1",
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:mongodb@127.0.0.1:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := keeper.Init(); err != nil {
		log.Fatal(fmt.Errorf("init keeper failed: %w", err))
	}

	// init store
	st := mongoStore.NewStore(&mongoStore.StoreOption{
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:mongodb@127.0.0.1:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := st.Init(); err != nil {
		log.Fatal(fmt.Errorf("init store failed: %w", err))
	}

	go createDagAndInstance()

	// start fastflow
	if err := fastflow.Start(&fastflow.InitialOption{
		Keeper: keeper,
		Store:  st,
		// use yaml to define dag
		// 所有的yaml文件都会被执行
		ReadDagFromDir: "./",
	}); err != nil {
		panic(fmt.Sprintf("init fastflow failed: %s", err))
	}
}

func createDagAndInstance() {
	// wait fast start completed
	time.Sleep(time.Second)

	// run some dag instance
	// for i := 0; i < 10; i++ {
	_, err := mod.GetCommander().RunDag("test-dag", nil)
	if err != nil {
		log.Fatal(err)
	}
	// time.Sleep(time.Second * 10)
	// }
}
id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
    isIgnoreFiles:
      act: skip #you can set "skip" or "block"
      conditions:
      - source: vars # source could be "vars" or "share-data"
        key: "fileName"
        op: "in"
        values: ["warn.txt", "error.txt", "file1.txt"]
- id: "task2"
  actionName: "PrintAction"
  dependOn: ["task1"]
- id: "task3"
  actionName: "PrintAction"
  dependOn: ["task1"]
@ShiningRush
Copy link
Owner

可以检查下有没有panic日志,你是在单机模式下运行的吗

@AHHH32
Copy link
Author

AHHH32 commented May 21, 2024

可以检查下有没有panic日志,你是在单机模式下运行的吗

单机模式运行,没有输出日志什么的

@ShiningRush
Copy link
Owner

好的,我周末尝试本地复现下看看

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants