forked from brocaar/chirpstack-network-server
/
uplink.go
129 lines (114 loc) · 3.8 KB
/
uplink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package uplink
import (
"encoding/base64"
"encoding/hex"
"fmt"
"sync"
log "github.com/sirupsen/logrus"
"github.com/brocaar/loraserver/api/gw"
"github.com/brocaar/loraserver/internal/config"
"github.com/brocaar/loraserver/internal/downlink/ack"
"github.com/brocaar/loraserver/internal/framelog"
"github.com/brocaar/loraserver/internal/gateway"
"github.com/brocaar/loraserver/internal/models"
"github.com/brocaar/loraserver/internal/uplink/data"
"github.com/brocaar/loraserver/internal/uplink/join"
"github.com/brocaar/loraserver/internal/uplink/proprietary"
"github.com/brocaar/loraserver/internal/uplink/rejoin"
"github.com/brocaar/lorawan"
)
// Server represents a server listening for uplink packets.
type Server struct {
wg sync.WaitGroup
}
// NewServer creates a new server.
func NewServer() *Server {
return &Server{}
}
// Start starts the server.
func (s *Server) Start() error {
go func() {
s.wg.Add(1)
defer s.wg.Done()
HandleRXPackets(&s.wg)
}()
go func() {
s.wg.Add(1)
defer s.wg.Done()
HandleDownlinkTXAcks(&s.wg)
}()
return nil
}
// Stop closes the gateway backend and waits for the server to complete the
// pending packets.
func (s *Server) Stop() error {
if err := config.C.NetworkServer.Gateway.Backend.Backend.Close(); err != nil {
return fmt.Errorf("close gateway backend error: %s", err)
}
log.Info("waiting for pending actions to complete")
s.wg.Wait()
return nil
}
// HandleRXPackets consumes received packets by the gateway and handles them
// in a separate go-routine. Errors are logged.
func HandleRXPackets(wg *sync.WaitGroup) {
for uplinkFrame := range config.C.NetworkServer.Gateway.Backend.Backend.RXPacketChan() {
go func(uplinkFrame gw.UplinkFrame) {
wg.Add(1)
defer wg.Done()
if err := HandleRXPacket(uplinkFrame); err != nil {
data := base64.StdEncoding.EncodeToString(uplinkFrame.PhyPayload)
log.WithField("data_base64", data).WithError(err).Error("processing uplink frame error")
}
}(uplinkFrame)
}
}
// HandleRXPacket handles a single rxpacket.
func HandleRXPacket(uplinkFrame gw.UplinkFrame) error {
return collectPackets(uplinkFrame)
}
// HandleDownlinkTXAcks consumes received downlink tx acknowledgements from
// the gateway.
func HandleDownlinkTXAcks(wg *sync.WaitGroup) {
for downlinkTXAck := range config.C.NetworkServer.Gateway.Backend.Backend.DownlinkTXAckChan() {
go func(downlinkTXAck gw.DownlinkTXAck) {
wg.Add(1)
defer wg.Done()
if err := ack.HandleDownlinkTXAck(downlinkTXAck); err != nil {
log.WithFields(log.Fields{
"gateway_id": hex.EncodeToString(downlinkTXAck.GatewayId),
"token": downlinkTXAck.Token,
}).WithError(err).Error("handle downlink tx ack error")
}
}(downlinkTXAck)
}
}
func collectPackets(uplinkFrame gw.UplinkFrame) error {
return collectAndCallOnce(config.C.Redis.Pool, uplinkFrame, func(rxPacket models.RXPacket) error {
// update the gateway meta-data
if err := gateway.UpdateMetaDataInRxInfoSet(config.C.PostgreSQL.DB, config.C.Redis.Pool, rxPacket.RXInfoSet); err != nil {
log.WithError(err).Error("update gateway meta-data in rx-info set error")
}
// log the frame for each receiving gatewa
if err := framelog.LogUplinkFrameForGateways(config.C.Redis.Pool, gw.UplinkFrameSet{
PhyPayload: uplinkFrame.PhyPayload,
TxInfo: rxPacket.TXInfo,
RxInfo: rxPacket.RXInfoSet,
}); err != nil {
log.WithError(err).Error("log uplink frames for gateways error")
}
// handle the frame based on message-type
switch rxPacket.PHYPayload.MHDR.MType {
case lorawan.JoinRequest:
return join.Handle(rxPacket)
case lorawan.RejoinRequest:
return rejoin.Handle(rxPacket)
case lorawan.UnconfirmedDataUp, lorawan.ConfirmedDataUp:
return data.Handle(rxPacket)
case lorawan.Proprietary:
return proprietary.Handle(rxPacket)
default:
return nil
}
})
}