-
Notifications
You must be signed in to change notification settings - Fork 404
/
driver.go
316 lines (256 loc) · 10.9 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
// Interface for all container drivers
package drivers
import (
"context"
"io"
"strings"
"github.com/fnproject/fn/api/agent/drivers/stats"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
)
// A DriverCookie identifies a unique request to run a task.
//
// Clients should always call Close() on a DriverCookie after they are done
// with it.
type Cookie interface {
// Close should clean up any resources the cookie was using, or was going to use.
Close(ctx context.Context) error
// Run should execute task on the implementation.
// RunResult captures the result of task execution. This means if task
// execution fails due to a problem in the task, Run() MUST return a valid
// RunResult and nil as the error. The RunResult's Error() and Status()
// should be used to indicate failure.
// If the implementation itself suffers problems (lost of network, out of
// disk etc.), a nil RunResult and an error message is preferred.
//
// Run() MUST monitor the context. task cancellation is indicated by
// cancelling the context.
Run(ctx context.Context) (WaitResult, error)
// Freeze the container to pause running processes
Freeze(ctx context.Context) error
// Unfreeze a frozen container to unpause frozen processes
Unfreeze(ctx context.Context) error
// Validate/Inspect image. Returns true if the image needs
// to be pulled and non-nil error if validation/inspection fails.
ValidateImage(ctx context.Context) (bool, error)
// Pull the image. An image pull requires validation/inspection
// again.
PullImage(ctx context.Context) error
// Create container which can be Run() later
CreateContainer(ctx context.Context) error
// Fetch driver specific container configuration. Use this to
// access the container create options. If Driver.Prepare() is not
// yet called with the cookie, then this can be used to modify container
// create options.
ContainerOptions() interface{}
}
type WaitResult interface {
// Wait may be called to await the result of a container's execution. If the
// provided context is canceled and the container does not return first, the
// resulting status will be 'canceled'. If the provided context times out
// then the resulting status will be 'timeout'.
Wait(context.Context) RunResult
}
// Check if the provided error is retriable. Returns true and a tag reason if the error is retriable.
type RetryErrorChecker func(error) (bool, string)
type Driver interface {
// Create a new cookie with defaults and/or settings from container task.
// Callers should Close the cookie regardless of whether they prepare or run it.
CreateCookie(ctx context.Context, task ContainerTask) (Cookie, error)
// Set image pull retry policy and retriable error checker
SetPullImageRetryPolicy(policy common.BackOffConfig, checker RetryErrorChecker) error
// Get serialized string of fields in call extn that determines if we can use
// an existing hot container slot or pull a new image and start a container
GetSlotKeyExtensions(extn map[string]string) string
// close & shutdown the driver
Close() error
}
// RunResult indicates only the final state of the task.
type RunResult interface {
// Error is an actionable/checkable error from the container, nil if
// Status() returns "success", otherwise non-nil
Error() error
// Status should return the current status of the task.
// Only valid options are {"error", "success", "timeout", "killed", "cancelled"}.
Status() string
}
// Logger Tags for container
type LoggerTag struct {
Name string
Value string
}
// Logger Configuration for container
type LoggerConfig struct {
// Log Sink URL
URL string
// Log Tag Pairs
Tags []LoggerTag
}
// The ContainerTask interface guides container execution across a wide variety of
// container oriented runtimes.
type ContainerTask interface {
// Command returns the command to run within the container.
Command() string
// EnvVars returns environment variable key-value pairs.
EnvVars() map[string]string
// Input feeds the container with data
Input() io.Reader
// The id to assign the container
Id() string
// Image returns the runtime specific image to run.
Image() string
// Driver will write output log from task execution to these writers. Must be
// non-nil. Use io.Discard if log is irrelevant.
Logger() (stdout, stderr io.Writer)
// WriteStat writes a single Stat, implementation need not be thread safe.
WriteStat(context.Context, stats.Stat)
// Volumes returns an array of 2-element tuples indicating storage volume mounts.
// The first element is the path on the host, and the second element is the
// path in the container.
Volumes() [][2]string
// Memory determines the max amount of RAM given to the container to use.
// 0 is unlimited.
Memory() uint64
// CPUs in milli CPU units
CPUs() uint64
// Filesystem size limit for the container, in megabytes.
FsSize() uint64
// PIDs defines the max number of PIDs allowed for the container to use. 0
// is unlimited.
PIDs() uint64
// OpenFiles defines the max number of files that the process in the
// function is allowed to open. Return nil for the default value from the
// host.
OpenFiles() *uint64
// LockedMemory maximum number of bytes of memory that may be locked into
// RAM. Return nil for the default value from the host.
LockedMemory() *uint64
// PendingSignals limit on the number of signals that may be queued. Return
// nil for the default value from the host.
PendingSignals() *uint64
// MessageQueue a limit on the number of bytes that can be allocated for
// POSIX message queues. Return nil for the default value from the host.
MessageQueue() *uint64
// Tmpfs Filesystem size limit for the container, in megabytes.
TmpFsSize() uint64
// WorkDir returns the working directory to use for the task. Empty string
// leaves it unset.
WorkDir() string
// Logger Config to use in driver
LoggerConfig() LoggerConfig
// Close is used to perform cleanup after task execution.
// Close should be safe to call multiple times.
Close()
// AddCloseWrapper is used to add additional cleanup to a task.
// The original close operation is passed to the wrapping factory.
// Implementation need not be thread-safe.
WrapClose(func(closer func()) func())
// Extensions are extra driver specific configuration options. They should be
// more specific but it's easier to be lazy.
Extensions() map[string]string
// UDSAgentPath to use to configure the unix domain socket.
// This is the mount point relative to the agent
// abstractions have leaked so bad at this point it's a monsoon.
UDSAgentPath() string
// UDSDockerPath to use to configure the unix domain socket. the drivers
// This is the mount point relative to the docker host.
UDSDockerPath() string
// UDSDockerDest is the destination mount point for uds path. it is the path
// of the directory where the sock file resides inside of the container.
UDSDockerDest() string
// Returns true if network is disabled.
DisableNet() bool
// BeforeCall is invoked just prior to running an invocation.
// The Task is definitely going to be used for this invocation.
// Invocation extensions are passed to the Before and After calls
BeforeCall(context.Context, *models.Call, CallExtensions) error
// WrapBeforeCall can add additional pre-call behaviour to be added.
// This should be called once per ContainerTaskand applies to all successive calls
// that utilise this task
WrapBeforeCall(func(BeforeCall) BeforeCall)
// AfterCall is invoked just after an invocation is finished,
// providing that BeforeCall returned without an error
AfterCall(context.Context, *models.Call, CallExtensions) error
// WrapBeforeCall can add additional post-call behaviour to be added.
// This should be called once per ContainerTask and applies to all successive calls
// that utilise this task
WrapAfterCall(func(AfterCall) AfterCall)
}
type CallExtensions = map[string]string
type BeforeCall = func(context.Context, *models.Call, CallExtensions) error
type AfterCall = func(context.Context, *models.Call, CallExtensions) error
// TODO: ensure some type is applied to these statuses.
const (
// task statuses
StatusRunning = "running"
StatusSuccess = "success"
StatusError = "error"
StatusTimeout = "timeout"
StatusKilled = "killed"
StatusCancelled = "cancelled"
defaultDomain = "docker.io"
)
type Config struct {
// TODO this should all be driver-specific config and not in the
// driver package itself. fix if we ever one day try something else
Docker string `json:"docker"`
DockerNetworks string `json:"docker_networks"`
DockerLoadFile string `json:"docker_load_file"`
ServerVersion string `json:"server_version"`
PreForkPoolSize uint64 `json:"pre_fork_pool_size"`
PreForkImage string `json:"pre_fork_image"`
PreForkCmd string `json:"pre_fork_cmd"`
PreForkUseOnce uint64 `json:"pre_fork_use_once"`
PreForkNetworks string `json:"pre_fork_networks"`
MaxTmpFsInodes uint64 `json:"max_tmpfs_inodes"`
EnableReadOnlyRootFs bool `json:"enable_readonly_rootfs"`
ContainerLabelTag string `json:"container_label_tag"`
InstanceId string `json:"instance_id"`
ImageCleanMaxSize uint64 `json:"image_clean_max_size"`
ImageCleanExemptTags string `json:"image_clean_exempt_tags"`
ImageEnableVolume bool `json:"image_enable_volume"`
DisableUnprivilegedContainers bool `json:"disable_unprivileged_containers"`
}
// https://github.com/fsouza/go-dockerclient/blob/master/misc.go#L166
func parseRepositoryTag(repoTag string) (repository, tag string) {
parts := strings.Split(repoTag, "@")
var digest string
if len(parts) == 2 {
digest = parts[1]
}
repoTag = parts[0]
n := strings.LastIndex(repoTag, ":")
if n < 0 {
return repoTag, digest
}
if digest != "" {
return repoTag[:n], digest
}
if tag := repoTag[n+1:]; !strings.Contains(tag, "/") {
return repoTag[:n], tag
}
return repoTag, digest
}
func ParseImage(image string) (registry, repo, tag string) {
// registry = defaultDomain // TODO uneasy about this, but it seems wise
repo, tag = parseRepositoryTag(image)
// Officially sanctioned at https://github.com/moby/moby/blob/4f0d95fa6ee7f865597c03b9e63702cdcb0f7067/registry/service.go#L155 to deal with "Official Repositories".
// Without this, token auth fails.
// Registries must exist at root (https://github.com/moby/moby/issues/7067#issuecomment-54302847)
// This cannot support the `library/` shortcut for private registries.
parts := strings.SplitN(repo, "/", 2)
switch len(parts) {
case 1:
repo = "library/" + repo
default:
// detect if repo has a hostname, otherwise leave it
if strings.Contains(parts[0], ".") || strings.Contains(parts[0], ":") || parts[0] == "localhost" {
registry = parts[0]
repo = parts[1]
}
}
if tag == "" {
tag = "latest"
}
return registry, repo, tag
}