-
-
Notifications
You must be signed in to change notification settings - Fork 545
/
uplink.go
276 lines (240 loc) · 8.59 KB
/
uplink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package uplink
import (
"context"
"encoding/hex"
"fmt"
"sync"
"time"
"github.com/gofrs/uuid"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/brocaar/chirpstack-api/go/v3/common"
"github.com/brocaar/chirpstack-api/go/v3/gw"
"github.com/brocaar/chirpstack-api/go/v3/nc"
"github.com/brocaar/chirpstack-api/go/v3/ns"
"github.com/brocaar/chirpstack-network-server/v3/internal/backend/controller"
gwbackend "github.com/brocaar/chirpstack-network-server/v3/internal/backend/gateway"
"github.com/brocaar/chirpstack-network-server/v3/internal/config"
"github.com/brocaar/chirpstack-network-server/v3/internal/downlink/ack"
"github.com/brocaar/chirpstack-network-server/v3/internal/framelog"
"github.com/brocaar/chirpstack-network-server/v3/internal/gateway"
"github.com/brocaar/chirpstack-network-server/v3/internal/helpers"
"github.com/brocaar/chirpstack-network-server/v3/internal/logging"
"github.com/brocaar/chirpstack-network-server/v3/internal/models"
"github.com/brocaar/chirpstack-network-server/v3/internal/storage"
"github.com/brocaar/chirpstack-network-server/v3/internal/uplink/data"
"github.com/brocaar/chirpstack-network-server/v3/internal/uplink/join"
"github.com/brocaar/chirpstack-network-server/v3/internal/uplink/proprietary"
"github.com/brocaar/chirpstack-network-server/v3/internal/uplink/rejoin"
"github.com/brocaar/lorawan"
)
var (
deduplicationDelay time.Duration
)
// Setup configures the package.
func Setup(conf config.Config) error {
if err := data.Setup(conf); err != nil {
return errors.Wrap(err, "configure uplink/data error")
}
if err := join.Setup(conf); err != nil {
return errors.Wrap(err, "configure uplink/join error")
}
if err := rejoin.Setup(conf); err != nil {
return errors.Wrap(err, "configure uplink/rejoin error")
}
deduplicationDelay = conf.NetworkServer.DeduplicationDelay
return nil
}
// Server represents a server listening for uplink packets.
type Server struct {
wg sync.WaitGroup
}
// NewServer creates a new server.
func NewServer() *Server {
return &Server{}
}
// Start starts the server.
func (s *Server) Start() error {
go func() {
s.wg.Add(1)
defer s.wg.Done()
HandleUplinkFrames(&s.wg)
}()
go func() {
s.wg.Add(1)
defer s.wg.Done()
HandleDownlinkTXAcks(&s.wg)
}()
return nil
}
// Stop closes the gateway backend and waits for the server to complete the
// pending packets.
func (s *Server) Stop() error {
if err := gwbackend.Backend().Close(); err != nil {
return fmt.Errorf("close gateway backend error: %s", err)
}
log.Info("uplink: waiting for pending actions to complete")
s.wg.Wait()
return nil
}
// HandleUplinkFrames consumes received packets by the gateway and handles them
// in a separate go-routine. Errors are logged.
func HandleUplinkFrames(wg *sync.WaitGroup) {
for uplinkFrame := range gwbackend.Backend().RXPacketChan() {
go func(uplinkFrame gw.UplinkFrame) {
wg.Add(1)
defer wg.Done()
// The ctxID will be available as context value "ctx_id" so that
// this can be used when writing logs. This makes it easier to
// group multiple log-lines to the same context.
ctxID, err := uuid.NewV4()
if err != nil {
log.WithError(err).Error("uplink: get new uuid error")
}
ctx := context.Background()
ctx = context.WithValue(ctx, logging.ContextIDKey, ctxID)
if err := HandleUplinkFrame(ctx, uplinkFrame); err != nil {
log.WithFields(log.Fields{
"ctx_id": ctxID,
}).WithError(err).Error("uplink: processing uplink frame error")
}
}(uplinkFrame)
}
}
// HandleUplinkFrame handles a single uplink frame.
func HandleUplinkFrame(ctx context.Context, uplinkFrame gw.UplinkFrame) error {
return collectUplinkFrames(ctx, uplinkFrame)
}
// HandleDownlinkTXAcks consumes received downlink tx acknowledgements from
// the gateway.
func HandleDownlinkTXAcks(wg *sync.WaitGroup) {
for downlinkTXAck := range gwbackend.Backend().DownlinkTXAckChan() {
go func(downlinkTXAck gw.DownlinkTXAck) {
wg.Add(1)
defer wg.Done()
// The ctxID will be available as context value "ctx_id" so that
// this can be used when writing logs. This makes it easier to
// group multiple log-lines to the same context.
var ctxID uuid.UUID
if downlinkTXAck.DownlinkId != nil {
copy(ctxID[:], downlinkTXAck.DownlinkId)
}
ctx := context.Background()
ctx = context.WithValue(ctx, logging.ContextIDKey, ctxID)
if err := ack.HandleDownlinkTXAck(ctx, &downlinkTXAck); err != nil {
log.WithFields(log.Fields{
"gateway_id": hex.EncodeToString(downlinkTXAck.GatewayId),
"token": downlinkTXAck.Token,
"ctx_id": ctxID,
}).WithError(err).Error("uplink: handle downlink tx ack error")
}
}(downlinkTXAck)
}
}
func collectUplinkFrames(ctx context.Context, uplinkFrame gw.UplinkFrame) error {
return collectAndCallOnce(uplinkFrame, func(rxPacket models.RXPacket) error {
err := handleCollectedUplink(ctx, uplinkFrame, rxPacket)
if err != nil {
cause := errors.Cause(err)
if cause == storage.ErrDoesNotExist || cause == storage.ErrFrameCounterReset || cause == storage.ErrInvalidMIC || cause == storage.ErrFrameCounterRetransmission {
if _, err := controller.Client().HandleRejectedUplinkFrameSet(ctx, &nc.HandleRejectedUplinkFrameSetRequest{
FrameSet: &gw.UplinkFrameSet{
PhyPayload: uplinkFrame.PhyPayload,
TxInfo: rxPacket.TXInfo,
RxInfo: rxPacket.RXInfoSet,
},
}); err != nil {
log.WithError(err).Error("uplink: call controller HandleRejectedUplinkFrameSet RPC error")
}
}
}
return err
})
}
func runHandlerWithMetric(err error, mt lorawan.MType) error {
mts := mt.String()
if err != nil {
uplinkFrameCounter(mts + "Err").Inc()
return err
}
uplinkFrameCounter(mts).Inc()
return err
}
func handleCollectedUplink(ctx context.Context, uplinkFrame gw.UplinkFrame, rxPacket models.RXPacket) error {
// Update the gateway meta-data.
// This sets the location information from the database, decrypts the
// fine-timestamp when it is available and retrieves the service-profile
// information of the gateways.
// Note: this is done after de-duplication as a single uplink might be
// received multiple times in case of multiple NS instances and depending
// the MQTT broker (e.g. if it supports consumer groups).
if err := gateway.UpdateMetaDataInRXPacket(ctx, storage.DB(), &rxPacket); err != nil {
return errors.Wrap(err, "update RXPacket meta-data error")
}
// Return if the RXInfoSet is empty.
if len(rxPacket.RXInfoSet) == 0 {
return nil
}
var uplinkIDs []uuid.UUID
for _, p := range rxPacket.RXInfoSet {
uplinkIDs = append(uplinkIDs, helpers.GetUplinkID(p))
}
log.WithFields(log.Fields{
"uplink_ids": uplinkIDs,
"mtype": rxPacket.PHYPayload.MHDR.MType,
"ctx_id": ctx.Value(logging.ContextIDKey),
}).Info("uplink: frame(s) collected")
// Extract MType
var protoMType common.MType
switch rxPacket.PHYPayload.MHDR.MType {
case lorawan.JoinRequest:
protoMType = common.MType_JoinRequest
case lorawan.RejoinRequest:
protoMType = common.MType_RejoinRequest
case lorawan.UnconfirmedDataUp:
protoMType = common.MType_UnconfirmedDataUp
case lorawan.ConfirmedDataUp:
protoMType = common.MType_ConfirmedDataUp
case lorawan.Proprietary:
protoMType = common.MType_Proprietary
}
// Extract DevAddr or DevEUI (if available)
var devAddr []byte
var devEUI []byte
switch v := rxPacket.PHYPayload.MACPayload.(type) {
case *lorawan.MACPayload:
devAddr = v.FHDR.DevAddr[:]
case *lorawan.JoinRequestPayload:
devEUI = v.DevEUI[:]
case *lorawan.RejoinRequestType02Payload:
devEUI = v.DevEUI[:]
case *lorawan.RejoinRequestType1Payload:
devEUI = v.DevEUI[:]
}
// log the frame for each receiving gateway.
if err := framelog.LogUplinkFrameForGateways(ctx, ns.UplinkFrameLog{
PhyPayload: uplinkFrame.PhyPayload,
TxInfo: rxPacket.TXInfo,
RxInfo: rxPacket.RXInfoSet,
MType: protoMType,
DevAddr: devAddr,
DevEui: devEUI,
}); err != nil {
log.WithFields(log.Fields{
"ctx_id": ctx.Value(logging.ContextIDKey),
}).WithError(err).Error("uplink: log uplink frames for gateways error")
}
// handle the frame based on message-type
switch rxPacket.PHYPayload.MHDR.MType {
case lorawan.JoinRequest:
return runHandlerWithMetric(join.Handle(ctx, rxPacket), lorawan.JoinRequest)
case lorawan.RejoinRequest:
return runHandlerWithMetric(rejoin.Handle(ctx, rxPacket), lorawan.RejoinRequest)
case lorawan.UnconfirmedDataUp, lorawan.ConfirmedDataUp:
return runHandlerWithMetric(data.Handle(ctx, rxPacket), lorawan.UnconfirmedDataUp)
case lorawan.Proprietary:
return runHandlerWithMetric(proprietary.Handle(ctx, rxPacket), lorawan.Proprietary)
default:
return nil
}
}