-
Notifications
You must be signed in to change notification settings - Fork 51
/
client.go
130 lines (107 loc) · 3.31 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package statsclient
import (
"context"
"errors"
"os"
"time"
"go.aporeto.io/trireme-lib/collector"
"go.aporeto.io/trireme-lib/controller/constants"
"go.aporeto.io/trireme-lib/controller/internal/enforcer/utils/rpcwrapper"
"go.aporeto.io/trireme-lib/controller/pkg/remoteenforcer/internal/client"
"go.aporeto.io/trireme-lib/controller/pkg/remoteenforcer/internal/statscollector"
"go.uber.org/zap"
)
const (
defaultStatsIntervalMiliseconds = 1000
defaultUserRetention = 10
statsContextID = "UNUSED"
statsRPCCommand = "ProxyRPCServer.PostStats"
)
// statsClient This is the struct for storing state for the rpc client
// which reports flow stats back to the controller process
type statsClient struct {
collector statscollector.Collector
rpchdl *rpcwrapper.RPCWrapper
secret string
statsChannel string
statsInterval time.Duration
userRetention time.Duration
stop chan bool
}
// NewStatsClient initializes a new stats client
func NewStatsClient(cr statscollector.Collector) (client.Reporter, error) {
sc := &statsClient{
collector: cr,
rpchdl: rpcwrapper.NewRPCWrapper(),
secret: os.Getenv(constants.EnvStatsSecret),
statsChannel: os.Getenv(constants.EnvStatsChannel),
statsInterval: defaultStatsIntervalMiliseconds * time.Millisecond,
userRetention: defaultUserRetention * time.Minute,
stop: make(chan bool),
}
if sc.statsChannel == "" {
return nil, errors.New("no path to stats socket provided")
}
if sc.secret == "" {
return nil, errors.New("no secret provided for stats channel")
}
return sc, nil
}
// sendStats async function which makes a rpc call to send stats every STATS_INTERVAL
func (s *statsClient) sendStats(ctx context.Context) {
ticker := time.NewTicker(s.statsInterval)
userTicker := time.NewTicker(s.userRetention)
// nolint : gosimple
for {
select {
case <-ticker.C:
flows := s.collector.GetFlowRecords()
users := s.collector.GetUserRecords()
if flows == nil && users == nil {
continue
}
s.sendRequest(flows, users)
case <-userTicker.C:
s.collector.FlushUserCache()
case <-ctx.Done():
return
}
}
}
func (s *statsClient) sendRequest(flows map[string]*collector.FlowRecord, users map[string]*collector.UserRecord) {
request := rpcwrapper.Request{
Payload: &rpcwrapper.StatsPayload{
Flows: flows,
Users: users,
},
}
if err := s.rpchdl.RemoteCall(
statsContextID,
statsRPCCommand,
&request,
&rpcwrapper.Response{},
); err != nil {
zap.L().Error("RPC failure in sending statistics: Unable to send flows", zap.Error(err))
}
}
// Send sends all the stats from the cache
func (s *statsClient) Send() error {
flows := s.collector.GetFlowRecords()
users := s.collector.GetUserRecords()
if flows == nil && users == nil {
zap.L().Debug("Flows and UserRecords are nil while sending stats to collector")
return nil
}
s.sendRequest(flows, users)
return nil
}
// Start This is an private function called by the remoteenforcer to connect back
// to the controller over a stats channel
func (s *statsClient) Run(ctx context.Context) error {
if err := s.rpchdl.NewRPCClient(statsContextID, s.statsChannel, s.secret); err != nil {
zap.L().Error("Stats RPC client cannot connect", zap.Error(err))
return err
}
go s.sendStats(ctx)
return nil
}