Skip to content

Commit

Permalink
Merge pull request #769 from go-task/fix-signals
Browse files Browse the repository at this point in the history
Fix behavior of interrupt (SIGINT, SIGTERM) signals
  • Loading branch information
andreynering committed Jun 12, 2022
2 parents c9a582f + 7989f73 commit c1466f8
Show file tree
Hide file tree
Showing 9 changed files with 480 additions and 18 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dist/
# exuberant ctags
tags

/bin
/bin/*
!/bin/.keep
/testdata/vars/v1
/tmp
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Fix behavior of interrupt (SIGINT, SIGTERM) signals. Task will now give time
for the processes running to do cleanup work
([#458](https://github.com/go-task/task/issues/458), [#479](https://github.com/go-task/task/pull/479), [#728](https://github.com/go-task/task/issues/728)).
- Add new `--exit-code` (`-x`) flag that will pass-through the exit form the
command being ran
([#755](https://github.com/go-task/task/pull/755)).
Expand Down
22 changes: 21 additions & 1 deletion Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ tasks:

install:
desc: Installs Task
sources:
- './**/*.go'
cmds:
- go install -v -ldflags="-w -s -X main.version={{.GIT_COMMIT}}" ./cmd/task

Expand All @@ -40,12 +42,30 @@ tasks:

lint:
desc: Runs golangci-lint
sources:
- './**/*.go'
cmds:
- golangci-lint run

sleepit:build:
desc: Builds the sleepit test helper
sources:
- ./cmd/sleepit/**/*.go
generates:
- ./bin/sleepit
cmds:
- go build -o ./bin/sleepit{{exeExt}} ./cmd/sleepit

sleepit:run:
desc: Builds the sleepit test helper
deps: [sleepit:build]
cmds:
- ./bin/sleepit {{.CLI_ARGS}}
silent: true

test:
desc: Runs test suite
deps: [install]
deps: [install, sleepit:build]
cmds:
- go test {{catLines .GO_PACKAGES}}

Expand Down
Empty file added bin/.keep
Empty file.
173 changes: 173 additions & 0 deletions cmd/sleepit/sleepit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// This code is released under the MIT License
// Copyright (c) 2020 Marco Molteni and the timeit contributors.

package main

import (
"flag"
"fmt"
"os"
"os/signal"
"time"
)

const usage = `sleepit: sleep for the specified duration, optionally handling signals
When the line "sleepit: ready" is printed, it means that it is safe to send signals to it
Usage: sleepit <command> [<args>]
Commands
default Use default action: on reception of SIGINT terminate abruptly
handle Handle signals: on reception of SIGINT perform cleanup before exiting
version Show the sleepit version`

var (
// Filled by the linker.
fullVersion = "unknown" // example: v0.0.9-8-g941583d027-dirty
)

func main() {
os.Exit(run(os.Args[1:]))
}

func run(args []string) int {
if len(args) < 1 {
fmt.Fprintln(os.Stderr, usage)
return 2
}

defaultCmd := flag.NewFlagSet("default", flag.ExitOnError)
defaultSleep := defaultCmd.Duration("sleep", 5*time.Second, "Sleep duration")

handleCmd := flag.NewFlagSet("handle", flag.ExitOnError)
handleSleep := handleCmd.Duration("sleep", 5*time.Second, "Sleep duration")
handleCleanup := handleCmd.Duration("cleanup", 5*time.Second, "Cleanup duration")
handleTermAfter := handleCmd.Int("term-after", 0,
"Terminate immediately after `N` signals.\n"+
"Default is to terminate only when the cleanup phase has completed.")

versionCmd := flag.NewFlagSet("version", flag.ExitOnError)

switch args[0] {

case "default":
_ = defaultCmd.Parse(args[1:])
if len(defaultCmd.Args()) > 0 {
fmt.Fprintf(os.Stderr, "default: unexpected arguments: %v\n", defaultCmd.Args())
return 2
}
return supervisor(*defaultSleep, 0, 0, nil)

case "handle":
_ = handleCmd.Parse(args[1:])
if *handleTermAfter == 1 {
fmt.Fprintf(os.Stderr, "handle: term-after cannot be 1\n")
return 2
}
if len(handleCmd.Args()) > 0 {
fmt.Fprintf(os.Stderr, "handle: unexpected arguments: %v\n", handleCmd.Args())
return 2
}
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt) // Ctrl-C -> SIGINT
return supervisor(*handleSleep, *handleCleanup, *handleTermAfter, sigCh)

case "version":
_ = versionCmd.Parse(args[1:])
if len(versionCmd.Args()) > 0 {
fmt.Fprintf(os.Stderr, "version: unexpected arguments: %v\n", versionCmd.Args())
return 2
}
fmt.Printf("sleepit version %s\n", fullVersion)
return 0

default:
fmt.Fprintln(os.Stderr, usage)
return 2
}
}

