Skip to content

Commit

Permalink
Implementation of job restart logic
Browse files Browse the repository at this point in the history
This feature is useful because there are instances where you wish to
restart the existing job definition, triggering any update/canary logic
that exists.

There are two ways to restart a job:

`nomad run -restart jobfile.nomad`

If the job already exists, a restart/update will be triggered even if
the job definition is identical to the previous version.  If the job
does not exist it will be registered and executed.

`nomad restart jobid`

This will restart an existing job.  This creates a new job version and
triggers the update/canary logic.
  • Loading branch information
maihde committed Mar 29, 2018
1 parent e1c4b88 commit ff83219
Show file tree
Hide file tree
Showing 12 changed files with 574 additions and 2 deletions.
23 changes: 23 additions & 0 deletions api/jobs.go
Expand Up @@ -56,6 +56,7 @@ type RegisterOptions struct {
EnforceIndex bool
ModifyIndex uint64
PolicyOverride bool
RestartJob bool
}

// Register is used to register a new job. It returns the ID
Expand Down Expand Up @@ -85,6 +86,9 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*
if opts.PolicyOverride {
req.PolicyOverride = true
}
if opts.RestartJob {
req.RestartJob = true
}
}

var resp JobRegisterResponse
Expand Down Expand Up @@ -200,6 +204,15 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W
return resp.EvalID, wm, nil
}

// Restart requests a restart of the job identified by jobID by issuing a
// write to the job's "/restart" endpoint. On success it returns the EvalID
// carried in the response together with the write metadata; on failure it
// returns an empty ID, nil metadata and the error from the write.
func (j *Jobs) Restart(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
	endpoint := fmt.Sprintf("/v1/job/%v/restart", jobID)

	var out JobRestartResponse
	meta, err := j.client.write(endpoint, nil, &out, q)
	if err != nil {
		return "", nil, err
	}

	return out.EvalID, meta, nil
}

// ForceEvaluate is used to force-evaluate an existing job.
func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp JobRegisterResponse
Expand Down Expand Up @@ -848,6 +861,7 @@ type JobRegisterRequest struct {
EnforceIndex bool
JobModifyIndex uint64
PolicyOverride bool
RestartJob bool

WriteRequest
}
Expand All @@ -858,6 +872,7 @@ type RegisterJobRequest struct {
EnforceIndex bool `json:",omitempty"`
JobModifyIndex uint64 `json:",omitempty"`
PolicyOverride bool `json:",omitempty"`
RestartJob bool `json:",omitempty"`
}

// JobRegisterResponse is used to respond to a job registration
Expand All @@ -881,6 +896,14 @@ type JobDeregisterResponse struct {
QueryMeta
}

// JobRestartResponse is used to respond to a job restart request.
// Jobs.Restart decodes the reply of the job restart endpoint into this
// type and hands EvalID back to the caller.
type JobRestartResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}

