/
agentcccpcfg.go
81 lines (65 loc) · 1.71 KB
/
agentcccpcfg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package gocbcore
import (
"math/rand"
"time"
)
func (agent *Agent) cccpLooper() {
tickTime := agent.confCccpPollPeriod
maxWaitTime := agent.confCccpMaxWait
logDebugf("CCCP Looper starting.")
nodeIdx := -1
for {
// Wait for either the agent to be shut down, or our tick time to expire
select {
case <-time.After(tickTime):
case <-agent.closeNotify:
}
routingInfo := agent.routingInfo.Get()
if routingInfo == nil {
// If we have a blank routingInfo, it indicates the client is shut down.
break
}
numNodes := routingInfo.clientMux.NumPipelines()
if numNodes == 0 {
logDebugf("CCCPPOLL: No nodes available to poll")
continue
}
if nodeIdx < 0 {
nodeIdx = rand.Intn(numNodes)
}
var foundConfig *cfgBucket
for nodeOff := 0; nodeOff < numNodes; nodeOff++ {
nodeIdx = (nodeIdx + 1) % numNodes
pipeline := routingInfo.clientMux.GetPipeline(nodeIdx)
client := syncClient{
client: &memdPipelineSenderWrap{
pipeline: pipeline,
},
}
cccpBytes, err := client.ExecCccpRequest(time.Now().Add(maxWaitTime))
if err != nil {
logDebugf("CCCPPOLL: Failed to retrieve CCCP config. %v", err)
continue
}
hostName, err := hostFromHostPort(pipeline.Address())
if err != nil {
logErrorf("CCCPPOLL: Failed to parse source address. %v", err)
continue
}
bk, err := parseConfig(cccpBytes, hostName)
if err != nil {
logDebugf("CCCPPOLL: Failed to parse CCCP config. %v", err)
continue
}
foundConfig = bk
break
}
if foundConfig == nil {
logDebugf("CCCPPOLL: Failed to retrieve config from any node.")
continue
}
logDebugf("CCCPPOLL: Received new config")
agent.updateConfig(foundConfig)
}
close(agent.cccpLooperDoneSig)
}