forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.go
206 lines (177 loc) · 6.29 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package driver
import (
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/driver/executor"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
)
// cgroupsMounted returns true if the cgroups are mounted on a system otherwise
// returns false
func cgroupsMounted(node *structs.Node) bool {
_, ok := node.Attributes["unique.cgroup.mountpoint"]
return ok
}
// createExecutor launches an executor plugin and returns an instance of the
// Executor interface
func createExecutor(w io.Writer, clientConfig *config.Config,
executorConfig *dstructs.ExecutorConfig) (executor.Executor, *plugin.Client, error) {
c, err := json.Marshal(executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("unable to create executor config: %v", err)
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
config := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", string(c)),
}
config.HandshakeConfig = HandshakeConfig
config.Plugins = GetPluginMap(w, clientConfig.LogLevel)
config.MaxPort = clientConfig.ClientMaxPort
config.MinPort = clientConfig.ClientMinPort
// setting the setsid of the plugin process so that it doesn't get signals sent to
// the nomad client.
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(executor.Executor)
return executorPlugin, executorClient, nil
}
func createExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) {
config.HandshakeConfig = HandshakeConfig
// Setting this to DEBUG since the log level at the executor server process
// is already set, and this effects only the executor client.
config.Plugins = GetPluginMap(w, "DEBUG")
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin, ok := raw.(*ExecutorRPC)
if !ok {
return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw)
}
// 0.6 Upgrade path: Deregister services from the executor as the Nomad
// client agent now handles all Consul interactions. Ignore errors as
// this shouldn't cause the alloc to fail and there's nothing useful to
// do with them.
executorPlugin.DeregisterServices()
return executorPlugin, executorClient, nil
}
// killProcess kills a process with the given pid
func killProcess(pid int) error {
proc, err := os.FindProcess(pid)
if err != nil {
return err
}
return proc.Kill()
}
// destroyPlugin kills the plugin with the given pid and also kills the user
// process
func destroyPlugin(pluginPid int, userPid int) error {
var merr error
if err := killProcess(pluginPid); err != nil {
merr = multierror.Append(merr, err)
}
if err := killProcess(userPid); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}
// validateCommand validates that the command only has a single value and
// returns a user friendly error message telling them to use the passed
// argField.
func validateCommand(command, argField string) error {
trimmed := strings.TrimSpace(command)
if len(trimmed) == 0 {
return fmt.Errorf("command empty: %q", command)
}
if len(trimmed) != len(command) {
return fmt.Errorf("command contains extra white space: %q", command)
}
return nil
}
// GetKillTimeout returns the kill timeout to use given the tasks desired kill
// timeout and the operator configured max kill timeout.
func GetKillTimeout(desired, max time.Duration) time.Duration {
maxNanos := max.Nanoseconds()
desiredNanos := desired.Nanoseconds()
// Make the minimum time between signal and kill, 1 second.
if desiredNanos <= 0 {
desiredNanos = (1 * time.Second).Nanoseconds()
}
// Protect against max not being set properly.
if maxNanos <= 0 {
maxNanos = (10 * time.Second).Nanoseconds()
}
if desiredNanos < maxNanos {
return time.Duration(desiredNanos)
}
return max
}
// GetAbsolutePath returns the absolute path of the passed binary by resolving
// it in the path and following symlinks.
func GetAbsolutePath(bin string) (string, error) {
lp, err := exec.LookPath(bin)
if err != nil {
return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, err)
}
return filepath.EvalSymlinks(lp)
}
// getExecutorUser returns the user of the task, defaulting to
// dstructs.DefaultUnprivilegedUser if none was given.
func getExecutorUser(task *structs.Task) string {
if task.User == "" {
return dstructs.DefaultUnpriviledgedUser
}
return task.User
}
// SetEnvvars sets path and host env vars depending on the FS isolation used.
func SetEnvvars(envBuilder *env.Builder, fsi cstructs.FSIsolation, taskDir *allocdir.TaskDir, conf *config.Config) {
// Set driver-specific environment variables
switch fsi {
case cstructs.FSIsolationNone:
// Use host paths
envBuilder.SetAllocDir(taskDir.SharedAllocDir)
envBuilder.SetTaskLocalDir(taskDir.LocalDir)
envBuilder.SetSecretsDir(taskDir.SecretsDir)
default:
// filesystem isolation; use container paths
envBuilder.SetAllocDir(allocdir.SharedAllocContainerPath)
envBuilder.SetTaskLocalDir(allocdir.TaskLocalContainerPath)
envBuilder.SetSecretsDir(allocdir.TaskSecretsContainerPath)
}
// Set the host environment variables for non-image based drivers
if fsi != cstructs.FSIsolationImage {
filter := strings.Split(conf.ReadDefault("env.blacklist", config.DefaultEnvBlacklist), ",")
envBuilder.SetHostEnvvars(filter)
}
}