-
Notifications
You must be signed in to change notification settings - Fork 602
/
docker_container_engine.go
309 lines (256 loc) · 7.6 KB
/
docker_container_engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package engine
import (
"bufio"
"encoding/json"
"errors"
"io"
"os"
"sync"
"github.com/aws/amazon-ecs-agent/agent/api"
"github.com/aws/amazon-ecs-agent/agent/utils"
dockerparsers "github.com/docker/docker/pkg/parsers"
dockerregistry "github.com/docker/docker/registry"
docker "github.com/fsouza/go-dockerclient"
)
// Interface to make testing it easier
type DockerClient interface {
ContainerEvents() (<-chan DockerContainerChangeEvent, <-chan error)
PullImage(image string) error
CreateContainer(*docker.Config, string) (string, error)
StartContainer(string, *docker.HostConfig) error
StopContainer(string) error
GetContainerName(string) (string, error)
InspectContainer(string) (*docker.Container, error)
client() (*docker.Client, error)
}
// Implements DockerClient
type DockerGoClient struct{}
// dockerClient is a singleton
var dockerclient *docker.Client
// pullLock is a temporary workaround for a devicemapper issue. See: https://github.com/docker/docker/issues/9718
var pullLock sync.Mutex
type DockerImageResponse struct {
Images []docker.APIImages
}
func NewDockerGoClient() (*DockerGoClient, error) {
dg := &DockerGoClient{}
client, err := dg.client()
if err != nil {
log.Error("Unable to create a docker client!", "err", err)
return dg, err
}
// Even if we have a dockerclient, the daemon might not be running. Ping it
// to ensure it's up.
err = client.Ping()
return dg, err
}
func (dg *DockerGoClient) PullImage(image string) error {
log.Info("Pulling image", "image", image)
client, err := dg.client()
if err != nil {
return err
}
// The following lines of code are taken, in whole or part, from the docker
// source code. Please see the NOTICE file in the root of the project for
// attribution
// https://github.com/docker/docker/blob/246ec5dd067fc17be5196ae29956e3368b167ccf/api/client/commands.go#L1180
taglessRemote, tag := dockerparsers.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}
hostname, _, err := dockerregistry.ResolveRepositoryName(taglessRemote)
if err != nil {
return err
}
// TODO, authconfig
// End of docker-attributed code
// Workaround for devicemapper bug. See:
// https://github.com/docker/docker/issues/9718
pullLock.Lock()
defer pullLock.Unlock()
pullDebugOut, pullWriter := io.Pipe()
opts := docker.PullImageOptions{
Repository: taglessRemote,
Registry: hostname,
Tag: tag,
OutputStream: pullWriter,
}
go func() {
reader := bufio.NewReader(pullDebugOut)
var line []byte
var err error
line, _, err = reader.ReadLine()
for err == nil {
log.Debug("Pulling image", "image", image, "status", string(line[:]))
line, _, err = reader.ReadLine()
}
if err != nil {
log.Error("Error reading pull image status", "image", image, "err", err)
}
}()
err = client.PullImage(opts, docker.AuthConfiguration{})
return err
}
func (dg *DockerGoClient) CreateContainer(config *docker.Config, name string) (string, error) {
client, err := dg.client()
if err != nil {
return "", err
}
// Ensure this image was pulled so this can be a quick operation (taskEngine
// is blocked on this)
_, err = client.InspectImage(config.Image)
if err != nil {
return "", err
}
// TODO, race condition here: images should not be able to be deleted
// between that inspect and the CreateContainer below
containerOptions := docker.CreateContainerOptions{Config: config, Name: name}
dockerContainer, err := client.CreateContainer(containerOptions)
if err != nil {
return "", err
}
return dockerContainer.ID, nil
}
func (dg *DockerGoClient) StartContainer(id string, hostConfig *docker.HostConfig) error {
client, err := dg.client()
if err != nil {
return err
}
err = client.StartContainer(id, hostConfig)
if err != nil {
return err
}
return nil
}
func dockerStateToState(state docker.State) api.ContainerStatus {
if state.Running {
return api.ContainerRunning
}
return api.ContainerStopped
}
func (dg *DockerGoClient) DescribeContainer(dockerId string) (api.ContainerStatus, error) {
client, err := dg.client()
if err != nil {
return api.ContainerStatusUnknown, err
}
if len(dockerId) == 0 {
return api.ContainerStatusUnknown, errors.New("Invalid container id: ''")
}
dockerContainer, err := client.InspectContainer(dockerId)
if err != nil {
return api.ContainerStatusUnknown, err
}
return dockerStateToState(dockerContainer.State), nil
}
func (dg *DockerGoClient) InspectContainer(dockerId string) (*docker.Container, error) {
client, err := dg.client()
if err != nil {
return nil, err
}
return client.InspectContainer(dockerId)
}
// DescribeDockerImages takes no arguments, and returns a JSON-encoded string of all of the images located on the host
func (dg *DockerGoClient) DescribeDockerImages() (string, error) {
client, err := dg.client()
if err != nil {
return "", err
}
imgs, err := client.ListImages(true)
if err != nil {
return "", err
}
response := DockerImageResponse{Images: imgs}
output, err := json.Marshal(response)
if err != nil {
return "", err
}
return string(output), nil
}
func (dg *DockerGoClient) StopContainer(dockerId string) error {
client, err := dg.client()
if err != nil {
return err
}
return client.StopContainer(dockerId, DEFAULT_TIMEOUT_SECONDS)
}
func (dg *DockerGoClient) StopContainerById(id string) error {
client, err := dg.client()
if err != nil {
return err
}
return client.StopContainer(id, DEFAULT_TIMEOUT_SECONDS)
}
func (dg *DockerGoClient) GetContainerName(id string) (string, error) {
client, err := dg.client()
if err != nil {
return "", err
}
container, err := client.InspectContainer(id)
if err != nil {
return "", err
}
return container.Name, nil
}
// client returns the last used client if one has worked in the past, or a newly
// created one if one has not been created yet
func (dg *DockerGoClient) client() (*docker.Client, error) {
if dockerclient != nil {
return dockerclient, nil
}
// Re-read the env in case they corrected it
endpoint := utils.DefaultIfBlank(os.Getenv(DOCKER_ENDPOINT_ENV_VARIABLE), DOCKER_DEFAULT_ENDPOINT)
client, err := docker.NewVersionedClient(endpoint, "1.15")
if err != nil {
log.Error("Unable to conect to docker client. Ensure daemon is running", "endpoint", endpoint, "err", err)
return nil, err
}
dockerclient = client
return dockerclient, err
}
// Listen to the docker event stream for container changes and pass them up
func (dg *DockerGoClient) ContainerEvents() (<-chan DockerContainerChangeEvent, <-chan error) {
errc := make(chan error)
client, err := dg.client()
if err != nil {
errc <- err
return nil, errc
}
events := make(chan *docker.APIEvents)
err = client.AddEventListener(events)
if err != nil {
errc <- errors.New("Unable to listen for docker events: " + err.Error())
return nil, errc
}
changedContainers := make(chan DockerContainerChangeEvent)
go func() {
for event := range events {
log.Debug("Got event from docker daemon", "event", event)
containerId := event.ID
image := event.From
var status api.ContainerStatus
switch event.Status {
case "create":
status = api.ContainerCreated
case "start":
status = api.ContainerRunning
case "stop":
status = api.ContainerStopped
case "die":
status = api.ContainerDead
case "kill":
status = api.ContainerDead
case "destroy":
case "pause":
case "unpause":
case "export":
// Image events
case "untag":
case "delete":
default:
log.Warn("Unknown status event! Maybe docker updated? ", "status", event.Status)
}
changedContainers <- DockerContainerChangeEvent{DockerId: containerId, Image: image, Status: status}
}
}()
return changedContainers, errc
}