/
stats_handler.go
61 lines (49 loc) · 1.41 KB
/
stats_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package gateway
import (
"context"
"sync"
"github.com/gofrs/uuid"
log "github.com/sirupsen/logrus"
"github.com/brocaar/chirpstack-api/go/v3/gw"
"github.com/brocaar/chirpstack-network-server/v3/internal/backend/gateway"
"github.com/brocaar/chirpstack-network-server/v3/internal/gateway/stats"
"github.com/brocaar/chirpstack-network-server/v3/internal/logging"
)
// StatsHandler represents a stat handler for incoming gateway stats.
//
// Start launches the goroutines that consume and handle the stats packets;
// Stop blocks until all of them have finished.
type StatsHandler struct {
// wg tracks the goroutines started by Start (the channel reader and one
// goroutine per stats packet) so that Stop can wait for them.
wg sync.WaitGroup
}
// Start starts the stats handler.
//
// It launches a background goroutine that reads gateway stats from the
// gateway backend's stats channel until that channel is closed, and handles
// every received stats packet in its own goroutine. When no gateway backend
// is configured, the background goroutine exits immediately.
//
// Each goroutine is registered on the wait group before it is started, so
// that a concurrent call to Stop can never observe a zero counter while work
// is still about to begin. Start does not block and always returns nil.
func (s *StatsHandler) Start() error {
	// Add must be called before the goroutine is launched: calling it from
	// inside the goroutine races with the Wait call in Stop.
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()

		backend := gateway.Backend()
		if backend == nil {
			return
		}

		for gwStats := range backend.StatsPacketChan() {
			// Register the worker while the reader goroutine still holds its
			// own count, so the counter cannot drop to zero in between.
			s.wg.Add(1)
			go func(gwStats gw.GatewayStats) {
				defer s.wg.Done()

				// The stats ID (when present) is used as the context ID so
				// that log lines for this packet can be correlated.
				var statsID uuid.UUID
				if gwStats.StatsId != nil {
					copy(statsID[:], gwStats.StatsId)
				}

				ctx := context.Background()
				ctx = context.WithValue(ctx, logging.ContextIDKey, statsID)

				if err := stats.Handle(ctx, gwStats); err != nil {
					log.WithError(err).WithFields(log.Fields{
						"ctx_id": ctx.Value(logging.ContextIDKey),
					}).Error("gateway: handle gateway stats error")
				}
			}(gwStats)
		}
	}()

	return nil
}
// Stop waits for the stats handler to complete the pending packets.
// At this stage the gateway backend must already have been closed: the
// goroutine launched by Start keeps ranging over the backend's stats channel
// until that channel is closed, and Stop blocks until that goroutine and all
// per-packet goroutines are done. It always returns nil.
func (s *StatsHandler) Stop() error {
s.wg.Wait()
return nil
}