-
Notifications
You must be signed in to change notification settings - Fork 3
/
retry.go
261 lines (224 loc) · 8.74 KB
/
retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
// Copyright 2023 The Authors (see AUTHORS file)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retry
import (
"context"
"errors"
"fmt"
"net/http"
"strconv"
"time"
"github.com/google/go-github/v61/github"
"github.com/sethvargo/go-gcslock"
"github.com/abcxyz/pkg/logging"
)
// acceptedMessage is logged under the "body" key when a retry run completes.
const acceptedMessage = "Accepted"

// Messages logged under the "body" key when a step of the retry run fails.
const (
	errAcquireLock         = "Failed to acquire GCS lock."
	errRetrieveCheckpoint  = "Failed to retrieve checkpoint."
	errWriteCheckpoint     = "Failed to write checkpoint."
	errCallingGitHub       = "Failed to call GitHub."
	errDeliveryEventExists = "Failed to check if event exists"
)
// eventIdentifier represents the required information used by the retry
// service for handling a GitHub event.
type eventIdentifier struct {
	// eventID is the delivery ID; it is passed to RedeliverEvent and, once
	// handled, formatted as the new checkpoint value.
	eventID int64
	// guid is the delivery GUID; it is used to look the event up with
	// DeliveryEventExists when redelivery fails.
	guid string
}
// handleRetry handles calling GitHub APIs to search and retry for failed
// events.
//
// The returned handler performs one retry run per request:
//
//  1. Acquire the GCS lock. If another execution holds it, respond 200 so the
//     scheduler does not reinvoke; any other lock error responds 500.
//  2. Read the previous checkpoint from the checkpoint table.
//  3. Page through ListDeliveries until the previous checkpoint is seen or the
//     cursor is exhausted, collecting every delivery whose status code is
//     outside 200-299.
//  4. Walk the collected failures in reverse collection order, calling
//     RedeliverEvent for each and advancing the in-memory checkpoint. A
//     redelivery error that is not a *github.AcceptedError stops the run
//     unless the event already exists in the events table.
//  5. Write the checkpoint (the first delivery ID read on this run) and
//     respond 202.
//
// Whenever the run is aborted after the checkpoint has advanced, the partial
// progress is written before the 500 response is sent. Exactly one HTTP
// response is written per request.
func (s *Server) handleRetry() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		now := time.Now().UTC()
		ctx := r.Context()
		logger := logging.FromContext(ctx)

		if err := s.gcsLock.Acquire(ctx, s.lockTTL); err != nil {
			var lockErr *gcslock.LockHeldError
			if errors.As(err, &lockErr) {
				logger.InfoContext(ctx, "lock is already acquired by another execution",
					"code", http.StatusOK,
					"body", errAcquireLock,
					"method", "Acquire",
					"error", lockErr.Error(),
				)

				// unable to obtain the lock, return a 200 so the scheduler doesn't
				// attempt to reinvoke
				w.WriteHeader(http.StatusOK)
				fmt.Fprint(w, http.StatusText(http.StatusOK))
				return
			}

			logger.ErrorContext(ctx, "failed to call cloud storage",
				"code", http.StatusInternalServerError,
				"body", errAcquireLock,
				"method", "Acquire",
				"error", err.Error())

			// unknown error
			http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
			return
		}

		// read the last checkpoint from checkpoint table
		prevCheckpoint, err := s.datastore.RetrieveCheckpointID(ctx, s.checkpointTableID)
		if err != nil {
			logger.ErrorContext(ctx, "failed to call RetrieveCheckpointID",
				"code", http.StatusInternalServerError,
				"body", errRetrieveCheckpoint,
				"method", "RetrieveCheckpointID",
				"error", err,
			)
			http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
			return
		}

		logger.InfoContext(ctx, "retrieved last checkpoint", "prev_checkpoint", prevCheckpoint)

		var totalEventCount int
		var redeliveredEventCount int
		var firstCheckpoint string
		var cursor string
		newCheckpoint := prevCheckpoint

		// store all observed failures in memory from the latest event up to the prevCheckpoint
		var failedEventsHistory []*eventIdentifier
		var found bool

		// the first run of this service will not have a cursor therefore we must
		// ensure we run the loop at least once
		for ok := true; ok; ok = (cursor != "" && !found) {
			// call list deliveries API, first call is intentionally an empty string
			deliveries, res, err := s.github.ListDeliveries(ctx, &github.ListCursorOptions{
				Cursor:  cursor,
				PerPage: 100,
			})
			if err != nil {
				logger.ErrorContext(ctx, "failed to call ListDeliveries",
					"code", http.StatusInternalServerError,
					"body", errCallingGitHub,
					"method", "ListDeliveries",
					"error", err,
				)
				http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
				return
			}

			// in anticipation of the happy path, store the first event to advance the
			// cursor; a page may be empty (e.g. no deliveries exist yet), in which
			// case there is nothing to record
			if firstCheckpoint == "" && len(deliveries) > 0 {
				firstCheckpoint = strconv.FormatInt(*deliveries[0].ID, 10)
			}

			logger.InfoContext(ctx, "retrieve deliveries from GitHub",
				"cursor", cursor,
				"size", len(deliveries))

			// update the cursor
			cursor = res.Cursor

			// collect every failed delivery on this page
			for _, event := range deliveries {
				// append to the total events counter
				totalEventCount++

				// reached the last checkpoint, all events equal to and older than this
				// one have already been processed
				if prevCheckpoint == strconv.FormatInt(*event.ID, 10) {
					found = true
					break
				}

				// check payload and see if its been successfully delivered, if so skip over it
				if *event.StatusCode >= 200 && *event.StatusCode <= 299 {
					continue
				}

				failedEventsHistory = append(failedEventsHistory, &eventIdentifier{eventID: *event.ID, guid: *event.GUID})
			}
		}

		failedEventCount := len(failedEventsHistory)

		// abort persists whatever progress has been made and then reports a 500.
		// If persisting fails, writeMostRecentCheckpoint has already written the
		// 500 response, so a second response must not be written.
		abort := func() {
			if newCheckpoint != prevCheckpoint {
				if !s.writeMostRecentCheckpoint(ctx, w, newCheckpoint, prevCheckpoint, now,
					totalEventCount, failedEventCount, redeliveredEventCount) {
					return
				}
			}
			http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		}

		// work backwards from the list of failed events then attempt redelivery and
		// increment the newCheckpoint in an effort to close the gap to the most
		// recent event, this should alleviate pressure on future runs
		for i := failedEventCount - 1; i >= 0; i-- {
			failed := failedEventsHistory[i]

			if err := s.github.RedeliverEvent(ctx, failed.eventID); err != nil {
				var acceptedErr *github.AcceptedError
				if !errors.As(err, &acceptedErr) {
					// found an unaccepted error, check if its already in the events
					// table; use a distinct variable so the redelivery error is not
					// shadowed and can still be logged below
					exists, existsErr := s.datastore.DeliveryEventExists(ctx, s.eventsTableID, failed.guid)
					if existsErr != nil {
						logger.ErrorContext(ctx, "failed to call BigQuery",
							"method", "DeliveryEventExists",
							"code", http.StatusInternalServerError,
							"body", errDeliveryEventExists,
							"error", existsErr,
						)
						abort()
						return
					}
					if !exists {
						logger.ErrorContext(ctx, "failed to redeliver event, stop processing",
							"code", http.StatusInternalServerError,
							"body", errCallingGitHub,
							"method", "RedeliverEvent",
							"guid", failed.guid,
							"error", err,
							"total_event_count", totalEventCount,
							"failed_event_count", failedEventCount,
						)
						abort()
						return
					}
				}
			}

			// reached when redelivery succeeded, was accepted, or the event is
			// already present in the events table
			logger.InfoContext(ctx, "detected a failed event and successfully redelivered", "event_id", failed.eventID)
			redeliveredEventCount++
			newCheckpoint = strconv.FormatInt(failed.eventID, 10)
		}

		// advance the checkpoint to the first entry read on this run to avoid
		// redundant processing; when no deliveries were returned there is no
		// entry to advance to, so the previous checkpoint is kept
		if firstCheckpoint != "" {
			newCheckpoint = firstCheckpoint
		}

		// never persist an empty checkpoint (no previous checkpoint and no
		// deliveries observed)
		if newCheckpoint != "" {
			if !s.writeMostRecentCheckpoint(ctx, w, newCheckpoint, prevCheckpoint, now,
				totalEventCount, failedEventCount, redeliveredEventCount) {
				// the 500 response has already been written by the helper
				return
			}
		}

		logger.InfoContext(ctx, "successful",
			"code", http.StatusAccepted,
			"body", acceptedMessage,
			"total_event_count", totalEventCount,
			"failed_event_count", failedEventCount,
			"redelivered_event_count", redeliveredEventCount,
		)
		w.WriteHeader(http.StatusAccepted)
		fmt.Fprint(w, http.StatusText(http.StatusAccepted))
	})
}