type JobPlanRequest struct {
Job *Job
Diff bool
Expand Down
20 changes: 20 additions & 0 deletions command/agent/job_endpoint.go
Expand Up @@ -79,6 +79,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/stable"):
jobName := strings.TrimSuffix(path, "/stable")
return s.jobStable(resp, req, jobName)
case strings.HasSuffix(path, "/restart"):
jobName := strings.TrimSuffix(path, "/restart")
return s.jobRestart(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
Expand Down Expand Up @@ -363,6 +366,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
EnforceIndex: args.EnforceIndex,
JobModifyIndex: args.JobModifyIndex,
PolicyOverride: args.PolicyOverride,
RestartJob: args.RestartJob,
WriteRequest: structs.WriteRequest{
Region: args.WriteRequest.Region,
AuthToken: args.WriteRequest.SecretID,
Expand All @@ -379,6 +383,22 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
return out, nil
}

// jobRestart handles requests whose path ends in "/restart". It builds a
// JobRestartRequest for jobName, fills in the write-request fields from the
// HTTP request, forwards it to the "Job.Restart" RPC and, on success, sets
// the index on the response and returns the RPC reply.
//
// NOTE(review): no HTTP method check is performed here, so any verb reaches
// the RPC — confirm whether this should be limited to PUT/POST.
func (s *HTTPServer) jobRestart(resp http.ResponseWriter, req *http.Request,
	jobName string) (interface{}, error) {

	var restartReq structs.JobRestartRequest
	restartReq.JobID = jobName
	s.parseWriteRequest(req, &restartReq.WriteRequest)

	var reply structs.JobRestartResponse
	err := s.agent.RPC("Job.Restart", &restartReq, &reply)
	if err != nil {
		return nil, err
	}

	setIndex(resp, reply.Index)
	return reply, nil
}

func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {

Expand Down
5 changes: 5 additions & 0 deletions command/commands.go
Expand Up @@ -554,6 +554,11 @@ func Commands(metaPtr *Meta) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"restart": func() (cli.Command, error) {
return &RestartCommand{
Meta: meta,
}, nil
},
"stop": func() (cli.Command, error) {
return &JobStopCommand{
Meta: meta,
Expand Down
12 changes: 11 additions & 1 deletion command/job_run.go
Expand Up @@ -82,6 +82,10 @@ Run Options:
-policy-override
Sets the flag to force override any soft mandatory Sentinel policies.
-restart
Restarts an existing job as if it were updated. If the job does not exist,
it is registered as normal.
-vault-token
If set, the passed Vault token is stored in the job before sending to the
Nomad servers. This allows passing the Vault token without storing it in
Expand All @@ -107,6 +111,7 @@ func (c *JobRunCommand) AutocompleteFlags() complete.Flags {
"-vault-token": complete.PredictAnything,
"-output": complete.PredictNothing,
"-policy-override": complete.PredictNothing,
"-restart": complete.PredictNothing,
})
}

Expand All @@ -115,7 +120,7 @@ func (c *JobRunCommand) AutocompleteArgs() complete.Predictor {
}

func (c *JobRunCommand) Run(args []string) int {
var detach, verbose, output, override bool
var detach, verbose, output, override, restart bool
var checkIndexStr, vaultToken string

flags := c.Meta.FlagSet("job run", FlagSetClient)
Expand All @@ -124,6 +129,7 @@ func (c *JobRunCommand) Run(args []string) int {
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&output, "output", false, "")
flags.BoolVar(&override, "policy-override", false, "")
flags.BoolVar(&restart, "restart", false, "")
flags.StringVar(&checkIndexStr, "check-index", "", "")
flags.StringVar(&vaultToken, "vault-token", "", "")

Expand Down Expand Up @@ -217,6 +223,10 @@ func (c *JobRunCommand) Run(args []string) int {
if override {
opts.PolicyOverride = true
}
if restart {
c.Ui.Output(fmt.Sprintf("Setting restart"))
opts.RestartJob = true
}

// Submit the job
resp, _, err := client.Jobs().RegisterOpts(job, opts, nil)
Expand Down
172 changes: 172 additions & 0 deletions command/restart.go
@@ -0,0 +1,172 @@
package command

import (
"fmt"
"strings"

"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)

// RestartCommand implements the "nomad restart" CLI command, which restarts
// an existing job. It embeds Meta, from which it obtains its flag set, UI
// and API client.
type RestartCommand struct {
Meta
}

// Help returns the long-form usage text for the restart command: the usage
// line and description, the general options produced by
// generalOptionsUsage, and the restart-specific flags. Surrounding
// whitespace is trimmed from the result.
func (c *RestartCommand) Help() string {
	const usage = `
Usage: nomad restart [options] <job>
Restart an existing job. This command is used to signal allocations to
restart for the given job ID. Upon successful restart an interactive monitor
session will start to display log lines as the job unwinds its allocations
and completes restarts. It is safe to exit the monitor early using
ctrl+c.
General Options:
`
	const restartOptions = `
Restart Options:
-detach
Return immediately instead of entering monitor mode. After the
restart command is submitted, a new evaluation ID is printed to the
screen, which can be used to examine the evaluation using the eval-status
command.
-yes
Automatic yes to prompts.
-verbose
Display full information.
`
	return strings.TrimSpace(usage + generalOptionsUsage() + restartOptions)
}

// Synopsis returns the one-line description of the restart command.
func (c *RestartCommand) Synopsis() string {
return "Restart a running job"
}

// AutocompleteFlags returns the flags offered for shell completion: the
// client flag set provided by Meta merged with the restart-specific
// -detach, -yes and -verbose flags, none of which predict a value.
func (c *RestartCommand) AutocompleteFlags() complete.Flags {
	clientFlags := c.Meta.AutocompleteFlags(FlagSetClient)
	restartFlags := complete.Flags{
		"-detach":  complete.PredictNothing,
		"-yes":     complete.PredictNothing,
		"-verbose": complete.PredictNothing,
	}
	return mergeAutocompleteFlags(clientFlags, restartFlags)
}

// AutocompleteArgs returns a predictor that completes the job argument by
// running a prefix search in the Jobs context for the last typed word.
// It yields nil when the API client cannot be created and an empty slice
// when the search itself fails.
func (c *RestartCommand) AutocompleteArgs() complete.Predictor {
	predictJobs := func(a complete.Args) []string {
		apiClient, err := c.Meta.Client()
		if err != nil {
			return nil
		}

		found, _, err := apiClient.Search().PrefixSearch(a.Last, contexts.Jobs, nil)
		if err != nil {
			return []string{}
		}

		return found.Matches[contexts.Jobs]
	}
	return complete.PredictFunc(predictJobs)
}

// Run executes the restart command with the given CLI arguments and returns
// the process exit code.
//
// It parses the -detach, -verbose and -yes flags, requires exactly one
// positional argument (a job ID or prefix), resolves it to a single job,
// asks for confirmation when the argument was only a prefix match (unless
// -yes was given), and then submits the restart. Unless -detach is set or
// no evaluation ID was returned, it monitors the resulting evaluation and
// returns the monitor's exit code.
//
// Exit codes: 0 on success, on a declined confirmation, or on an inexact
// "y..." answer; 1 on flag/argument errors, lookup or restart failures,
// ambiguous prefixes, or an unrecognised confirmation answer.
func (c *RestartCommand) Run(args []string) int {
	var detach, verbose, autoYes bool

	flags := c.Meta.FlagSet("restart", FlagSetClient)
	flags.Usage = func() { c.Ui.Output(c.Help()) }
	flags.BoolVar(&detach, "detach", false, "")
	flags.BoolVar(&verbose, "verbose", false, "")
	flags.BoolVar(&autoYes, "yes", false, "")

	if err := flags.Parse(args); err != nil {
		return 1
	}

	// Truncate the id unless full length is requested
	length := shortId
	if verbose {
		length = fullId
	}

	// Check that we got exactly one job
	args = flags.Args()
	if len(args) != 1 {
		c.Ui.Error(c.Help())
		return 1
	}
	jobID := args[0]

	// Get the HTTP client
	client, err := c.Meta.Client()
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
		return 1
	}

	// Check if the job exists
	jobs, _, err := client.Jobs().PrefixList(jobID)
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error querying job: %s", err))
		return 1
	}
	if len(jobs) == 0 {
		c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID))
		return 1
	}
	// Several matches are only acceptable when the first one is an exact
	// match for the (trimmed) argument.
	if len(jobs) > 1 && strings.TrimSpace(jobID) != jobs[0].ID {
		c.Ui.Error(fmt.Sprintf("Prefix matched multiple jobs\n\n%s", createStatusListOutput(jobs)))
		return 1
	}

	// Prefix lookup matched a single job
	job, _, err := client.Jobs().Info(jobs[0].ID, nil)
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error querying job: %s", err))
		return 1
	}

	// Confirm the restart if the job was a prefix match.
	if jobID != *job.ID && !autoYes {
		question := fmt.Sprintf("Are you sure you want to restart job %q? [y/N]", *job.ID)
		answer, err := c.Ui.Ask(question)
		if err != nil {
			c.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err))
			return 1
		}

		switch {
		case answer == "" || strings.ToLower(answer)[0] == 'n':
			// No case
			c.Ui.Output("Cancelling job restart")
			return 0
		case strings.ToLower(answer)[0] == 'y' && len(answer) > 1:
			// Non exact match yes
			c.Ui.Output("For confirmation, an exact ‘y’ is required.")
			return 0
		case answer != "y":
			c.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.")
			return 1
		}
	}

	// Invoke the restart
	evalID, _, err := client.Jobs().Restart(*job.ID, nil)
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error restarting job: %s", err))
		return 1
	}

	// The restart may not produce an evaluation (the original author notes
	// this is the case for periodic jobs); there is nothing to print or
	// monitor then.
	if evalID == "" {
		return 0
	}

	if detach {
		c.Ui.Output(evalID)
		return 0
	}

	// Start monitoring the restart eval
	mon := newMonitor(c.Ui, client, length)
	return mon.monitor(evalID, false)
}
1 change: 1 addition & 0 deletions main.go
Expand Up @@ -46,6 +46,7 @@ var (
// Common commands are grouped separately to call them out to operators.
commonCommands = []string{
"run",
"restart",
"stop",
"status",
"alloc",
Expand Down
17 changes: 17 additions & 0 deletions nomad/fsm.go
Expand Up @@ -196,6 +196,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyDrainUpdate(buf[1:], log.Index)
case structs.JobRegisterRequestType:
return n.applyUpsertJob(buf[1:], log.Index)
case structs.JobRestartRequestType:
return n.applyRestartJob(buf[1:], log.Index)
case structs.JobDeregisterRequestType:
return n.applyDeregisterJob(buf[1:], log.Index)
case structs.EvalUpdateRequestType:
Expand Down Expand Up @@ -475,6 +477,21 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
return nil
}

// applyRestartJob applies a JobRestartRequest log entry to the state store.
// buf holds the encoded request and index is the log index passed through
// to the state store. It panics if the request cannot be decoded; if the
// state store's RestartJob fails, the error is logged and returned,
// otherwise nil is returned. The elapsed time is recorded under the
// "nomad.fsm.restart_job" metric key.
func (n *nomadFSM) applyRestartJob(buf []byte, index uint64) interface{} {
	defer metrics.MeasureSince([]string{"nomad", "fsm", "restart_job"}, time.Now())

	var restart structs.JobRestartRequest
	if decodeErr := structs.Decode(buf, &restart); decodeErr != nil {
		panic(fmt.Errorf("failed to decode request: %v", decodeErr))
	}

	err := n.state.RestartJob(index, restart.Namespace, restart.JobID)
	if err == nil {
		return nil
	}

	n.logger.Printf("[ERR] nomad.fsm: RestartJob failed: %v", err)
	return err
}

func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_job"}, time.Now())
var req structs.JobDeregisterRequest
Expand Down

0 comments on commit ff83219

Please sign in to comment.