-
Notifications
You must be signed in to change notification settings - Fork 51
/
stats.go
130 lines (103 loc) · 2.92 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package remoteenforcer
import (
"fmt"
"os"
"strconv"
"time"
"go.uber.org/zap"
"github.com/aporeto-inc/trireme/collector"
"github.com/aporeto-inc/trireme/constants"
"github.com/aporeto-inc/trireme/enforcer/utils/rpcwrapper"
)
const (
// defaultStatsIntervalMiliseconds is the default reporting interval, expressed
// in milliseconds. NewStatsClient multiplies it by time.Millisecond and uses
// the result unless the STATS_INTERVAL environment variable overrides it.
defaultStatsIntervalMiliseconds = 1000
// statsContextID is the context ID handed to the RPC wrapper both when the
// stats client connects (NewRPCClient) and when it sends stats (RemoteCall).
statsContextID = "UNUSED"
// statsRPCCommand is the remote method name passed to RemoteCall when flow
// statistics are sent.
statsRPCCommand = "StatsServer.GetStats"
)
// StatsClient holds the state of the RPC client that reports flow stats
// back to the controller process.
type StatsClient struct {
	// collector accumulates the flow records that SendStats drains.
	collector *CollectorImpl
	// rpchdl is the RPC wrapper used to connect and to issue remote calls.
	rpchdl *rpcwrapper.RPCWrapper
	// secret and statsChannel are the credentials and channel path handed to
	// the RPC wrapper when the client connects.
	secret, statsChannel string
	// statsInterval is the period of the ticker driving SendStats.
	statsInterval time.Duration
	// stop carries the signal that makes SendStats return.
	stop chan bool
}
// NewStatsClient initializes a new stats client from the process environment.
//
// The stats socket path is read from the variable named by
// constants.AporetoEnvStatsChannel and the shared secret from the one named by
// constants.AporetoEnvStatsSecret; an error is returned when either is empty.
//
// The reporting interval defaults to defaultStatsIntervalMiliseconds
// milliseconds. Setting STATS_INTERVAL to a positive whole number of seconds
// overrides it. Values that do not parse, are zero or negative, or are too
// large to be represented as a time.Duration are ignored and the default is
// kept: the interval is later handed to time.NewTicker, which panics on a
// non-positive duration.
func NewStatsClient() (Stats, error) {
	statsChannel := os.Getenv(constants.AporetoEnvStatsChannel)
	if statsChannel == "" {
		return nil, fmt.Errorf("No path to stats socket provided")
	}

	secret := os.Getenv(constants.AporetoEnvStatsSecret)
	if secret == "" {
		return nil, fmt.Errorf("No secret provided for stats channel")
	}

	statsInterval := defaultStatsIntervalMiliseconds * time.Millisecond
	if seconds, err := strconv.Atoi(os.Getenv("STATS_INTERVAL")); err == nil && seconds > 0 {
		// Dividing back detects a multiplication that wrapped around, which
		// could otherwise yield a zero or negative interval.
		if d := time.Duration(seconds) * time.Second; d/time.Second == time.Duration(seconds) {
			statsInterval = d
		}
	}

	return &StatsClient{
		collector:     NewCollector(),
		rpchdl:        rpcwrapper.NewRPCWrapper(),
		secret:        secret,
		statsChannel:  statsChannel,
		statsInterval: statsInterval,
		stop:          make(chan bool),
	}, nil
}
// SendStats reports the collected flow records to the controller over RPC
// once per statsInterval. It loops until a value is received on the stop
// channel, so it is meant to run in its own goroutine.
//
// On every tick the collector is locked, its Flows map is swapped for a fresh
// empty one and the lock is released before the RPC is made, so the remote
// call never runs while the collector lock is held. Ticks on which no flows
// were collected are skipped. A failed RPC is only logged; the records taken
// for that call are not put back into the collector.
func (s *StatsClient) SendStats() {
	ticker := time.NewTicker(s.statsInterval)
	// Release the ticker once the loop exits on a stop signal.
	defer ticker.Stop()

	// nolint : gosimple
	for {
		select {
		case <-ticker.C:
			s.collector.Lock()
			collected := s.collector.Flows
			if len(collected) == 0 {
				s.collector.Unlock()
				continue
			}
			s.collector.Flows = map[string]*collector.FlowRecord{}
			s.collector.Unlock()

			request := rpcwrapper.Request{
				Payload: &rpcwrapper.StatsPayload{
					Flows: collected,
				},
			}

			err := s.rpchdl.RemoteCall(
				statsContextID,
				statsRPCCommand,
				&request,
				&rpcwrapper.Response{},
			)
			if err != nil {
				zap.L().Error("RPC failure in sending statistics: Unable to send flows", zap.Error(err))
			}

		case <-s.stop:
			return
		}
	}
}
// ConnectStatsClient connects the RPC client back to the controller over the
// stats channel, using the channel path and secret stored on the client.
// If the connection fails the error is logged and returned, and nothing else
// happens. On success SendStats is started in a new goroutine and nil is
// returned.
func (s *StatsClient) ConnectStatsClient() error {
	err := s.rpchdl.NewRPCClient(statsContextID, s.statsChannel, s.secret)
	if err != nil {
		zap.L().Error("Stats RPC client cannot connect", zap.Error(err))
		return err
	}

	// The reporting loop runs in the background until Stop is called.
	go s.SendStats()

	return nil
}
// Stop stops the stats client at clean up by sending a value on the stop
// channel, which makes the SendStats loop return, and then logs a debug
// message.
// NOTE(review): the send blocks until something receives from the channel;
// with the unbuffered channel created by NewStatsClient that means a running
// SendStats loop is required - confirm callers only stop a connected client.
func (s *StatsClient) Stop() {
	stopCh := s.stop
	stopCh <- true

	zap.L().Debug("Stopping stats collector")
}