func supervisor(
sleep time.Duration,
cleanup time.Duration,
termAfter int,
sigCh <-chan os.Signal,
) int {
fmt.Printf("sleepit: ready\n")
fmt.Printf("sleepit: PID=%d sleep=%v cleanup=%v\n",
os.Getpid(), sleep, cleanup)

cancelWork := make(chan struct{})
workerDone := worker(cancelWork, sleep, "work")

cancelCleaner := make(chan struct{})
var cleanerDone <-chan struct{}

sigCount := 0
for {
select {
case sig := <-sigCh:
sigCount++
fmt.Printf("sleepit: got signal=%s count=%d\n", sig, sigCount)
if sigCount == 1 {
// since `cancelWork` is unbuffered, sending will be synchronous:
// we are ensured that the worker has terminated before starting cleanup.
// This is important in some real-life situations.
cancelWork <- struct{}{}
cleanerDone = worker(cancelCleaner, cleanup, "cleanup")
}
if sigCount == termAfter {
cancelCleaner <- struct{}{}
return 4
}
case <-workerDone:
return 0
case <-cleanerDone:
return 3
}
}
}

// Start a worker goroutine and return immediately a `workerDone` channel.
// The goroutine will prepend its prints with the prefix `name`.
// The goroutine will simulate some work and will terminate when one of the following
// conditions happens:
// 1. When `howlong` is elapsed. This case will be signaled on the `workerDone` channel.
// 2. When something happens on channel `canceled`. Note that this simulates real-life,
// so cancellation is not instantaneous: if the caller wants a synchronous cancel,
// it should send a message; if instead it wants an asynchronous cancel, it should
// close the channel.
func worker(
canceled <-chan struct{},
howlong time.Duration,
name string,
) <-chan struct{} {
workerDone := make(chan struct{})
deadline := time.Now().Add(howlong)
go func() {
fmt.Printf("sleepit: %s started\n", name)
for {
select {
case <-canceled:
fmt.Printf("sleepit: %s canceled\n", name)
return
default:
if doSomeWork(deadline) {
fmt.Printf("sleepit: %s done\n", name) // <== NOTE THIS LINE
workerDone <- struct{}{}
return
}
}
}
}()
return workerDone
}

// Do some work and then return, so that the caller can decide wether to continue or not.
// Return true when all work is done.
func doSomeWork(deadline time.Time) bool {
if time.Now().After(deadline) {
return true
}
timeout := 100 * time.Millisecond
time.Sleep(timeout)
return false
}
19 changes: 3 additions & 16 deletions cmd/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ import (
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"runtime/debug"
"strings"
"syscall"

"github.com/spf13/pflag"
"mvdan.cc/sh/v3/syntax"
Expand Down Expand Up @@ -204,11 +202,12 @@ func main() {
globals.Set("CLI_ARGS", taskfile.Var{Static: cliArgs})
e.Taskfile.Vars.Merge(globals)

ctx := context.Background()
if !watch {
ctx = getSignalContext()
e.InterceptInterruptSignals()
}

ctx := context.Background()

if status {
if err := e.Status(ctx, calls...); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -249,18 +248,6 @@ func getArgs() ([]string, string, error) {
return args[:doubleDashPos], strings.Join(quotedCliArgs, " "), nil
}

func getSignalContext() context.Context {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-ch
log.Printf("task: signal received: %s", sig)
cancel()
}()
return ctx
}

func getVersion() string {
if version != "" {
return version
Expand Down
2 changes: 2 additions & 0 deletions internal/execext/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"strings"
"time"

"mvdan.cc/sh/v3/expand"
"mvdan.cc/sh/v3/interp"
Expand Down Expand Up @@ -48,6 +49,7 @@ func RunCommand(ctx context.Context, opts *RunCommandOptions) error {
r, err := interp.New(
interp.Params("-e"),
interp.Env(expand.ListEnviron(environ...)),
interp.ExecHandler(interp.DefaultExecHandler(15*time.Second)),
interp.OpenHandler(openHandler),
interp.StdIO(opts.Stdin, opts.Stdout, opts.Stderr),
dirOption(opts.Dir),
Expand Down
31 changes: 31 additions & 0 deletions signals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package task

import (
"os"
"os/signal"
"syscall"

"github.com/go-task/task/v3/internal/logger"
)

// NOTE(@andreynering): This function intercepts SIGINT and SIGTERM signals
// so the Task process is not killed immediatelly and processes running have
// time to do cleanup work.
func (e *Executor) InterceptInterruptSignals() {
ch := make(chan os.Signal, 3)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

go func() {
for i := 1; i <= 3; i++ {
sig := <-ch

if i < 3 {
e.Logger.Outf(logger.Yellow, `task: Signal received: "%s"`, sig)
continue
}

e.Logger.Errf(logger.Red, `task: Signal received for the third time: "%s". Forcing shutdown`, sig)
os.Exit(1)
}
}()
}
Loading

0 comments on commit c1466f8

Please sign in to comment.