/
client.go
102 lines (86 loc) · 2.18 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package client
import (
"badoo/_packages/log"
"github.com/badoo/lsd/internal/client/offsets"
"github.com/badoo/lsd/internal/traffic"
lsdProto "github.com/badoo/lsd/proto"
"fmt"
"sync"
"time"
)
// NewClient assembles a Client from the given configuration and traffic manager.
//
// It performs three steps in order, aborting with a wrapped error on the first
// failure:
//  1. opens the offsets database described by config.GetOffsetsDb()
//     (an empty database is only reported with a warning);
//  2. creates the network router on top of that database;
//  3. creates the filesystem router, handing it the network router and the
//     offsets database.
//
// On success the returned Client holds all of the pieces above; nothing is
// started until Start is called.
func NewClient(config *lsdProto.LsdConfigClientConfigT, trafficManager *traffic.Manager) (*Client, error) {
	// Step 1: offsets database.
	db, err := offsets.InitDb(config.GetOffsetsDb())
	if err != nil {
		return nil, fmt.Errorf("failed to load offsetsDb db from disk: %v", err)
	}
	if db.IsEmpty() {
		// Not an error, but worth surfacing in the logs.
		log.Warning("loaded offsets db is empty")
	}

	// Step 2: network router.
	network, err := NewNetworkRouter(db, trafficManager, config)
	if err != nil {
		return nil, fmt.Errorf("failed to init network router: %v", err)
	}

	// Step 3: filesystem router, which is given the network router.
	files, err := newFsRouter(config, trafficManager, network, db)
	if err != nil {
		return nil, fmt.Errorf("failed to init fs router: %v", err)
	}

	c := &Client{
		config:         config,
		trafficManager: trafficManager,
		offsetsDb:      db,
		netRouter:      network,
		fsRouter:       files,
	}
	return c, nil
}
// Client bundles the components created by NewClient: the configuration,
// the offsets database, the traffic manager and the two routers.
// Use Start to launch it and Stop to shut it down.
type Client struct {
	// config is the client configuration passed to NewClient; the background
	// loops read their intervals from it.
	config *lsdProto.LsdConfigClientConfigT

	// offsetsDb is the offsets database opened in NewClient. It is saved
	// periodically by saveOffsetsLoop and once more by Stop.
	offsetsDb *offsets.Db

	// trafficManager is the traffic manager passed to NewClient; Start calls
	// Publish on it and updateTrafficStatsLoop calls Recalculate.
	trafficManager *traffic.Manager

	// fsRouter is started by Start and stopped by Stop.
	fsRouter *fsRouter

	// netRouter is handed to the fs router at construction time and is
	// stopped by Stop.
	netRouter *NetworkRouter
}
// Start brings the client up.
//
// It publishes the traffic manager's stats, spawns two background goroutines
// (traffic stats recalculation and periodic offsets persistence) and finally
// starts the filesystem router.
//
// Note that both background loops are written as endless loops with no exit
// path, so Stop does not terminate them.
func (c *Client) Start() {
	// Traffic stats: publish once now, then keep them updated in the background.
	c.trafficManager.Publish()
	go c.updateTrafficStatsLoop()

	// Offsets database: flushed to disk periodically in the background.
	go c.saveOffsetsLoop()

	c.fsRouter.start()
}
// Stop shuts down both routers and then persists the offsets database.
//
// The filesystem and network routers are stopped concurrently, and Stop waits
// for both to finish before saving offsets. A failed save is only logged,
// since Stop has no error return.
func (c *Client) Stop() {
	// The two routers are stopped in parallel, one goroutine each.
	stoppers := []func(){
		c.fsRouter.stop,
		c.netRouter.stop,
	}

	var wg sync.WaitGroup
	wg.Add(len(stoppers))
	for _, stopper := range stoppers {
		// Passed as an argument so every goroutine gets its own function value.
		go func(stop func()) {
			stop()
			wg.Done()
		}(stopper)
	}
	wg.Wait()

	// Final flush of the offsets database once both routers have stopped.
	if err := c.offsetsDb.Save(); err != nil {
		log.Errorf("failed to save offsets on stop: %v", err)
	}
}
// updateTrafficStatsLoop asks the traffic manager to recalculate its stats
// once per configured traffic-stats recalculation interval, passing the
// configured interval value to Recalculate on every tick.
//
// It is launched as a goroutine by Start and does not return while the
// interval is positive. If the configured interval is not positive, periodic
// recalculation is disabled: a warning is logged and the function returns
// immediately.
func (c *Client) updateTrafficStatsLoop() {
	interval := int2sec(c.config.GetTrafficStatsRecalcInterval())
	if interval <= 0 {
		// time.Tick returns a nil channel for non-positive durations, and a
		// receive from a nil channel blocks forever. Make that visible instead
		// of silently parking the goroutine.
		log.Warning("traffic stats recalc interval is not positive, periodic recalculation is disabled")
		return
	}

	for range time.Tick(interval) {
		// The interval is re-read from the config on every tick, as before.
		c.trafficManager.Recalculate(c.config.GetTrafficStatsRecalcInterval())
	}
}
// saveOffsetsLoop saves the offsets database once per configured offsets save
// interval.
//
// It is launched as a goroutine by Start and does not return while the
// interval is positive. A failed save is reported through log.Fatalf. If the
// configured interval is not positive, periodic saving is disabled: a warning
// is logged and the function returns immediately (Stop still performs its own
// save).
func (c *Client) saveOffsetsLoop() {
	interval := int2sec(c.config.GetOffsetsSaveInterval())
	if interval <= 0 {
		// time.Tick returns a nil channel for non-positive durations, and a
		// receive from a nil channel blocks forever. Without this guard the
		// offsets would silently never be saved periodically.
		log.Warning("offsets save interval is not positive, periodic offsets saving is disabled")
		return
	}

	for range time.Tick(interval) {
		if err := c.offsetsDb.Save(); err != nil {
			log.Fatalf("failed to save offsets: %v", err)
		}
	}
}