Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nomad Actions #18794

Merged
merged 8 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/18794.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
actions: introduces the action concept to jobspecs, the web UI, CLI and API
```
9 changes: 8 additions & 1 deletion api/allocations_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type execSession struct {
task string
tty bool
command []string
action string

stdin io.Reader
stdout io.Writer
Expand Down Expand Up @@ -94,9 +95,15 @@ func (s *execSession) startConnection() (*websocket.Conn, error) {
q.Params["tty"] = strconv.FormatBool(s.tty)
q.Params["task"] = s.task
q.Params["command"] = string(commandBytes)

reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", s.alloc.ID)

if s.action != "" {
q.Params["action"] = s.action
q.Params["allocID"] = s.alloc.ID
q.Params["group"] = s.alloc.TaskGroup
reqPath = fmt.Sprintf("/v1/job/%s/action", s.alloc.JobID)
}

var conn *websocket.Conn

if nodeClient != nil {
Expand Down
29 changes: 29 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package api

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"sort"
"strconv"
Expand Down Expand Up @@ -1514,3 +1516,30 @@ type JobEvaluateRequest struct {
type EvalOptions struct {
ForceReschedule bool
}

// ActionExec is used to run a pre-defined command inside a running task.
// The call blocks until command terminates (or an error occurs), and returns the exit code.
func (j *Jobs) ActionExec(ctx context.Context,
alloc *Allocation, task string, tty bool, command []string,
action string,
stdin io.Reader, stdout, stderr io.Writer,
terminalSizeCh <-chan TerminalSize, q *QueryOptions) (exitCode int, err error) {

s := &execSession{
client: j.client,
alloc: alloc,
task: task,
tty: tty,
command: command,
action: action,

stdin: stdin,
stdout: stdout,
stderr: stderr,

terminalSizeCh: terminalSizeCh,
q: q,
}

return s.run(ctx)
}
8 changes: 8 additions & 0 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,8 @@ type Task struct {

// Workload Identities
Identities []*WorkloadIdentity `hcl:"identity,block"`

Actions []*Action `hcl:"action,block"`
}

func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
Expand Down Expand Up @@ -1167,3 +1169,9 @@ type WorkloadIdentity struct {
ServiceName string `hcl:"service_name,optional"`
TTL time.Duration `mapstructure:"ttl" hcl:"ttl,optional"`
}

type Action struct {
Name string `hcl:"name,label"`
Command string `mapstructure:"command" hcl:"command"`
Args []string `mapstructure:"args" hcl:"args,optional"`
}
29 changes: 27 additions & 2 deletions client/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func (a *Allocations) exec(conn io.ReadWriteCloser) {
handleStreamResultError(err, code, encoder)
return
}

a.c.logger.Info("task exec session ended", "exec_id", execID)
}

Expand Down Expand Up @@ -216,6 +215,7 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e
"task", req.Task,
"command", req.Cmd,
"tty", req.Tty,
"action", req.Action,
}
if ident != nil {
if ident.ACLToken != nil {
Expand All @@ -238,7 +238,7 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e

// Check alloc-exec permission.
if err != nil {
return nil, err
return pointer.Of(int64(400)), err
} else if !aclObj.AllowNsOp(alloc.Namespace, acl.NamespaceCapabilityAllocExec) {
return nil, nstructs.ErrPermissionDenied
}
Expand All @@ -247,6 +247,20 @@ func (a *Allocations) execImpl(encoder *codec.Encoder, decoder *codec.Decoder, e
if req.Task == "" {
return pointer.Of(int64(400)), taskNotPresentErr
}

// If an action is present, go find the command and args
if req.Action != "" {
alloc, _ := a.c.GetAlloc(req.AllocID)
jobAction, err := validateActionExists(req.Action, req.Task, alloc)
if err != nil {
return pointer.Of(int64(400)), err
}
if jobAction != nil {
// append both Command and Args
req.Cmd = append([]string{jobAction.Command}, jobAction.Args...)
}
}

if len(req.Cmd) == 0 {
return pointer.Of(int64(400)), errors.New("command is not present")
}
Expand Down Expand Up @@ -343,3 +357,14 @@ func (s *execStream) Recv() (*drivers.ExecTaskStreamingRequestMsg, error) {
err := s.decoder.Decode(&req)
return &req, err
}

func validateActionExists(actionName string, taskName string, alloc *nstructs.Allocation) (*nstructs.Action, error) {
t := alloc.LookupTask(taskName)

for _, action := range t.Actions {
if action.Name == actionName {
return action, nil
}
}
return nil, fmt.Errorf("action %s not found", actionName)
}
3 changes: 3 additions & 0 deletions client/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ type AllocExecRequest struct {
// Cmd is the command to be executed
Cmd []string

// The name of a predefined command to be executed (optional)
Action string

structs.QueryOptions
}

Expand Down
Loading
Loading