// writeMostRecentCheckpoint is a helper function to write to the checkpoint
// table with the last successfully processed checkpoint denoted by
// newCheckpoint.
//
// The row is stamped with now formatted as time.DateTime. The event counters
// and prevCheckpoint are used for logging only.
//
// It returns true when the checkpoint was written. On failure it logs the
// error, writes a 500 response to w and returns false; callers must not write
// another response in that case.
func (s *Server) writeMostRecentCheckpoint(ctx context.Context, w http.ResponseWriter,
	newCheckpoint, prevCheckpoint string, now time.Time, totalEventCount, failedEventCount, redeliveredEventCount int,
) bool {
	logger := logging.FromContext(ctx)

	logger.InfoContext(ctx, "write new checkpoint",
		"prev_checkpoint", prevCheckpoint,
		"new_checkpoint", newCheckpoint)

	createdAt := now.Format(time.DateTime)
	if err := s.datastore.WriteCheckpointID(ctx, s.checkpointTableID, newCheckpoint, createdAt); err != nil {
		logger.ErrorContext(ctx, "failed to call WriteCheckpointID",
			"code", http.StatusInternalServerError,
			"body", errWriteCheckpoint,
			"method", "WriteCheckpointID",
			"error", err,
			"total_event_count", totalEventCount,
			"failed_event_count", failedEventCount,
			"redelivered_event_count", redeliveredEventCount,
		)
		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
		return false
	}
	return true
}