This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Commit

Merge pull request #109 from Netflix/change-state-machine
Change the executor state machine
sargun committed May 15, 2018
2 parents 11b8afb + 1e71bfc commit 805f6d2
Showing 6 changed files with 358 additions and 174 deletions.
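
In short: the diffs below drop the model where the runner polled Dockerd on a fixed status-check-frequency ticker, and instead have runtime.Start return the container details plus a channel of status messages that the runner consumes. A rough sketch of the contract this implies follows. Only StatusMessage, its Status and Msg fields, the StatusRunning/StatusFinished/StatusFailed constants, and the shape of the Start call are visible in the diff; the Status type and the Container and Details stubs are assumptions, not the actual titus-executor definitions.

// Sketch only: inferred from the runner.go diff on this page, not copied from the repository.
package runtimeTypes

import "context"

type Container struct{} // placeholder for the real container type
type Details struct{}   // placeholder for the details the runner attaches to status updates

// Status values the runner's monitorContainer loop switches on.
type Status int

const (
	StatusRunning Status = iota
	StatusFinished
	StatusFailed
)

// StatusMessage is pushed by the runtime, replacing the runner polling
// Dockerd every StatusCheckFrequency (the flag removed in config/config.go).
type StatusMessage struct {
	Status Status
	Msg    string
}

// Start now hands back the log directory, the details that previously came
// from a separate Details() call, and the status channel, in a single call.
type Runtime interface {
	Start(ctx context.Context, c *Container) (logDir string, details *Details, status <-chan StatusMessage, err error)
}
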
10 changes: 1 addition & 9 deletions config/config.go
@@ -12,7 +12,6 @@ import (
)

const (
defaultStatusCheckFrequency = 5 * time.Second
defaultLogUploadThreshold = 6 * time.Hour
defaultLogUploadCheckInterval = 15 * time.Minute
defaultStdioLogCheckInterval = 1 * time.Minute
@@ -31,9 +30,7 @@ type Config struct { // nolint: maligned
DisableMetrics bool
// LogUpload returns settings about the log uploader
//LogUpload logUpload
// StatusCheckFrequency returns duration between the periods the executor will poll Dockerd
StatusCheckFrequency time.Duration
LogsTmpDir string
LogsTmpDir string
// Stack returns the stack configuration variable
Stack string
// Docker returns the Docker-specific configuration settings
@@ -95,11 +92,6 @@ func NewConfig() (*Config, []cli.Flag) {
EnvVar: "DISABLE_METRICS,SHORT_CIRCUIT_QUITELITE",
Destination: &cfg.DisableMetrics,
},
cli.DurationFlag{
Name: "status-check-frequency",
Destination: &cfg.StatusCheckFrequency,
Value: defaultStatusCheckFrequency,
},
cli.StringFlag{
Name: "logs-tmp-dir",
Value: defaultLogsTmpDir,
1 change: 0 additions & 1 deletion executor/mock/jobrunner.go
@@ -120,7 +120,6 @@ func NewJobRunner() *JobRunner {
if err != nil {
panic(err)
}
cfg.StatusCheckFrequency = time.Second * 1
cfg.KeepLocalFileAfterUpload = true
cfg.MetatronEnabled = false

98 changes: 54 additions & 44 deletions executor/runner/runner.go
@@ -280,18 +280,18 @@ no_launchguard:
}

r.updateStatus(ctx, titusdriver.Starting, "starting")
logDir, err := r.runtime.Start(ctx, r.container)
logDir, details, statusChan, err := r.runtime.Start(ctx, r.container)
if err != nil { // nolint: vetshadow
r.metrics.Counter("titus.executor.launchTaskFailed", 1, nil)
r.logger.Info("start container: ", err)

switch err.(type) {
case *runtimeTypes.BadEntryPointError:
r.logger.Info("Returning TaskState_TASK_FAILED for task: ", err)
r.updateStatus(ctx, titusdriver.Failed, err.Error())
r.updateStatusWithDetails(ctx, titusdriver.Failed, err.Error(), details)
default:
r.logger.Info("Returning TASK_LOST for task: ", err)
r.updateStatus(ctx, titusdriver.Lost, err.Error())
r.updateStatusWithDetails(ctx, titusdriver.Lost, err.Error(), details)
}
return
}
@@ -301,43 +301,50 @@ no_launchguard:
err = r.maybeSetupExternalLogger(ctx, logDir)
if err != nil {
r.logger.Error("Unable to setup logging for container: ", err)
r.updateStatus(ctx, titusdriver.Lost, err.Error())
r.updateStatusWithDetails(ctx, titusdriver.Lost, err.Error(), details)
return
}
} else {
r.logger.Info("Not starting external logger")
}

// TODO(fabio): Start should return Details
details, err := r.runtime.Details(r.container)
if err != nil {
r.logger.Error("Error fetching details for task: ", err)
r.updateStatus(ctx, titusdriver.Lost, err.Error())
return
} else if details == nil {
r.logger.Error("Unable to fetch task details")
if details == nil {
r.logger.Fatal("Unable to fetch task details")
}
r.metrics.Counter("titus.executor.taskLaunched", 1, nil)
r.updateStatusWithDetails(ctx, titusdriver.Running, "running", details)

// report metrics for startup time, docker image size
r.metrics.Timer("titus.executor.containerStartTime", time.Since(startTime), r.container.ImageTagForMetrics())
r.monitorContainer(ctx, startTime, statusChan, details)
}

ticks := time.NewTicker(r.config.StatusCheckFrequency)
defer ticks.Stop()
func (r *Runner) monitorContainer(ctx context.Context, startTime time.Time, statusChan <-chan runtimeTypes.StatusMessage, details *runtimeTypes.Details) { // nolint: gocyclo
lastMessage := ""
runningSent := false

for {
select {
case <-ticks.C:
status, err := r.runtime.Status(r.container)
if err != nil {
r.logger.Error("Status result error: ", err)
case statusMessage, ok := <-statusChan:
msg := statusMessage.Msg
if !ok {
r.updateStatusWithDetails(ctx, titusdriver.Lost, "Lost connection to runtime driver", details)
return
}
shouldQuit, titusTaskStatus, msg := parseStatus(status, err)
if shouldQuit {
r.logger.Info("Status: ", titusTaskStatus.String())
// TODO: Generate Update
r.updateStatus(ctx, titusTaskStatus, msg)
r.logger.WithField("statusMessage", statusMessage).Info("Processing msg")

switch statusMessage.Status {
case runtimeTypes.StatusRunning:
r.handleTaskRunningMessage(ctx, msg, &lastMessage, &runningSent, startTime, details)
// Error code 0
case runtimeTypes.StatusFinished:
if msg == "" {
msg = "finished"
}
r.updateStatusWithDetails(ctx, titusdriver.Finished, msg, details)
return
case runtimeTypes.StatusFailed:
r.updateStatusWithDetails(ctx, titusdriver.Failed, msg, details)
return
default:
r.updateStatusWithDetails(ctx, titusdriver.Lost, msg, details)
return
}
case <-r.killChan:
@@ -349,6 +356,27 @@ no_launchguard:
}
}

func (r *Runner) handleTaskRunningMessage(ctx context.Context, msg string, lastMessage *string, runningSent *bool, startTime time.Time, details *runtimeTypes.Details) {
// no need to Update the status if task is running and the message is the same as the last one
// The first time this is called *runningSent should be false, so it'll always trigger
if msg == *lastMessage && *runningSent {
return
}

// The msg for the first runningSent will always be "running"
if !(*runningSent) {
if msg == "" {
msg = "running"
}
r.metrics.Timer("titus.executor.containerStartTime", time.Since(startTime), r.container.ImageTagForMetrics())
}

r.updateStatusWithDetails(ctx, titusdriver.Running, msg, details)
*runningSent = true
*lastMessage = msg

}

func (r *Runner) handleShutdown(ctx context.Context) { // nolint: gocyclo
r.logger.Debug("Handling shutdown")
launchGuardCtx, cancel := context.WithCancel(context.Background())
@@ -429,21 +457,6 @@ func (r *Runner) wasKilled() bool {
}
}

func parseStatus(status runtimeTypes.Status, err error) (bool, titusdriver.TitusTaskState, string) {

switch status {
case runtimeTypes.StatusRunning:
// no need to Update the status if task is running
return false, titusdriver.Running, ""
case runtimeTypes.StatusFinished:
return true, titusdriver.Finished, "finished"
case runtimeTypes.StatusFailed:
return true, titusdriver.Failed, err.Error()
default:
return true, titusdriver.Lost, err.Error()
}
}

func (r *Runner) maybeSetupExternalLogger(ctx context.Context, logDir string) error {
var err error

@@ -529,9 +542,6 @@ func (r *Runner) updateStatus(ctx context.Context, status titusdriver.TitusTaskS
func (r *Runner) updateStatusWithDetails(ctx context.Context, status titusdriver.TitusTaskState, msg string, details *runtimeTypes.Details) {
r.lastStatus = status
l := r.logger.WithField("msg", msg).WithField("taskStatus", status)
if details != nil {
l = l.WithField("details", details)
}
select {
case r.UpdatesChan <- Update{
TaskID: r.container.TaskID,
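
The producer side of that channel lives in the runtime files changed by this commit that are not rendered on this page. As a minimal sketch, reusing the types sketched above the first diff, this is roughly how a runtime might feed statusChan; the watchContainer name and the waitForExit callback are hypothetical, not the real Docker runtime code.

// Hypothetical producer: emits Running once, then Finished or Failed when the
// container exits, and closes the channel early on error so the runner's
// monitorContainer loop reports the task as Lost ("Lost connection to runtime driver").
func watchContainer(ctx context.Context, waitForExit func(context.Context) (int, error)) <-chan StatusMessage {
	statusChan := make(chan StatusMessage)
	go func() {
		defer close(statusChan)
		statusChan <- StatusMessage{Status: StatusRunning, Msg: "running"}
		exitCode, err := waitForExit(ctx)
		switch {
		case err != nil:
			return // channel closes without a terminal status; the runner marks the task Lost
		case exitCode == 0:
			statusChan <- StatusMessage{Status: StatusFinished} // empty Msg; the runner substitutes "finished"
		default:
			statusChan <- StatusMessage{Status: StatusFailed, Msg: "container exited with a non-zero code"}
		}
	}()
	return statusChan
}

The net effect is that the runner's state machine is driven by events from the runtime rather than by the timer-driven polls that the removed parseStatus helper used to interpret.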
