/
recovery.go
320 lines (277 loc) · 10.5 KB
/
recovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
package consumer
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"strings"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3"
"go.gazette.dev/core/broker/client"
pb "go.gazette.dev/core/broker/protocol"
pc "go.gazette.dev/core/consumer/protocol"
"go.gazette.dev/core/consumer/recoverylog"
"go.gazette.dev/core/labels"
"go.gazette.dev/core/message"
"google.golang.org/grpc/codes"
)
// fetchedHints bundles the results of a single fetchHints call.
type fetchedHints struct {
// log is the recovery log Journal of the ShardSpec which was fetched
// (the value of spec.RecoveryLog()).
log pb.Journal
// txnResp is the raw Etcd transaction response of the fetch. Its Responses
// are ordered with the primary hint key first, followed by backup hint keys.
txnResp *clientv3.TxnResponse
// hints are the decoded FSMHints of each fetched key, in the same order as
// txnResp.Responses. An entry is nil where no value exists at its key.
hints []*recoverylog.FSMHints
}
// fetchHints retrieves and decodes all FSMHints for the ShardSpec.
//
// A single Etcd transaction reads the primary hint key followed by each backup
// hint key, and the returned fetchedHints.hints has one entry per key in that
// same order. Entries are nil where no hint value has been written. The
// returned fetchedHints also holds the TxnResponse of the read, which can be
// used for transactional updates of hints.
//
// Each stored value may be either JSON or protobuf encoded. A decoded value
// must build a valid FSM and must name the ShardSpec's recovery log, or an
// error is returned. On error, |out| holds only what was fetched and decoded
// prior to the failure.
func fetchHints(ctx context.Context, spec *pc.ShardSpec, etcd *clientv3.Client) (out fetchedHints, err error) {
	var ops = []clientv3.Op{clientv3.OpGet(spec.HintPrimaryKey())}
	for _, hk := range spec.HintBackupKeys() {
		ops = append(ops, clientv3.OpGet(hk))
	}
	out.log = spec.RecoveryLog()

	if out.txnResp, err = etcd.Txn(ctx).If().Then(ops...).Commit(); err != nil {
		err = errors.WithMessage(err, "fetching ShardSpec.HintKeys")
		return
	}

	for i := range out.txnResp.Responses {
		var kvs = out.txnResp.Responses[i].GetResponseRange().Kvs
		if len(kvs) == 0 {
			out.hints = append(out.hints, nil) // No FSMHint at this key.
			continue
		}
		var value = kvs[0].Value
		var h = new(recoverylog.FSMHints)

		// Sense whether JSON or proto encoding is used by testing for opening '{'.
		// A zero-length value has no first byte to inspect: route it through the
		// proto decoder rather than indexing into it (which would panic), and let
		// the validation below decide whether the result is acceptable.
		if len(value) != 0 && value[0] == '{' {
			if err = json.Unmarshal(value, h); err != nil {
				err = errors.WithMessage(err, "json.Unmarshal(hints)")
			}
		} else {
			if err = h.Unmarshal(value); err != nil {
				err = errors.WithMessage(err, "hints.Unmarshal")
			}
		}

		if err != nil {
			// Pass.
		} else if _, err = recoverylog.NewFSM(*h); err != nil {
			err = errors.WithMessage(err, "validating FSMHints")
		} else if h.Log != out.log {
			err = errors.Errorf("hints.Log %s != ShardSpec.RecoveryLog %s", h.Log, out.log)
		}

		if err != nil {
			return
		}
		out.hints = append(out.hints, h)
	}
	return
}
// pickFirstHints selects the hints to recover from out of |hints|.
// Primary hints are preferred when present. Otherwise the first non-nil backup
// hints, in response order, are used. If |hints| holds no hints at all, empty
// FSMHints which name only |log| are returned.
func pickFirstHints(hints *pc.GetHintsResponse, log pb.Journal) recoverylog.FSMHints {
	if primary := hints.PrimaryHints.Hints; primary != nil {
		return *primary
	}
	for i := range hints.BackupHints {
		if backup := hints.BackupHints[i].Hints; backup != nil {
			return *backup
		}
	}
	// Nothing has been recorded yet: begin from an empty set of hints.
	return recoverylog.FSMHints{Log: log}
}
// storeRecordedHints JSON-encodes |hints| and puts them at the primary hint key
// of the shard's spec. The put is guarded by an Etcd transaction which compares
// the CreateRevision of the shard's current Assignment key.
//
// An Etcd error having code Unavailable is logged as a warning and nil is
// returned. Any other error is returned to the caller.
func storeRecordedHints(s *shard, hints recoverylog.FSMHints) error {
	var (
		hintKey    = s.Spec().HintPrimaryKey()
		assignment = s.Assignment()
	)

	// TODO(johnny): Switch over to hints.Marshal() when proto decode support is deployed.
	var encoded, err = json.Marshal(hints)
	if err != nil {
		return errors.WithMessage(err, "json.Marshal(hints)")
	}

	// Verify our Assignment is still in effect (eg, we're still primary).
	// CreateRevision is compared, rather than ModRevision, to allow for a
	// raced ReplicaState update of the Assignment.
	var stillAssigned = clientv3.Compare(
		clientv3.CreateRevision(string(assignment.Raw.Key)), "=", assignment.Raw.CreateRevision)

	_, err = s.svc.Etcd.Txn(s.ctx).
		If(stillAssigned).
		Then(clientv3.OpPut(hintKey, string(encoded))).
		Commit()

	if err == nil {
		return nil
	}
	if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.Unavailable {
		// Recorded hints are advisory, and an occasional omitted update is
		// generally tolerable. A temporary Etcd partition shouldn't abort an
		// otherwise-healthy shard primary, so log and allow processing to
		// continue. The write is retried on the next hints flush interval.
		log.WithFields(log.Fields{"key": hintKey, "err": err}).
			Warn("failed to store recorded FSMHints (will retry)")
		return nil
	}
	return err
}
// storeRecoveredHints writes |hints| into the first backup hint key of the
// spec, rotating the value previously stored under that key into the second
// backup hint key, and so on, all within a single Etcd transaction.
//
// The transaction is conditioned on the shard's Assignment still being in
// effect, and on every touched backup key being unchanged since it was read.
// An error is returned if the transaction does not succeed.
func storeRecoveredHints(s *shard, hints recoverylog.FSMHints) error {
	var spec, assignment = s.Spec(), s.Assignment()
	var backupKeys = spec.HintBackupKeys()

	var fetched, err = fetchHints(s.ctx, spec, s.svc.Etcd)
	if err != nil {
		return err
	}

	// |pending| is the value to put at the next backup slot. It begins as the
	// serialized |hints|, and thereafter is the displaced value of the prior slot.
	// TODO(johnny): Switch over to hints.Marshal() when proto decode support is deployed.
	pending, err := json.Marshal(hints)
	if err != nil {
		return errors.WithMessage(err, "json.Marshal(hints)")
	}

	// fetchHints orders its responses as the primary key followed by backup
	// keys. Drop the primary response so that indices align with |backupKeys|.
	var slots = fetched.txnResp.Responses[1:]

	// Verify our Assignment is still in effect (eg, we're still primary).
	var guards = []clientv3.Cmp{
		clientv3.Compare(clientv3.CreateRevision(string(assignment.Raw.Key)),
			"=", assignment.Raw.CreateRevision),
	}
	var puts []clientv3.Op

	for i, key := range backupKeys {
		if pending == nil {
			break // Nothing further to rotate.
		}
		puts = append(puts, clientv3.OpPut(key, string(pending)))

		var kvs = slots[i].GetResponseRange().Kvs
		if len(kvs) != 0 {
			// The slot is occupied: verify it's unchanged, and carry its
			// value forward into the next slot (if one exists).
			guards = append(guards, clientv3.Compare(clientv3.ModRevision(key), "=", kvs[0].ModRevision))
			pending = kvs[0].Value
		} else {
			// The slot was empty: verify that's still so, and stop rotating.
			guards = append(guards, clientv3.Compare(clientv3.ModRevision(key), "=", 0))
			pending = nil
		}
	}

	var resp, txnErr = s.svc.Etcd.Txn(s.ctx).If(guards...).Then(puts...).Commit()
	if txnErr != nil {
		return txnErr
	}
	if !resp.Succeeded {
		return errors.New("unexpected Etcd transaction failure")
	}
	return nil
}
// beginRecovery fetches FSMHints of the shard and plays back its recovery log
// into a newly created temporary directory.
//
// It proceeds through these steps, returning an error if any fails:
//   - Hints are fetched via the service's GetHints and retained in
//     |s.recovery.hints|. A response Status other than OK is an error.
//   - Hints to recover from are selected using pickFirstHints.
//   - The journal named by the selected hints must exist, and must carry the
//     recovery log content-type label.
//   - A temporary working directory is created, named with a prefix derived
//     from the shard ID.
//   - The shard's player plays the log into that directory.
func beginRecovery(s *shard) error {
	var err error
	var spec = s.Spec()

	s.recovery.hints, err = s.svc.GetHints(s.ctx, &pc.GetHintsRequest{
		Shard: spec.Id,
	})
	if err == nil && s.recovery.hints.Status != pc.Status_OK {
		// The status text is data and not a format string, so don't pass
		// it through a formatting verb parser.
		err = errors.New(s.recovery.hints.Status.String())
	}
	if err != nil {
		return fmt.Errorf("GetHints: %w", err)
	}
	var pickedHints = pickFirstHints(s.recovery.hints, s.recovery.log)

	// Verify the |pickedHints| recovery log exists, and is of the correct Content-Type.
	if logSpec, err := client.GetJournal(s.ctx, s.ajc, pickedHints.Log); err != nil {
		return errors.WithMessage(err, "fetching log spec")
	} else if ct := logSpec.LabelSet.ValueOf(labels.ContentType); ct != labels.ContentType_RecoveryLog {
		return errors.Errorf("expected label %s value %s (got %v)", labels.ContentType, labels.ContentType_RecoveryLog, ct)
	}

	// Create local temporary directory into which we recover.
	// Slashes of the shard ID are replaced, as they cannot appear within a
	// directory name prefix.
	var dir string
	if dir, err = ioutil.TempDir("", strings.ReplaceAll(spec.Id.String(), "/", "_")+"-"); err != nil {
		return errors.WithMessage(err, "creating shard working directory")
	}

	log.WithFields(log.Fields{
		"dir": dir,
		"log": pickedHints.Log,
		"id":  spec.Id,
	}).Info("began recovering shard store from log")

	// Finally, play back the log.
	if err = s.recovery.player.Play(s.ctx, pickedHints, dir, s.ajc); err != nil {
		return errors.WithMessagef(err, "playing log %s", pickedHints.Log)
	}
	return nil
}
// completeRecovery finishes recovery of the shard and readies it for processing.
//
// If the shard has a recovery log, a hand-off to a newly generated author is
// injected into the player, and completion of playback is awaited. Hints are
// snapshotted from the recovered FSM, and a Recorder is built over the
// recovered directory. Next the application Store is opened and its Checkpoint
// is restored. Only then are the snapshotted hints persisted as a backup,
// retrying with backoff until that succeeds or the shard context is done.
// Finally the shard's publisher and sequencer are initialized, and
// |s.storeReadyCh| is closed.
//
// It returns the restored Checkpoint, or an error if the context is cancelled,
// playback failed, or the Store could not be opened or restored.
func completeRecovery(s *shard) (_ pc.Checkpoint, err error) {
	var (
		hints      recoverylog.FSMHints
		checkpoint pc.Checkpoint
	)

	if s.recovery.log != "" {
		// Ask the player to hand the log off to our freshly generated |author|.
		var author = recoverylog.NewRandomAuthor()
		s.recovery.player.InjectHandoff(author)

		select {
		case <-s.recovery.player.Done():
			// Playback has finished.
		case <-s.ctx.Done():
			return pc.Checkpoint{}, s.ctx.Err()
		}

		var resolved = s.recovery.player.Resolved
		if resolved.FSM == nil {
			return pc.Checkpoint{}, errors.Errorf("completeRecovery aborting due to log playback failure")
		}

		// Playback is complete, and we're likely the most recent shard primary
		// to have done so. Snapshot |hints| now, but hold off on persisting
		// them until we've sanity-checked that the recovered store can be
		// opened and its Checkpoint restored.
		hints = resolved.FSM.BuildHints(resolved.FSM.LastLog)

		// Wrap the recovered file-system in a *Recorder. The Recorder fences
		// its appends around |author|, so that another process which completes
		// an InjectHandoff causes appends of this Recorder to fail.
		s.recovery.recorder = recoverylog.NewRecorder(
			s.recovery.log, resolved.FSM, author, resolved.Dir, s.ajc)
	}

	s.store, err = s.svc.App.NewStore(s, s.recovery.recorder)
	if err != nil {
		return checkpoint, errors.WithMessage(err, "app.NewStore")
	}
	checkpoint, err = s.store.RestoreCheckpoint(s)
	if err != nil {
		return checkpoint, errors.WithMessage(err, "store.RestoreCheckpoint")
	}

	// Persist |hints| as a backup, but only if they're hints of our own log.
	// Some workflows recover from hints of a different log, in which case this
	// step is omitted. For example, Flow's shard split workflow instruments
	// GetHints to return hints of the LHS split source, which are recovered here.
	if s.recovery.log != "" && s.recovery.log == hints.Log {
		var attempt int
		for {
			if err = storeRecoveredHints(s, hints); err == nil {
				break
			}
			// Storing recovered hints can sometimes fail, eg due to a
			// shard assignment race.
			log.WithFields(log.Fields{
				"attempt": attempt,
				"err":     err,
				"log":     hints.Log,
			}).Warn("failed to store recovered hints (will retry)")

			select {
			case <-s.ctx.Done():
				return checkpoint, s.ctx.Err()
			case <-time.After(backoff(attempt)):
				// Back-off has elapsed; try again.
			}
			attempt++
		}
	}

	s.publisher = message.NewPublisher(s.ajc, &s.clock)
	s.sequencer = message.NewSequencer(pc.FlattenProducerStates(checkpoint), messageRingSize)
	close(s.storeReadyCh) // Unblocks Resolve().

	return checkpoint, nil
}