-
Notifications
You must be signed in to change notification settings - Fork 440
/
incr.go
89 lines (76 loc) · 2.84 KB
/
incr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package coordinator
import (
"errors"
"github.com/alibaba/MongoShake/v2/collector"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"fmt"
nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
)
// startOplogReplication starts incremental (oplog) replication.
//
// It creates one OplogSyncer per entry of coordinator.RealSourceIncrSync,
// binds conf.Options.IncrSyncWorker workers to those syncers by
// "worker index modulo syncer count", starts every worker and every syncer in
// its own goroutine, and finally launches the incr-sync HTTP API listener.
//
// Parameters:
//   - oplogStartPosition: start position handed to each syncer unless a
//     per-replica value from startTsMap replaces it (see the loop body).
//   - fullSyncFinishPosition: passed to every syncer and every worker.
//   - startTsMap: replica name -> start timestamp.
//
// An error is returned when there is no syncer to run, when a replica that
// must be looked up is missing from startTsMap, or when a worker fails to
// initialize. Workers bound before a failing one have already been started.
func (coordinator *ReplicationCoordinator) startOplogReplication(oplogStartPosition interface{},
	fullSyncFinishPosition int64,
	startTsMap map[string]int64) error {

	// prepare all syncer. only one syncer while source is ReplicaSet or mongos
	// otherwise one syncer connects to one shard
	LOG.Info("start incr replication")

	for i, src := range coordinator.RealSourceIncrSync {
		// the caller supplied position is the default; the branches below
		// only replace it with the per-replica entry of startTsMap
		var syncerTs interface{} = oplogStartPosition

		if len(coordinator.MongoD) > 0 {
			// read from shard or replicaset: an int64 position of 0 means the
			// per-replica timestamp is mandatory
			if val, isInt := oplogStartPosition.(int64); isInt && val == 0 {
				ts, found := startTsMap[src.ReplicaName]
				if !found {
					return fmt.Errorf("replia[%v] not exists on startTsMap[%v]", src.ReplicaName, startTsMap)
				}
				syncerTs = ts
			}
		} else {
			// read from mongos
			LOG.Info("read from mongos src.ReplicaName:%s ts:%v", src.ReplicaName, startTsMap[src.ReplicaName])

			// only the mongos url is configured: prefer the per-replica
			// timestamp when one exists, otherwise keep the default
			onlyMongoS := len(conf.Options.MongoSUrl) > 0 &&
				len(conf.Options.MongoCsUrl) == 0 &&
				len(conf.Options.MongoUrls) == 0
			if onlyMongoS {
				if ts, found := startTsMap[src.ReplicaName]; found {
					syncerTs = ts
					LOG.Info("read from mongos and set ts src.ReplicaName:%s ts:%v", src.ReplicaName, ts)
				}
			}
		}

		LOG.Info("RealSourceIncrSync[%d]: %s, startTimestamp[%v]", i, src, syncerTs)
		syncer := collector.NewOplogSyncer(src.ReplicaName, syncerTs, fullSyncFinishPosition, src.URL,
			src.Gids)
		// syncerGroup http api registry
		syncer.Init()
		coordinator.syncerGroup = append(coordinator.syncerGroup, syncer)
	}

	// without any syncer the leader lookup below would index an empty slice
	// and the worker binding would take a modulo by zero; fail cleanly instead
	groupSize := len(coordinator.syncerGroup)
	if groupSize == 0 {
		return errors.New("no oplog syncer available: incr sync source list is empty")
	}

	// set to group 0 as a leader
	coordinator.syncerGroup[0].SyncGroup = coordinator.syncerGroup

	// prepare worker routine and bind it to syncer
	for i := 0; i < conf.Options.IncrSyncWorker; i++ {
		syncer := coordinator.syncerGroup[i%groupSize]
		w := collector.NewWorker(syncer, uint32(i))
		if !w.Init() {
			return errors.New("worker initialize error")
		}
		w.SetInitSyncFinishTs(fullSyncFinishPosition)

		// syncer and worker are independent. the relationship between
		// them needs binding here. one worker definitely belongs to a specific
		// syncer. However individual syncer could bind multi workers (if source
		// of overall replication is single mongodb replica)
		syncer.Bind(w)
		go w.StartWorker()
	}

	for _, syncer := range coordinator.syncerGroup {
		go syncer.Start()
	}

	// start http server; a Listen failure is only logged and does not change
	// the value returned by this function
	nimo.GoRoutine(func() {
		if err := utils.IncrSyncHttpApi.Listen(); err != nil {
			LOG.Critical("start incr sync server with port[%v] failed: %v", conf.Options.IncrSyncHTTPListenPort,
				err)
		}
	})

	return nil
}