/
mock_reader.go
99 lines (87 loc) · 2.38 KB
/
mock_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package tunnel
import (
"fmt"
"math/rand"
"time"
"mongoshake/oplog"
LOG "github.com/vinllen/log4go"
"github.com/vinllen/mgo/bson"
)
// BatchSize is the number of mock oplog entries FakeGenerator.start tries to
// build for every message it hands to a replayer.
const BatchSize = 64

// TableName is the namespace prefix of the generated oplogs; each generator
// appends "_<index>" to it.
const TableName = "mongoshake_mock.table"
// opDict lists the single-letter oplog operation codes, the same values that
// FakeGenerator.start assigns to PartialLog.Operation.
var opDict = []string{
	"i", // insert
	"d", // delete
	"u", // update
	"n", // noop
}
// MockReader feeds replayers with generated oplogs: Link creates one
// FakeGenerator per replayer and starts each of them in its own goroutine.
type MockReader struct {
// one generator per replayer passed to Link; reallocated on every Link call
generator []*FakeGenerator
}
// FakeGenerator produces batches of mock oplog entries and pushes them to one
// replayer. Its start method uses index to select the target in replayer, as
// the Shard value of the messages it sends, and as the namespace suffix.
type FakeGenerator struct {
// not owned: Link stores the caller's slice as-is, without copying it
replayer []Replayer
// position of this generator; start syncs to replayer[index]
index uint32
}
// Link attaches the mock reader to the given replayers. It allocates one
// FakeGenerator per replayer, gives every generator the complete replayer
// slice plus its own position in it, and launches the generator's start loop
// in a new goroutine. It always returns nil.
func (tunnel *MockReader) Link(replayer []Replayer) error {
	count := len(replayer)
	tunnel.generator = make([]*FakeGenerator, count)

	for idx := range tunnel.generator {
		LOG.Info("mock receiver generator-%d start", idx)

		gen := &FakeGenerator{
			replayer: replayer,
			index:    uint32(idx),
		}
		tunnel.generator[idx] = gen

		// start never returns; each generator runs for the process lifetime.
		go gen.start()
	}

	return nil
}
// start runs the generator's endless produce loop. Each iteration builds up
// to BatchSize random mock oplog entries, encodes them and passes them to
// replayer[generator.index] via Sync. It never returns, so it must run in its
// own goroutine (Link does this).
//
// One random uint32 (nr) per entry decides the operation; the cases are
// evaluated in order:
//   - noop   when nr%1000 == 0                      (0.1%)
//   - delete when nr%100 == 0 and an _id is tracked (about 0.9%)
//   - update when nr%3 == 0 and an _id is tracked   (about one third)
//   - insert otherwise                              (about two thirds)
//
// Delete and update need the _id of a previously inserted document. While no
// _id is tracked those cases are skipped and the entry becomes an insert, so
// an oplog with an empty Operation is never emitted.
func (generator *FakeGenerator) start() {
	// _ids of inserted documents that have not been deleted yet, keyed by
	// their hex form.
	existIds := make(map[string]bson.ObjectId, 10000000)
	// The namespace only depends on the generator index; build it once.
	namespace := fmt.Sprintf("%s_%d", TableName, generator.index)

	for {
		batch := make([]*oplog.GenericOplog, 0, BatchSize)

		for i := 0; i != BatchSize; i++ {
			partialLog := &oplog.PartialLog{
				// seconds since epoch in the high 32 bits, counter part left 0
				Timestamp: bson.MongoTimestamp(time.Now().Unix() << 32),
				Namespace: namespace,
			}

			nr := rand.Uint32()
			switch {
			case nr%1000 == 0:
				// noop
				partialLog.Operation = "n"
				partialLog.Gid = "mock-noop"
				partialLog.Object = bson.M{"mongoshake-mock": "ApsaraDB"}
			case nr%100 == 0 && len(existIds) != 0:
				// delete: map iteration order is random, so taking the first
				// entry picks an arbitrary tracked _id.
				for k, oid := range existIds {
					partialLog.Operation = "d"
					partialLog.Gid = "mock-delete"
					partialLog.Object = bson.M{"_id": oid}
					delete(existIds, k)
					break
				}
			case nr%3 == 0 && len(existIds) != 0:
				// update: same arbitrary pick, but the _id stays tracked.
				for _, oid := range existIds {
					partialLog.Operation = "u"
					partialLog.Gid = "mock-update"
					partialLog.Object = bson.M{"$set": bson.M{"updates": nr}}
					partialLog.Query = bson.M{"_id": oid}
					break
				}
			default:
				// insert: also the fallback when delete/update has no target.
				oid := bson.NewObjectId()
				partialLog.Operation = "i"
				partialLog.Gid = "mock-insert"
				partialLog.Object = bson.M{"_id": oid, "test": "1", "abc": nr}
				existIds[oid.Hex()] = oid
			}

			raw, err := bson.Marshal(partialLog)
			if err != nil {
				// Skip the entry instead of shipping a nil payload.
				LOG.Error("mock generator-index-%d marshal oplog failed: %v", generator.index, err)
				continue
			}
			batch = append(batch, &oplog.GenericOplog{Raw: raw})
		}

		generator.replayer[generator.index].Sync(&TMessage{
			Checksum: 0,
			Tag:      MsgRetransmission,
			Shard:    generator.index,
			Compress: 0,
			RawLogs:  oplog.LogEntryEncode(batch),
		}, nil)
		LOG.Info("mock generator-index-%d generate and apply logs %d", generator.index, len(batch))
	}
}