Skip to content

Commit

Permalink
fix: goroutine leak for tasker
Browse files Browse the repository at this point in the history
Signed-off-by: rfyiamcool <rfyiamcool@163.com>
  • Loading branch information
rfyiamcool committed Mar 14, 2024
1 parent b2e2c14 commit bc068a5
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 16 deletions.
39 changes: 25 additions & 14 deletions pkg/tasker/tasker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,17 @@ func New(opt Option) *Tasker {
logger = log.New(file, "", log.LstdFlags)
}

return &Tasker{Log: logger, loc: loc, gron: gron, exprs: exprs, tasks: tasks, verbose: opt.Verbose}
ctx, cancel := context.WithCancel(context.Background())
return &Tasker{
Log: logger,
loc: loc,
gron: gron,
exprs: exprs,
tasks: tasks,
verbose: opt.Verbose,
ctx: ctx,
ctxCancel: cancel,
}
}

// WithContext adds a parent context to the Tasker struct
Expand All @@ -105,14 +115,6 @@ func (t *Tasker) WithContext(ctx context.Context) *Tasker {
return t
}

func (t *Tasker) ctxDone() {
<-t.ctx.Done()
if t.verbose {
t.Log.Printf("[tasker] received signal on context.Done, aborting")
}
t.abort = true
}

// Taskify creates TaskFunc out of plain command wrt given options.
func (t *Tasker) Taskify(cmd string, opt Option) TaskFunc {
sh := Shell(opt.Shell)
Expand Down Expand Up @@ -259,6 +261,11 @@ func (t *Tasker) Run() {

// Stop the task manager.
func (t *Tasker) Stop() {
t.stop()
}

func (t *Tasker) stop() {
t.ctxCancel()
t.abort = true
}

Expand All @@ -282,16 +289,20 @@ func (t *Tasker) doSetup() {
break
}
}
if t.ctx != nil {
go t.ctxDone()
}

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

go func() {
<-sig
t.abort = true
select {
case <-sig:
case <-t.ctx.Done():
if t.verbose {
t.Log.Printf("[tasker] received signal on context.Done, aborting")
}
}

t.stop()
}()
}

Expand Down
24 changes: 22 additions & 2 deletions pkg/tasker/tasker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestConcurrency(t *testing.T) {
}

func TestStopTasker(t *testing.T) {
t.Run("Run", func(t *testing.T) {
t.Run("call stop()", func(t *testing.T) {
taskr := New(Option{Verbose: true, Out: "../../test/tasker.out"})

var incr int
Expand All @@ -216,7 +216,27 @@ func TestStopTasker(t *testing.T) {
taskr.Stop()
}()
taskr.Run()
fmt.Println(incr)

if incr != 1 {
t.Errorf("the task should run 1x, not %dx", incr)
}
})

t.Run("cancel context", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
taskr := New(Option{Verbose: true, Out: "../../test/tasker.out"}).WithContext(ctx)

var incr int
taskr.Task("* * * * * *", func(ctx context.Context) (int, error) {
incr++
return 0, nil
}, false)

go func() {
time.Sleep(2 * time.Second)
cancel()
}()
taskr.Run()

if incr != 1 {
t.Errorf("the task should run 1x, not %dx", incr)
Expand Down

0 comments on commit bc068a5

Please sign in to comment.