forked from couchbase/cbft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pindex_bleve_scorch_rollback.go
120 lines (99 loc) · 3.68 KB
/
pindex_bleve_scorch_rollback.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright (c) 2018 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
package cbft
import (
"encoding/binary"
"fmt"
"strconv"
"github.com/blevesearch/bleve/index/scorch"
"github.com/couchbase/cbgt"
log "github.com/couchbase/clog"
)
// Scorch implementation sketch: fetch all available rollback points.
// Determine which point is of relevance to get to at or before the
// wanted rollbackSeq and vBucketUUID. If found, rollback to that
// particular point.
func (t *BleveDest) partialScorchRollbackLOCKED(sh *scorch.Scorch,
partition string, vBucketUUID, rollbackSeq uint64) (bool, bool, error) {
seqMaxKey := []byte(partition)
// get vBucketMap/Opaque key
var vBucketMapKey []byte
if t.partitions[partition] != nil {
vBucketMapKey = t.partitions[partition].partitionOpaque
}
totSnapshotsExamined := 0
defer func() {
log.Printf("pindex_bleve_scorch_rollback: path: %s, totSnapshotExamined: %d",
t.path, totSnapshotsExamined)
}()
rollbackPoints, err := sh.RollbackPoints()
if err != nil {
return false, false, err
}
for _, rollbackPoint := range rollbackPoints {
totSnapshotsExamined++
var tryRevert bool
tryRevert, err = scorchSnapshotAtOrBeforeSeq(t.path, rollbackPoint, seqMaxKey,
vBucketMapKey, rollbackSeq, vBucketUUID)
if err != nil {
return false, false, err
}
if tryRevert {
log.Printf("pindex_bleve_scorch_rollback: trying revert, path: %s", t.path)
// Close the bleve index.
t.closeLOCKED()
err = sh.Rollback(rollbackPoint)
return true, err == nil, err
}
}
return false, false, err
}
// scorchSnapshotAtOrBeforeSeq returns true if the snapshot represents a seq
// number at or before the given seq number with a matching vBucket UUID.
func scorchSnapshotAtOrBeforeSeq(path string, rbp *scorch.RollbackPoint,
seqMaxKey, vBucketMapKey []byte,
seqMaxWant, vBucketUUIDWant uint64) (bool, error) {
v := rbp.GetInternal(seqMaxKey)
if v == nil {
return false, nil
}
if len(v) != 8 {
return false, fmt.Errorf("wrong len seqMaxKey: %s, v: %s", seqMaxKey, v)
}
seqMaxCurr := binary.BigEndian.Uint64(v[0:8])
// when no vBucketUUIDWant is given from Rollback
// then fallback to seqMaxCurr checks
if vBucketUUIDWant == 0 {
log.Printf("pindex_bleve_scorch_rollback: examining snapshot, path: %s,"+
" seqMaxKey: %s, seqMaxCurr: %d, seqMaxWant: %d",
path, seqMaxKey, seqMaxCurr, seqMaxWant)
return seqMaxCurr <= seqMaxWant, nil
}
// get the vBucketUUID
vbMap := rbp.GetInternal(vBucketMapKey)
if vbMap == nil {
log.Printf("pindex_bleve_scorch_rollback: No vBucketMap for vBucketMapKey: %s",
vBucketMapKey)
return false, nil
}
vBucketUUIDCurr, err := strconv.ParseUint(cbgt.ParseOpaqueToUUID(vbMap), 10, 64)
if err != nil {
log.Printf("pindex_bleve_scorch_rollback: ParseOpaqueToUUID failed for "+
"vbMap: %s, err: %s", vbMap, err)
return false, err
}
log.Printf("pindex_bleve_scorch_rollback: examining snapshot, path: %s,"+
" seqMaxKey: %s, seqMaxCurr: %d, seqMaxWant: %d"+
" vBucketMapKey: %s, vBucketUUIDCurr: %d, vBucketUUIDWant: %d",
path, seqMaxKey, seqMaxCurr, seqMaxWant, vBucketMapKey,
vBucketUUIDCurr, vBucketUUIDWant)
return (seqMaxCurr <= seqMaxWant && vBucketUUIDCurr == vBucketUUIDWant), nil
}