-
Notifications
You must be signed in to change notification settings - Fork 441
/
orphan_filter.go
245 lines (223 loc) · 5.3 KB
/
orphan_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
package filter
import (
"crypto/md5"
"encoding/binary"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"math"
"github.com/alibaba/MongoShake/v2/oplog"
"github.com/alibaba/MongoShake/v2/sharding"
LOG "github.com/vinllen/log4go"
)
// Canonical BSON type ranks used to order shard key values of different
// types (refer to mongo/bson/bsontypes.h of mongodb kernel 4.0). Only their
// relative order matters: chunkGt/chunkLt compare these ranks directly when
// two values have different types.
const (
	// Sentinels that sort before / after every concrete value type.
	BsonMinKey = 0
	BsonMaxKey = 100

	// Ranks of the concrete value types understood by getBsonType.
	BsonTypeNumber = 10
	BsonTypeString = 15
	BsonTypeOid    = 35

	// Returned by getBsonType only after an unsupported type was reported.
	BsonInvalid = -1
)
// OrphanFilter decides, per document, whether the document is an orphan with
// respect to the chunk ranges recorded in chunkMap (see its Filter method).
type OrphanFilter struct {
	// replset identifies the replica set this filter serves; in this file it
	// is only used in Filter's log message.
	replset string
	// chunkMap is looked up by namespace; each entry carries the shard key
	// field names, the shard type and the chunk ranges consulted by Filter.
	chunkMap sharding.DBChunkMap
}
// NewOrphanFilter returns an OrphanFilter for replica set replset that checks
// documents against the chunk ranges in chunkMap. chunkMap is stored as-is
// (not copied); a nil chunkMap makes Filter keep every document.
func NewOrphanFilter(replset string, chunkMap sharding.DBChunkMap) *OrphanFilter {
	f := new(OrphanFilter)
	f.replset = replset
	f.chunkMap = chunkMap
	return f
}
// Filter reports whether docD, a document of namespace, is an orphan that
// should be dropped. It returns true when the document's shard key lies
// outside every [min, max) chunk range listed for namespace in
// filter.chunkMap.
//
// Documents are never filtered when the chunk map is nil or has no entry for
// namespace. For hashed sharding each shard key value is passed through
// ComputeHash before comparison. A document missing a shard key field that
// has to be compared aborts via LOG.Crashf.
func (filter *OrphanFilter) Filter(docD bson.D, namespace string) bool {
	if filter.chunkMap == nil {
		LOG.Warn("chunk map is nil")
		return false
	}
	shardCol, hasChunk := filter.chunkMap[namespace]
	if !hasChunk {
		return false
	}

	// The (possibly hashed) shard key values do not depend on the chunk being
	// tested, so compute each field at most once per document rather than up
	// to twice per chunk (hashing is an MD5 per call). Fields are still
	// fetched lazily, in the order they are compared, so a missing field is
	// only reported when a comparison actually needs it.
	keys := make([]interface{}, len(shardCol.Keys))
	keyAt := func(keyInd int) interface{} {
		if keys[keyInd] == nil {
			keyName := shardCol.Keys[keyInd]
			key := oplog.GetKey(docD, keyName)
			if key == nil {
				LOG.Crashf("OrphanFilter find no shard key[%v] in doc %v", keyName, docD)
			}
			if shardCol.ShardType == sharding.HashedShard {
				key = ComputeHash(key)
			}
			keys[keyInd] = key
		}
		return keys[keyInd]
	}

NextChunk:
	for _, chunkRange := range shardCol.Chunks {
		// key >= chunk minimum, compared lexicographically over the compound
		// shard key: the first field that differs decides; all-equal means
		// key == min, which is inside the range.
		for keyInd := range shardCol.Keys {
			key := keyAt(keyInd)
			if chunkLt(key, chunkRange.Mins[keyInd]) {
				continue NextChunk
			}
			if chunkGt(key, chunkRange.Mins[keyInd]) {
				break
			}
		}
		// key < chunk maximum (exclusive upper bound), again lexicographic;
		// equality on every field means key == max, which is outside.
		for keyInd := range shardCol.Keys {
			key := keyAt(keyInd)
			if chunkGt(key, chunkRange.Maxs[keyInd]) {
				continue NextChunk
			}
			if chunkLt(key, chunkRange.Maxs[keyInd]) {
				break
			}
			if keyInd == len(shardCol.Keys)-1 {
				continue NextChunk
			}
		}
		// The shard key lies inside this chunk, therefore don't filter.
		return false
	}

	LOG.Warn("document syncer %v filter orphan document %v with shard key %v in ns[%v]",
		filter.replset, docD, shardCol.Keys, namespace)
	return true
}
// ComputeHash returns the 64-bit hashed shard key value for data, following
// mongo/db/hasher.cpp of mongodb kernel 4.0 with hash seed 0.
//
// The MD5 input is the little-endian 4-byte seed, the little-endian 4-byte
// canonical BSON type, then the value:
//   - string: int32 length (including the trailing NUL), the bytes, NUL
//   - numbers (int, int32, int64, float32, float64): the value squashed to a
//     little-endian int64, so equal values of different numeric types hash
//     identically
//   - primitive.ObjectID: the raw 12 bytes
//
// The result is the first 8 digest bytes read as a little-endian int64.
// Unsupported types abort via LOG.Crashf.
func ComputeHash(data interface{}) int64 {
	w := md5.New()
	var buf = make([]byte, 4)
	binary.LittleEndian.PutUint32(buf, uint32(0)) // hash seed
	w.Write(buf)

	switch rd := data.(type) {
	case string:
		binary.LittleEndian.PutUint32(buf, uint32(BsonTypeString))
		w.Write(buf)
		binary.LittleEndian.PutUint32(buf, uint32(len(rd)+1))
		w.Write(buf)
		s := []byte(rd)
		s = append(s, 0)
		w.Write(s)
	case int, int32, int64, float32, float64:
		// int32 covers BSON NumberInt values as decoded by the driver.
		var num int64
		switch v := rd.(type) {
		case int:
			num = int64(v)
		case int32:
			num = int64(v)
		case int64:
			num = v
		case float32:
			num = hashSafeNumberLong(float64(v))
		case float64:
			num = hashSafeNumberLong(v)
		}
		binary.LittleEndian.PutUint32(buf, uint32(BsonTypeNumber))
		w.Write(buf)
		buf = make([]byte, 8)
		binary.LittleEndian.PutUint64(buf, uint64(num))
		w.Write(buf)
	case primitive.ObjectID:
		binary.LittleEndian.PutUint32(buf, uint32(BsonTypeOid))
		w.Write(buf)
		buf = rd[:]
		w.Write(buf)
	default:
		LOG.Crashf("ComputeHash unsupported bson type %T %#v\n", data, data)
	}

	out := w.Sum(nil)
	return int64(binary.LittleEndian.Uint64(out))
}

