/
artifact_batch_creator.go
126 lines (103 loc) · 3.63 KB
/
artifact_batch_creator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package agent
import (
"context"
"time"
"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/logger"
"github.com/buildkite/roko"
)
type ArtifactBatchCreatorConfig struct {
// The ID of the Job that these artifacts belong to
JobID string
// All the artifacts that need to be created
Artifacts []*api.Artifact
// Where the artifacts are being uploaded to on the command line
UploadDestination string
// CreateArtifactsTimeout, sets a context.WithTimeout around the CreateArtifacts API.
// If it's zero, there's no context timeout and the default HTTP timeout will prevail.
CreateArtifactsTimeout time.Duration
}
type ArtifactBatchCreator struct {
// The creation config
conf ArtifactBatchCreatorConfig
// The logger instance to use
logger logger.Logger
// The APIClient that will be used when uploading jobs
apiClient APIClient
}
func NewArtifactBatchCreator(l logger.Logger, ac APIClient, c ArtifactBatchCreatorConfig) *ArtifactBatchCreator {
return &ArtifactBatchCreator{
logger: l,
conf: c,
apiClient: ac,
}
}
func (a *ArtifactBatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
length := len(a.conf.Artifacts)
chunks := 30
// Split into the artifacts into chunks so we're not uploading a ton of
// files at once.
for i := 0; i < length; i += chunks {
j := i + chunks
if length < j {
j = length
}
// The artifacts that will be uploaded in this chunk
theseArtifacts := a.conf.Artifacts[i:j]
// An ID is required so Buildkite can ensure this create
// operation is idompotent (if we try and upload the same ID
// twice, it'll just return the previous data and skip the
// upload)
batch := &api.ArtifactBatch{
ID: api.NewUUID(),
Artifacts: theseArtifacts,
UploadDestination: a.conf.UploadDestination,
}
a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length)
timeout := a.conf.CreateArtifactsTimeout
// Retry the batch upload a couple of times
r := roko.NewRetrier(
roko.WithMaxAttempts(10),
roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)),
)
creation, err := roko.DoFunc(ctx, r, func(r *roko.Retrier) (*api.ArtifactBatchCreateResponse, error) {
ctxTimeout := ctx
if timeout != 0 {
var cancel func()
ctxTimeout, cancel = context.WithTimeout(ctx, a.conf.CreateArtifactsTimeout)
defer cancel()
}
creation, resp, err := a.apiClient.CreateArtifacts(ctxTimeout, a.conf.JobID, batch)
// the server returns a 403 code if the artifact has exceeded the service quota
// Break the retry on any 4xx code except for 429 Too Many Requests.
if resp != nil && (resp.StatusCode != 429 && resp.StatusCode >= 400 && resp.StatusCode <= 499) {
a.logger.Warn("Artifact creation failed with status code %d, breaking the retry loop", resp.StatusCode)
r.Break()
}
if err != nil {
a.logger.Warn("%s (%s)", err, r)
}
// after four attempts (0, 1, 2, 3)...
if r.AttemptCount() == 3 {
// The short timeout has given us fast feedback on the first couple of attempts,
// but perhaps the server needs more time to complete the request, so fall back to
// the default HTTP client timeout.
a.logger.Debug("CreateArtifacts timeout (%s) removed for subsequent attempts", timeout)
timeout = 0
}
return creation, err
})
// Did the batch creation eventually fail?
if err != nil {
return nil, err
}
// Save the id and instructions to each artifact
index := 0
for _, id := range creation.ArtifactIDs {
theseArtifacts[index].ID = id
theseArtifacts[index].UploadInstructions = creation.UploadInstructions
index += 1
}
}
return a.conf.Artifacts, nil
}