forked from tair-opensource/RedisShake
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
114 lines (96 loc) · 2.78 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2016 CodisLabs. All Rights Reserved.
// Licensed under the MIT (MIT-LICENSE.txt) license.
package run
import (
"sync"
"pkg/libs/log"
"redis-shake/common"
"redis-shake/configure"
"redis-shake/dbSync"
)
// main struct
type CmdSync struct {
dbSyncers []*dbSync.DbSyncer
}
// return send buffer length, delay channel length, target db offset
func (cmd *CmdSync) GetDetailedInfo() interface{} {
ret := make([]map[string]interface{}, len(cmd.dbSyncers))
for i, syncer := range cmd.dbSyncers {
if syncer == nil {
continue
}
ret[i] = syncer.GetExtraInfo()
}
return ret
}
func (cmd *CmdSync) Main() {
type syncNode struct {
id int
source string
sourcePassword string
target []string
targetPassword string
slotLeftBoundary int
slotRightBoundary int
}
var slotDistribution []utils.SlotOwner
var err error
if conf.Options.SourceType == conf.RedisTypeCluster && conf.Options.ResumeFromBreakPoint {
if slotDistribution, err = utils.GetSlotDistribution(conf.Options.SourceAddressList[0], conf.Options.SourceAuthType,
conf.Options.SourcePasswordRaw, false); err != nil {
log.Errorf("get source slot distribution failed: %v", err)
return
}
}
// source redis number
total := utils.GetTotalLink()
syncChan := make(chan syncNode, total)
cmd.dbSyncers = make([]*dbSync.DbSyncer, total)
for i, source := range conf.Options.SourceAddressList {
var target []string
if conf.Options.TargetType == conf.RedisTypeCluster {
target = conf.Options.TargetAddressList
} else {
// round-robin pick
pick := utils.PickTargetRoundRobin(len(conf.Options.TargetAddressList))
target = []string{conf.Options.TargetAddressList[pick]}
}
// fetch slot boundary
leftSlotBoundary, rightSlotBoundary := utils.GetSlotBoundary(slotDistribution, source)
nd := syncNode{
id: i,
source: source,
sourcePassword: conf.Options.SourcePasswordRaw,
target: target,
targetPassword: conf.Options.TargetPasswordRaw,
slotLeftBoundary: leftSlotBoundary,
slotRightBoundary: rightSlotBoundary,
}
syncChan <- nd
}
var wg sync.WaitGroup
wg.Add(len(conf.Options.SourceAddressList))
for i := 0; i < int(conf.Options.SourceRdbParallel); i++ {
go func() {
for {
nd, ok := <-syncChan
if !ok {
break
}
// one sync link corresponding to one DbSyncer
ds := dbSync.NewDbSyncer(nd.id, nd.source, nd.sourcePassword, nd.target, nd.targetPassword,
nd.slotLeftBoundary, nd.slotRightBoundary, conf.Options.HttpProfile+i)
cmd.dbSyncers[nd.id] = ds
// run in routine
go ds.Sync()
// wait full sync done
<-ds.WaitFull
wg.Done()
}
}()
}
wg.Wait()
close(syncChan)
// never quit because increment syncing is always running
select {}
}