-
Notifications
You must be signed in to change notification settings - Fork 405
/
image_puller.go
183 lines (146 loc) · 4.52 KB
/
image_puller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package docker
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/fnproject/fn/api/agent/drivers"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"github.com/fsouza/go-dockerclient"
)
// ImagePuller is an abstraction layer that coalesces concurrent docker-pulls.
// Docker itself does not handle concurrent pulls well: only layer-blob downloads
// are serialized between simultaneous pulls with a listener/follower scheme,
// while manifest/config fetches are not. For instance, when just two docker-pulls
// of a single-layer image are started at the same time, docker issues 9 HTTP GET
// requests to the registry, 4 of which are extra and unnecessary. ImagePuller adds
// a simple listener/follower serialization on top: the first request for an image
// starts the docker-pull, and any new requests for the same image are added as
// listeners to that ongoing pull.
type ImagePuller interface {
	// PullImage pulls repo:tag using cfg as registry credentials, joining an
	// ongoing pull of the same image if there is one. img is the image name used
	// when reporting errors. The returned channel delivers the pull's error, if
	// any, and is closed once the pull has finished.
	PullImage(ctx context.Context, cfg *docker.AuthConfiguration, img, repo, tag string) chan error
	// SetRetryPolicy configures the backoff policy used between pull attempts and
	// the checker that decides which pull errors are retried.
	SetRetryPolicy(policy common.BackOffConfig, checker drivers.RetryErrorChecker) error
}
// transfer tracks one in-flight docker-pull together with every caller waiting on it.
type transfer struct {
	// ctx is the oldest context: the one passed by the caller that started the
	// pull. It bounds the pull and the retry backoff for all listeners.
	ctx context.Context
	// key identifies this transfer in imagePuller.transfers.
	key string
	// cfg holds the registry credentials used for the pull.
	cfg *docker.AuthConfiguration
	// img is the image name reported in error messages.
	img string
	// repo and tag name the image handed to docker for pulling.
	repo string
	tag  string
	// listeners holds one channel per waiting caller; when the pull finishes each
	// receives at most one error and is then closed.
	listeners []chan error
}
// imagePuller is the ImagePuller returned by NewImagePuller. It coalesces
// concurrent pulls by keying in-flight transfers on repo, tag and auth config.
type imagePuller struct {
	// docker is the client the actual pulls are issued through.
	docker dockerClient
	// lock guards transfers and the listeners slice of every transfer in it.
	lock      sync.Mutex
	transfers map[string]*transfer
	// backoff/retry settings: isRetriable decides whether a failed pull is
	// retried, backOffCfg configures the delay between attempts.
	isRetriable drivers.RetryErrorChecker
	backOffCfg  common.BackOffConfig
}
// NewImagePuller returns an ImagePuller that issues its pulls through docker.
// Until SetRetryPolicy is called, a failed pull is never retried.
func NewImagePuller(docker dockerClient) ImagePuller {
	neverRetry := func(error) (bool, string) { return false, "" }
	return &imagePuller{
		docker:      docker,
		transfers:   make(map[string]*transfer),
		isRetriable: neverRetry,
	}
}
// SetRetryPolicy installs the backoff policy and retry checker used by
// subsequent pulls. A nil checker disables retries instead of leaving a nil
// function that would panic when a pull fails. It always returns nil.
//
// NOTE(review): the fields are written without i.lock while pullWithRetry reads
// them, so this is presumably meant to be called during setup, before any pull
// is started.
func (i *imagePuller) SetRetryPolicy(policy common.BackOffConfig, checker drivers.RetryErrorChecker) error {
	if checker == nil {
		checker = func(error) (bool, string) { return false, "" }
	}
	i.isRetriable = checker
	i.backOffCfg = policy
	return nil
}
// newTransfer registers the caller as a listener on the docker-pull for
// (repo, tag, cfg) and starts that pull in a new goroutine only if none is
// already in flight for the same key. Because cfg is part of the key, pulls with
// different credentials are never merged. The returned channel receives at most
// one error and is closed when the pull finishes.
func (i *imagePuller) newTransfer(ctx context.Context, cfg *docker.AuthConfiguration, img, repo, tag string) chan error {
	key := fmt.Sprintf("%s %s %+v", repo, tag, cfg)
	listener := make(chan error, 1)

	i.lock.Lock()
	trx, inFlight := i.transfers[key]
	if !inFlight {
		trx = &transfer{
			ctx:       ctx,
			key:       key,
			cfg:       cfg,
			img:       img,
			repo:      repo,
			tag:       tag,
			listeners: make([]chan error, 0, 1),
		}
		i.transfers[key] = trx
	}
	trx.listeners = append(trx.listeners, listener)
	i.lock.Unlock()

	// Only the caller that created the transfer kicks off the actual pull.
	if !inFlight {
		go i.startTransfer(trx)
	}
	return listener
}
// pullWithRetry pulls trx.repo:trx.tag, retrying failures that i.isRetriable
// accepts, with delays drawn from a backoff built from i.backOffCfg. It returns
// nil on success. It returns the last pull error when that error is not
// retriable or the backoff is exhausted, and trx.ctx.Err() when trx.ctx is done
// while waiting between attempts.
func (i *imagePuller) pullWithRetry(trx *transfer) error {
	// Dereferencing a nil cfg would panic; fall back to an empty
	// AuthConfiguration (no credentials) instead.
	var auth docker.AuthConfiguration
	if trx.cfg != nil {
		auth = *trx.cfg
	}
	opts := docker.PullImageOptions{Repository: trx.repo, Tag: trx.tag, Context: trx.ctx}

	backoff := common.NewBackOff(i.backOffCfg)
	for {
		err := i.docker.PullImage(opts, auth)
		if err == nil {
			// Success: never ask the retry checker about a nil error.
			return nil
		}
		retry, reason := i.isRetriable(err)
		if !retry {
			return err
		}
		delay, ok := backoff.NextBackOff()
		if !ok {
			return err
		}
		// An explicit timer (rather than time.After) can be stopped right away
		// if the context ends before the delay elapses.
		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
			recordRetry(trx.ctx, "docker_pull_image", reason)
		case <-trx.ctx.Done():
			timer.Stop()
			return trx.ctx.Err()
		}
	}
}
// startTransfer runs the docker-pull for trx, then notifies every listener
// registered on it and removes trx from i.transfers. On failure, each listener
// receives the same func error wrapping an API error. Docker 4xx statuses are
// passed through; anything else is reported as 502 Bad Gateway. On success the
// listener channels are simply closed.
func (i *imagePuller) startTransfer(trx *transfer) {
	var ferr error
	if err := i.pullWithRetry(trx); err != nil {
		common.Logger(trx.ctx).WithError(err).Info("Failed to pull image")

		// TODO need to inspect for hub or network errors and pick; for now,
		// assume 502 Bad Gateway if not a docker 4xx error
		msg := err.Error()
		code := http.StatusBadGateway
		if dErr, ok := err.(*docker.Error); ok {
			msg = dockerMsg(dErr)
			if dErr.Status >= 400 && dErr.Status < 500 {
				code = dErr.Status // decap 4xx errors
			}
		}
		apiErr := models.NewAPIError(code, fmt.Errorf("Failed to pull image '%s': %s", trx.img, msg))
		ferr = models.NewFuncError(apiErr)
	}

	i.lock.Lock()
	defer i.lock.Unlock()

	// Listener channels are created with a buffer of one and receive at most
	// one value, so these sends cannot block while the lock is held.
	for _, ch := range trx.listeners {
		if ferr != nil {
			ch <- ferr
		}
		close(ch)
	}
	// Unregister the docker-pull so the next request for this key starts afresh.
	delete(i.transfers, trx.key)
}
// PullImage implements ImagePuller. It joins an in-flight docker-pull for the
// same repo, tag and auth config, or starts a new one. The returned channel
// yields the pull's error, if any, before being closed.
func (i *imagePuller) PullImage(ctx context.Context, cfg *docker.AuthConfiguration, img, repo, tag string) chan error {
	done := i.newTransfer(ctx, cfg, img, repo, tag)
	return done
}
// dockerMsg removes docker's error formatting ('API Error (code) {"message":"..."}')
// by extracting the "message" field from the JSON body held in derr.Message.
// The raw body is returned instead when it is not valid JSON or when it carries
// no non-empty "message" field (e.g. `{}` or `null`). This way callers never end
// up reporting an empty reason.
func dockerMsg(derr *docker.Error) string {
	var body struct {
		Msg string `json:"message"`
	}
	if err := json.Unmarshal([]byte(derr.Message), &body); err != nil || body.Msg == "" {
		return derr.Message
	}
	return body.Msg
}