// hashSafeNumberLong squashes a floating-point shard key value to int64 the
// way the kernel's safeNumberLong does before hashing: truncate toward zero,
// NaN becomes 0, and values outside the int64 range saturate. A direct Go
// float-to-integer conversion is implementation-dependent for negative (when
// converting to uint64) or out-of-range values, so it cannot be used as-is.
func hashSafeNumberLong(d float64) int64 {
	const twoTo63 = float64(1 << 63)
	switch {
	case math.IsNaN(d):
		return 0
	case d >= twoTo63:
		return math.MaxInt64
	case d < -twoTo63:
		return math.MinInt64
	default:
		return int64(d)
	}
}
// fromHex converts a single ASCII hexadecimal digit, in either letter case,
// to its value 0-15. Any other byte yields the sentinel 0xff.
func fromHex(c byte) byte {
	switch {
	case c >= '0' && c <= '9':
		return c - '0'
	case c >= 'a' && c <= 'f':
		return 10 + (c - 'a')
	case c >= 'A' && c <= 'F':
		return 10 + (c - 'A')
	default:
		return 0xff
	}
}
// chunkGt reports whether shard key value x sorts strictly after y.
// Values whose canonical BSON types differ (see getBsonType) are ordered by
// type rank; values of the same type are ordered by value. MinKey and MaxKey
// never compare greater than themselves. Unsupported types abort via
// LOG.Crashf.
func chunkGt(x, y interface{}) bool {
	xType, rx := getBsonType(x)
	yType, ry := getBsonType(y)
	if xType != yType {
		return xType > yType
	}
	switch xType {
	case BsonMinKey, BsonMaxKey:
		return false
	case BsonTypeNumber:
		// Compare int64 pairs exactly: the float64 form drops low-order bits
		// above 2^53, which matters for 64-bit hashed shard key values.
		xi, xIsInt := x.(int64)
		yi, yIsInt := y.(int64)
		if xIsInt && yIsInt {
			return xi > yi
		}
		return rx.(float64) > ry.(float64)
	case BsonTypeString, BsonTypeOid:
		// ObjectIDs arrive as their fixed-length hex string (ObjectID.Hex),
		// so string order matches byte order of the raw ids.
		return rx.(string) > ry.(string)
	default:
		LOG.Crashf("chunkGt meet unknown type %v", xType)
	}
	return true
}
// chunkEqual reports whether shard key values x and y compare equal under
// the same ordering as chunkGt/chunkLt. Both values must have the same
// canonical BSON type (see getBsonType). Two MinKeys, or two MaxKeys, are
// equal; numbers, strings and ObjectIDs must have equal values.
// Unsupported types abort via LOG.Crashf.
func chunkEqual(x, y interface{}) bool {
	xType, rx := getBsonType(x)
	yType, ry := getBsonType(y)
	if xType != yType {
		return false
	}
	switch xType {
	case BsonMinKey, BsonMaxKey:
		return true
	case BsonTypeNumber:
		// Compare int64 pairs exactly: the float64 form drops low-order bits
		// above 2^53, which matters for 64-bit hashed shard key values.
		xi, xIsInt := x.(int64)
		yi, yIsInt := y.(int64)
		if xIsInt && yIsInt {
			return xi == yi
		}
		return rx.(float64) == ry.(float64)
	case BsonTypeString, BsonTypeOid:
		// ObjectIDs arrive as their hex string (ObjectID.Hex).
		return rx.(string) == ry.(string)
	default:
		LOG.Crashf("chunkEqual meet unknown type %v", xType)
	}
	return true
}
// chunkLt reports whether shard key value x sorts strictly before y.
// Values whose canonical BSON types differ (see getBsonType) are ordered by
// type rank; values of the same type are ordered by value. MinKey and MaxKey
// never compare less than themselves. Unsupported types abort via
// LOG.Crashf.
func chunkLt(x, y interface{}) bool {
	xType, rx := getBsonType(x)
	yType, ry := getBsonType(y)
	if xType != yType {
		return xType < yType
	}
	switch xType {
	case BsonMinKey, BsonMaxKey:
		return false
	case BsonTypeNumber:
		// Compare int64 pairs exactly: the float64 form drops low-order bits
		// above 2^53, which matters for 64-bit hashed shard key values.
		xi, xIsInt := x.(int64)
		yi, yIsInt := y.(int64)
		if xIsInt && yIsInt {
			return xi < yi
		}
		return rx.(float64) < ry.(float64)
	case BsonTypeString, BsonTypeOid:
		// ObjectIDs arrive as their fixed-length hex string (ObjectID.Hex),
		// so string order matches byte order of the raw ids.
		return rx.(string) < ry.(string)
	default:
		LOG.Crashf("chunkLt meet unknown type %v", xType)
	}
	return true
}
// getBsonType classifies shard key value x for the chunk comparison helpers.
// It returns the canonical BSON type rank (one of the Bson* constants) and a
// normalized value:
//   - float64 for every supported numeric type
//   - the string itself for strings
//   - the hex string for ObjectIDs
//   - nil for MinKey/MaxKey
//
// Besides the driver's primitive.MinKey/primitive.MaxKey, the exact values
// int64(math.MinInt64) and int64(math.MaxInt64) are treated as the
// MinKey/MaxKey sentinels, so those int64 values never compare as numbers.
// Unsupported types abort via LOG.Crashf.
func getBsonType(x interface{}) (int, interface{}) {
	if x == int64(math.MinInt64) {
		return BsonMinKey, nil
	}
	if x == int64(math.MaxInt64) {
		return BsonMaxKey, nil
	}
	switch rx := x.(type) {
	case primitive.MinKey:
		return BsonMinKey, nil
	case primitive.MaxKey:
		return BsonMaxKey, nil
	case float32:
		return BsonTypeNumber, float64(rx)
	case float64:
		return BsonTypeNumber, rx
	case int:
		return BsonTypeNumber, float64(rx)
	case int32:
		return BsonTypeNumber, float64(rx)
	case int64:
		return BsonTypeNumber, float64(rx)
	case string:
		return BsonTypeString, rx
	case primitive.ObjectID:
		return BsonTypeOid, rx.Hex()
	default:
		LOG.Crashf("getBsonType meet unknown type %T", x)
	}
	return BsonInvalid, nil
}