Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Permalink
Properly handle running the VPC tool in wait-for-sg-lock-timeout mode
Browse files Browse the repository at this point in the history
  • Loading branch information
sargun committed May 29, 2018
1 parent 01f7151 commit ced4ed3
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 60 deletions.
161 changes: 103 additions & 58 deletions executor/runtime/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ const (

// Config represents the configuration for the Docker titus runtime
type Config struct { // nolint: maligned
userNamespaceFDEnabled bool
cfsBandwidthPeriod uint64
cfsBandwidthMode string
tiniVerbosity int
batchSize int
burst bool
securityConvergenceTimeout time.Duration
pidLimit int
prepareTimeout time.Duration
startTimeout time.Duration
debugAllocate bool
bumpTiniSchedPriority bool
userNamespaceFDEnabled bool
cfsBandwidthPeriod uint64
cfsBandwidthMode string
tiniVerbosity int
batchSize int
burst bool
securityConvergenceTimeout time.Duration
pidLimit int
prepareTimeout time.Duration
startTimeout time.Duration
bumpTiniSchedPriority bool
waitForSecurityGroupLockTimeout time.Duration
}

// NewConfig generates a configuration, with a set of flags tied to it for the docker runtime
Expand Down Expand Up @@ -145,9 +145,10 @@ func NewConfig() (*Config, []cli.Flag) {
Value: time.Minute * 10,
Destination: &cfg.startTimeout,
},
cli.BoolFlag{
Name: "titus.executor.debugAllocate",
Destination: &cfg.debugAllocate,
cli.DurationFlag{
Name: "titus.executor.waitForSecurityGroupLockTimeout",
Value: time.Minute * 1,
Destination: &cfg.waitForSecurityGroupLockTimeout,
},
// Allow the usage of a realtime scheduling policy to be optional on systems that don't have it properly configured
// by default, i.e.: docker-for-mac.
Expand Down Expand Up @@ -649,8 +650,13 @@ func doDockerPull(ctx context.Context, metrics metrics.Reporter, client *docker.
}
}

func vpcToolPath() string {
myPath := os.Args[0]
return filepath.Join(filepath.Dir(myPath), "titus-vpc-tool")
}

// This will setup c.Allocation
func prepareNetworkDriver(cfg Config, c *runtimeTypes.Container) error {
func prepareNetworkDriver(parentCtx context.Context, cfg Config, c *runtimeTypes.Container) error { // nolint: gocyclo
log.Printf("Configuring VPC network for %s", c.TaskID)

args := []string{
Expand All @@ -661,14 +667,29 @@ func prepareNetworkDriver(cfg Config, c *runtimeTypes.Container) error {
"--batch-size", strconv.Itoa(cfg.batchSize),
}

// This blocks, and ignores kills.
if !c.TitusInfo.GetIgnoreLaunchGuard() {
args = append(args, "--wait-for-sg-lock-timeout", cfg.waitForSecurityGroupLockTimeout.String())
}

// This channel indicates when allocation is done, successful or not
allocateDone := make(chan struct{})
defer close(allocateDone)

// This ctx should only be cancelled when
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-allocateDone:
case <-parentCtx.Done():
cancel()
}
}()
// We intentionally don't use context here, because context only KILLs.
// Instead we rely on the idea of the cleanup function below.
if cfg.debugAllocate {
args = append([]string{"-e", "trace=file,network,desc", "-e", "trace=!pselect6,futex,utimensat", "-s", "8192", "-tt", "-f", "-o", "allocate.trace", "/apps/titus-executor/bin/titus-vpc-tool"}, args...)
c.AllocationCommand = exec.Command("/usr/bin/strace", args...) // nolint: gas
} else {
c.AllocationCommand = exec.Command("/apps/titus-executor/bin/titus-vpc-tool", args...) // nolint: gas
}

c.AllocationCommand = exec.CommandContext(ctx, vpcToolPath(), args...) // nolint: gas
c.AllocationCommandStatus = make(chan *exec.ExitError)

stdoutPipe, err := c.AllocationCommand.StdoutPipe()
c.AllocationCommand.Stderr = os.Stderr
Expand All @@ -680,28 +701,33 @@ func prepareNetworkDriver(cfg Config, c *runtimeTypes.Container) error {
return err
}

c.RegisterRuntimeCleanup(func() error {
_ = c.AllocationCommand.Process.Signal(unix.SIGTERM)
killTimer := time.AfterFunc(5*time.Minute, func() {
_ = c.AllocationCommand.Process.Kill()
})
_ = c.AllocationCommand.Wait()
killTimer.Stop()
_ = c.AllocationCommand.Process.Kill()
return nil
})
cancelTimer := time.AfterFunc(5*time.Minute, func() {
log.Warning("timed out trying to allocate network")
_ = c.AllocationCommand.Process.Kill()
})
if err := json.NewDecoder(stdoutPipe).Decode(&c.Allocation); err != nil {
_ = c.AllocationCommand.Process.Kill()
// This should kill the process
cancel()
return fmt.Errorf("Unable to read json from pipe: %+v", err)
}
if !cancelTimer.Stop() {
// Ruh roh, we failed to stop the timer in time.
return errors.New("Race condition experienced with stopping VPC allocation")
}

c.RegisterRuntimeCleanup(func() error {
_ = c.AllocationCommand.Process.Signal(unix.SIGTERM)
time.AfterFunc(5*time.Minute, cancel)
defer cancel()
select {
case e := <-c.AllocationCommandStatus:
return e
case <-ctx.Done():
return fmt.Errorf("allocate command: %s", ctx.Err().Error())
}
})

go func() {
e := c.AllocationCommand.Wait()
if exitErr, ok := e.(*exec.ExitError); ok {
c.AllocationCommandStatus <- exitErr
} else {
log.Error("Could not handle exit error of allocation command: ", e)
}
close(c.AllocationCommandStatus)
}()

if !c.Allocation.Success {
_ = c.AllocationCommand.Process.Kill()
Expand All @@ -715,7 +741,7 @@ func prepareNetworkDriver(cfg Config, c *runtimeTypes.Container) error {
return fmt.Errorf("vpc network configuration error: %s; %v", c.Allocation.Error, c.AllocationCommand.Wait())
}

log.Printf("vpc network configuration obtained %+v", c.Allocation)
log.WithField("allocation", c.Allocation).Info("vpc network configuration obtained")

return nil
}
Expand Down Expand Up @@ -761,7 +787,7 @@ func (r *DockerRuntime) Prepare(parentCtx context.Context, c *runtimeTypes.Conta

if r.cfg.UseNewNetworkDriver {
group.Go(func() error {
return prepareNetworkDriver(r.dockerCfg, c)
return prepareNetworkDriver(errGroupCtx, r.dockerCfg, c)
})
} else {
// Don't call out to network driver for local development
Expand Down Expand Up @@ -1567,7 +1593,7 @@ func setupNetworkingArgs(burst bool, c *runtimeTypes.Container) []string {
return args
}

func setupNetworking(burst bool, c *runtimeTypes.Container, cred ucred) error {
func setupNetworking(burst bool, c *runtimeTypes.Container, cred ucred) error { // nolint: gocyclo
log.Info("Setting up container network")
var result vpcTypes.WiringStatus

Expand All @@ -1577,16 +1603,21 @@ func setupNetworking(burst bool, c *runtimeTypes.Container, cred ucred) error {
}
defer shouldClose(netnsFile)

c.SetupCommand = exec.Command("/apps/titus-executor/bin/titus-vpc-tool", setupNetworkingArgs(burst, c)...) // nolint: gas
// This ctx isn't directly descendant from the parent context. It'll be called iff the command successfully starts
// in the runtime cleanup function, or in
ctx, cancel := context.WithCancel(context.Background()) // nolint: vet

c.SetupCommand = exec.CommandContext(ctx, vpcToolPath(), setupNetworkingArgs(burst, c)...) // nolint: gas
c.SetupCommandStatus = make(chan *exec.ExitError)
stdin, err := c.SetupCommand.StdinPipe()
if err != nil {
return err
return err // nolint: vet
}
stdout, err := c.SetupCommand.StdoutPipe()
if err != nil {
return err
}
c.AllocationCommand.Stderr = os.Stderr
c.SetupCommand.Stderr = os.Stderr
c.SetupCommand.ExtraFiles = []*os.File{netnsFile}

err = c.SetupCommand.Start()
Expand All @@ -1595,33 +1626,47 @@ func setupNetworking(burst bool, c *runtimeTypes.Container, cred ucred) error {
}

c.RegisterRuntimeCleanup(func() error {
defer cancel()
_ = c.SetupCommand.Process.Signal(unix.SIGTERM)
killTimer := time.AfterFunc(1*time.Minute, func() {
_ = c.SetupCommand.Process.Kill()
})
_ = c.SetupCommand.Wait()
killTimer.Stop()
_ = c.SetupCommand.Process.Kill()
return nil
time.AfterFunc(1*time.Minute, cancel)
select {
case e := <-c.SetupCommandStatus:
return e
case <-ctx.Done():
return fmt.Errorf("Setup Command: %s", ctx.Err().Error())
}
})

go func() {
defer close(c.SetupCommandStatus)
e := c.AllocationCommand.Wait()
if e == nil {
return
}
if exitErr, ok := e.(*exec.ExitError); ok {
c.SetupCommandStatus <- exitErr
} else {
log.Error("Could not handle exit error of setup command: ", e)
}
}()

cancelTimer := time.AfterFunc(5*time.Minute, func() {
log.Warning("timed out trying to setup container network")
_ = c.SetupCommand.Process.Kill()
cancel()
})
if err := json.NewEncoder(stdin).Encode(c.Allocation); err != nil {
_ = c.SetupCommand.Process.Kill()
cancel()
return err
}
if err := json.NewDecoder(stdout).Decode(&result); err != nil {
_ = c.SetupCommand.Process.Kill()
cancel()
return fmt.Errorf("Unable to read json from pipe during setup-container: %+v", err)
}
if !cancelTimer.Stop() {
return errors.New("Race condition experienced with container network setup")
}
if !result.Success {
_ = c.SetupCommand.Process.Kill()
cancel()
return fmt.Errorf("Network setup error: %s", result.Error)
}

Expand Down
6 changes: 4 additions & 2 deletions executor/runtime/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ type Container struct {
// GPU devices
GPUInfo GPUContainer

AllocationCommand *exec.Cmd
SetupCommand *exec.Cmd
AllocationCommand *exec.Cmd
AllocationCommandStatus chan *exec.ExitError
SetupCommand *exec.Cmd
SetupCommandStatus chan *exec.ExitError

Config config.Config
}
Expand Down

0 comments on commit ced4ed3

Please sign in to comment.