Skip to content

Commit

Permalink
#### Version 0.9.9
Browse files Browse the repository at this point in the history
* feature: TaskContext增加TimeoutContext与TimeoutCancel属性,用于超时控制,应用可根据需要从TaskContext获取
* feature: TaskInfo增加SetTimeout,用于设置超时时间,单位为秒
* refactor: 移除TaskInfo.Context()
* 2020-04-25 16:00 at ShangHai
  • Loading branch information
devfeel committed Apr 25, 2020
1 parent 29b4882 commit cbcb4b0
Show file tree
Hide file tree
Showing 10 changed files with 263 additions and 157 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* 支持Exception、OnBegin、OnEnd注入点
* 支持单独执行TaskHandler
* 支持代码级重设Task的相关设置
* 支持超时控制
* 内建Task运行计数信息,包含执行与异常计数
* 内建针对Task与Counter的OutputHttpHandler,可方便与WebServer自动集成

Expand Down
28 changes: 28 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package task

import "context"

//Task上下文信息
type TaskContext struct {
TaskID string
TaskData interface{} //用于当前Task全局设置的数据项
Message interface{} //用于每次Task执行上下文消息传输
IsEnd bool //如果设置该属性为true,则停止当次任务的后续执行,一般用在OnBegin中
Error error
Header map[string]interface{}
TimeoutContext context.Context
TimeoutCancel context.CancelFunc
doneChan chan struct{}
}

func (c TaskContext) reset() {
c.TaskID = ""
c.TaskData = nil
c.Message = nil
c.IsEnd = false
c.Error = nil
c.Header = nil
c.TimeoutContext = nil
c.TimeoutCancel = nil
c.doneChan = nil
}
99 changes: 59 additions & 40 deletions crontask.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (task *CronTask) GetConfig() *TaskConfig {
DueTime: task.DueTime,
Interval: 0,
Express: task.RawExpress,
TaskData: task.Context().TaskData,
TaskData: task.TaskData,
}
}

Expand All @@ -58,7 +58,7 @@ func (task *CronTask) Reset(conf *TaskConfig) error {
task.Stop()
task.IsRun = conf.IsRun
if conf.TaskData != nil {
task.Context().TaskData = conf.TaskData
task.TaskData = conf.TaskData
}
if conf.Handler != nil {
task.handler = conf.Handler
Expand Down Expand Up @@ -104,23 +104,20 @@ func (task *CronTask) Start() {
// no recover panic
// support for #6 新增RunOnce方法建议
func (task *CronTask) RunOnce() error {
err := task.handler(task.context)
err := task.handler(task.getTaskContext())
return err
}

// NewCronTask create new cron task
func NewCronTask(taskID string, isRun bool, express string, handler TaskHandle, taskData interface{}) (Task, error) {
context := new(TaskContext)
context.TaskID = taskID
context.TaskData = taskData

task := new(CronTask)
task.initCounters()
task.taskID = context.TaskID
task.taskID = taskID
task.TaskType = TaskType_Cron
task.IsRun = isRun
task.handler = handler
task.RawExpress = express
task.TaskData = taskData
expressList := strings.Split(express, " ")
if len(expressList) != 6 {
return nil, errors.New("express is wrong format => not 6 parts")
Expand All @@ -133,7 +130,6 @@ func NewCronTask(taskID string, isRun bool, express string, handler TaskHandle,
task.time_Second = parseExpress(expressList[0], ExpressType_Second)

task.State = TaskState_Init
task.context = context
return task, nil
}

Expand All @@ -155,48 +151,71 @@ func startCronTask(task *CronTask) {
}

func doCronTask(task *CronTask) {
taskCtx := task.getTaskContext()
defer func() {
task.Context().Header = nil
task.Context().Error = nil
if taskCtx.TimeoutCancel != nil {
taskCtx.TimeoutCancel()
}
task.putTaskContext(taskCtx)
if err := recover(); err != nil {
task.CounterInfo().ErrorCounter.Inc(1)
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID, " cron handler recover error => ", err))
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), " cron handler recover error => ", err))
if task.taskService.ExceptionHandler != nil {
task.taskService.ExceptionHandler(task.Context(), fmt.Errorf("%v", err))
task.taskService.ExceptionHandler(taskCtx, fmt.Errorf("%v", err))
}
//goroutine panic, restart cron task
startCronTask(task)
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID, " goroutine panic, restart CronTask"))
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), " goroutine panic, restart CronTask"))
}
}()
now := time.Now()
task.context.Header = make(map[string]interface{})
if task.time_WeekDay.IsMatch(now) &&
task.time_Month.IsMatch(now) &&
task.time_Day.IsMatch(now) &&
task.time_Hour.IsMatch(now) &&
task.time_Minute.IsMatch(now) &&
task.time_Second.IsMatch(now) {

//inc run counter
task.CounterInfo().RunCounter.Inc(1)
//do log
if task.taskService != nil && task.taskService.OnBeforeHandler != nil {
task.taskService.OnBeforeHandler(task.Context())
}
var err error
if !task.Context().IsEnd {
err = task.handler(task.Context())
}
if err != nil {
task.Context().Error = err
task.CounterInfo().ErrorCounter.Inc(1)
if task.taskService != nil && task.taskService.ExceptionHandler != nil {
task.taskService.ExceptionHandler(task.Context(), err)

handler := func() {
defer func() {
if task.Timeout > 0 {
taskCtx.doneChan <- struct{}{}
}
}()
now := time.Now()
if task.time_WeekDay.IsMatch(now) &&
task.time_Month.IsMatch(now) &&
task.time_Day.IsMatch(now) &&
task.time_Hour.IsMatch(now) &&
task.time_Minute.IsMatch(now) &&
task.time_Second.IsMatch(now) {

//inc run counter
task.CounterInfo().RunCounter.Inc(1)
//do log
if task.taskService != nil && task.taskService.OnBeforeHandler != nil {
task.taskService.OnBeforeHandler(taskCtx)
}
var err error
if !taskCtx.IsEnd {
err = task.handler(taskCtx)
}
if err != nil {
taskCtx.Error = err
task.CounterInfo().ErrorCounter.Inc(1)
if task.taskService != nil && task.taskService.ExceptionHandler != nil {
task.taskService.ExceptionHandler(taskCtx, err)
}
}
if task.taskService != nil && task.taskService.OnEndHandler != nil {
task.taskService.OnEndHandler(taskCtx)
}
}
if task.taskService != nil && task.taskService.OnEndHandler != nil {
task.taskService.OnEndHandler(task.Context())
}

if task.Timeout > 0 {
go handler()
select {
case <-taskCtx.TimeoutContext.Done():
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), "do handler timeout."))
case <-taskCtx.doneChan:
return
}
} else {
handler()
}

}
2 changes: 1 addition & 1 deletion example/normal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func main() {
if exists {
err = t.RunOnce()
if err != nil {
fmt.Println(t.Context(), "RunOnce error =>", err)
fmt.Println(t.GetConfig(), "RunOnce error =>", err)
}
}

Expand Down
73 changes: 51 additions & 22 deletions example/timeout/main.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,64 @@
package main

import (
"context"
"fmt"
. "github.com/devfeel/dottask"
"time"
)

func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
query(ctx)
var service *TaskService

time.Sleep(time.Hour)
}
var firstLoopTimeout = 0
var firstCronTimeout = 0

var patchLoop = 0
var patchCron = 0

func query(ctx context.Context) {
notifyChan := make(chan string)
doing := func() {
func Job_Timeout_Test(ctx *TaskContext) error {
patchCron += 1
patch := patchLoop

if firstLoopTimeout <= 0 {
firstLoopTimeout = 1
time.Sleep(time.Second * 15)
} else {
time.Sleep(time.Second)
fmt.Println("1:", time.Now())
time.Sleep(time.Second * 3)
fmt.Println("2:", time.Now())
time.Sleep(time.Second * 5)
fmt.Println("3:", time.Now())
notifyChan <- "1"
}
go doing()
select {
case <-notifyChan:
fmt.Println("done")
case <-ctx.Done():
fmt.Println("timeout")

fmt.Println(time.Now().String(), " => Job_Timeout_Test", patch)
return nil
}

func Loop_Timeout_Test(ctx *TaskContext) error {
patchLoop += 1
patch := patchLoop
if firstCronTimeout <= 0 {
firstCronTimeout = 1
time.Sleep(time.Second * 20)
}

fmt.Println(time.Now().String(), " => Loop_Timeout_Test", patch)
return nil
}

func main() {
service = StartNewService()

t1, err := service.CreateCronTask("test-timeout-cron", true, "*/10 * * * * *", Job_Timeout_Test, nil)
if err != nil {
fmt.Println("service.CreateCronTask error! => ", err.Error())
}
t1.SetTimeout(5)
t2, err := service.CreateLoopTask("test-timeout-loop", true, 0, 10000, Loop_Timeout_Test, nil)
if err != nil {
fmt.Println("service.CreateLoopTask error! => ", err.Error())
}
t2.SetTimeout(5)

service.StartAllTask()

for {
time.Sleep(time.Hour)
}

}

0 comments on commit cbcb4b0

Please sign in to comment.