-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
166 lines (145 loc) · 5.19 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.
package router
import (
"fmt"
"time"
pb_gateway "github.com/TheThingsNetwork/api/gateway"
pb "github.com/TheThingsNetwork/api/router"
"github.com/TheThingsNetwork/api/router/routerclient"
"github.com/TheThingsNetwork/go-account-lib/claims"
"github.com/TheThingsNetwork/go-utils/grpc/ttnctx"
"github.com/TheThingsNetwork/ttn/api/ratelimit"
"github.com/TheThingsNetwork/ttn/core/router/gateway"
"github.com/TheThingsNetwork/ttn/utils/errors"
"github.com/TheThingsNetwork/ttn/utils/random"
"github.com/spf13/viper"
"golang.org/x/net/context" // See https://github.com/grpc/grpc-go/issues/711"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
)
// routerRPC is the gRPC-facing wrapper around a router; it is the value that
// RegisterRPC passes to pb.RegisterRouterServer.
type routerRPC struct {
// router is the router whose Handle*, SubscribeDownlink and
// UnsubscribeDownlink methods the RPC methods delegate to.
router *router
// Embedded stream server; RegisterRPC sets its logger and its UplinkChanFunc,
// DownlinkChanFunc and GatewayStatusChanFunc hooks.
routerclient.RouterStreamServer
// uplinkRate is queried per gateway ID for uplink messages and activations.
uplinkRate *ratelimit.Registry
// statusRate is queried per gateway ID for gateway status messages.
statusRate *ratelimit.Registry
}
// gatewayFromMetadata resolves the gateway identified by the request metadata.
//
// The gateway ID is mandatory; an error reading it is returned as-is. A token
// is optional: when one is present it is checked against the router's
// TokenKeyProvider and its subject must equal the gateway ID. If the gateway
// could not be authenticated, a permission-denied error is returned unless the
// "router.skip-verify-gateway-token" setting is enabled. A token supplied
// while no TokenKeyProvider is configured is always an internal error,
// regardless of that setting.
//
// On success the gateway obtained from the router is returned; if the token
// was verified, the token is stored on the gateway via SetAuth.
func (r *routerRPC) gatewayFromMetadata(md metadata.MD) (gtw *gateway.Gateway, err error) {
	gatewayID, err := ttnctx.IDFromMetadata(md)
	if err != nil {
		return nil, err
	}

	// Start from "denied"; only a valid token with a matching subject clears it.
	authErr := errors.NewErrPermissionDenied("Gateway not authenticated")
	authenticated := false

	// A failure to read the token is treated the same as no token at all.
	token, _ := ttnctx.TokenFromMetadata(md)
	if token != "" {
		keyProvider := r.router.TokenKeyProvider
		if keyProvider == nil {
			return nil, errors.NewErrInternal("No token provider configured")
		}
		tokenClaims, claimsErr := claims.FromGatewayToken(keyProvider, token)
		switch {
		case claimsErr != nil:
			authErr = errors.NewErrPermissionDenied(fmt.Sprintf("Gateway token invalid: %s", claimsErr))
		case tokenClaims.Subject != gatewayID:
			authErr = errors.NewErrPermissionDenied(fmt.Sprintf("Token subject \"%s\" not consistent with gateway ID \"%s\"", tokenClaims.Subject, gatewayID))
		default:
			authErr = nil
			authenticated = true
		}
	}

	// The skip-verify setting is only consulted when authentication failed.
	if authErr != nil && !viper.GetBool("router.skip-verify-gateway-token") {
		return nil, authErr
	}

	gtw = r.router.getGateway(gatewayID)
	if authenticated {
		gtw.SetAuth(token, true)
	}
	return gtw, nil
}
// gatewayFromContext extracts the incoming metadata from ctx and resolves the
// gateway from it with gatewayFromMetadata.
func (r *routerRPC) gatewayFromContext(ctx context.Context) (gtw *gateway.Gateway, err error) {
	return r.gatewayFromMetadata(ttnctx.MetadataFromIncomingContext(ctx))
}
// getUplink resolves the gateway from md and returns a channel on which the
// caller can push uplink messages for that gateway.
//
// A goroutine drains the channel and hands every message to the router's
// HandleUplink. Before each message the uplink rate registry is consulted;
// a non-zero wait is logged as a warning and slept out first. The goroutine
// stops once the channel is closed.
func (r *routerRPC) getUplink(md metadata.MD) (ch chan *pb.UplinkMessage, err error) {
	gtw, err := r.gatewayFromMetadata(md)
	if err != nil {
		return nil, err
	}

	messages := make(chan *pb.UplinkMessage)
	go func() {
		for {
			uplink, open := <-messages
			if !open {
				return
			}
			waitTime := r.uplinkRate.Wait(gtw.ID)
			if waitTime != 0 {
				r.router.Ctx.WithField("GatewayID", gtw.ID).WithField("Wait", waitTime).Warn("Gateway reached uplink rate limit")
				time.Sleep(waitTime)
			}
			r.router.HandleUplink(gtw.ID, uplink)
		}
	}()
	return messages, nil
}
// getGatewayStatus resolves the gateway from md and returns a channel on which
// the caller can push status messages for that gateway.
//
// A goroutine drains the channel and hands every status to the router's
// HandleGatewayStatus. Before each status the status rate registry is
// consulted; a non-zero wait is logged as a warning and slept out first. The
// goroutine stops once the channel is closed.
func (r *routerRPC) getGatewayStatus(md metadata.MD) (ch chan *pb_gateway.Status, err error) {
	gtw, err := r.gatewayFromMetadata(md)
	if err != nil {
		return nil, err
	}

	statuses := make(chan *pb_gateway.Status)
	go func() {
		for {
			status, open := <-statuses
			if !open {
				return
			}
			waitTime := r.statusRate.Wait(gtw.ID)
			if waitTime != 0 {
				r.router.Ctx.WithField("GatewayID", gtw.ID).WithField("Wait", waitTime).Warn("Gateway reached status rate limit")
				time.Sleep(waitTime)
			}
			r.router.HandleGatewayStatus(gtw.ID, status)
		}
	}()
	return statuses, nil
}
// getDownlink resolves the gateway from md and subscribes it to downlink
// messages under a fresh subscription ID taken from random.String(10).
//
// It returns the channel handed out by the router's SubscribeDownlink together
// with a cancel function that calls UnsubscribeDownlink for the same gateway
// ID and subscription ID. If the gateway cannot be resolved or the
// subscription fails, the channel and cancel function are both nil and the
// error is returned.
func (r *routerRPC) getDownlink(md metadata.MD) (ch <-chan *pb.DownlinkMessage, cancel func(), err error) {
	gtw, err := r.gatewayFromMetadata(md)
	if err != nil {
		return nil, nil, err
	}

	subscriptionID := random.String(10)

	// The returned channel comes from the router; no local channel is needed.
	downlinkChannel, err := r.router.SubscribeDownlink(gtw.ID, subscriptionID)
	if err != nil {
		return nil, nil, err
	}

	// Only build the cancel func once there is a subscription to cancel.
	cancel = func() {
		r.router.UnsubscribeDownlink(gtw.ID, subscriptionID)
	}
	return downlinkChannel, cancel, nil
}
// Activate implements RouterServer interface (github.com/TheThingsNetwork/api/router)
//
// The calling gateway is resolved from ctx first, then the request is
// validated, and finally the gateway's uplink rate limit is checked; a gateway
// over the limit gets a ResourceExhausted error. A request that passes all
// three checks is forwarded to the router's HandleActivation.
func (r *routerRPC) Activate(ctx context.Context, req *pb.DeviceActivationRequest) (*pb.DeviceActivationResponse, error) {
	gtw, err := r.gatewayFromContext(ctx)
	if err != nil {
		return nil, err
	}

	if invalid := req.Validate(); invalid != nil {
		return nil, errors.Wrap(invalid, "Invalid Activation Request")
	}

	// Activations count against the same registry as regular uplinks.
	overLimit := r.uplinkRate.Limit(gtw.ID)
	if overLimit {
		return nil, grpc.Errorf(codes.ResourceExhausted, "Gateway reached uplink rate limit")
	}

	return r.router.HandleActivation(gtw.ID, req)
}
// RegisterRPC registers this router as a RouterServer (github.com/TheThingsNetwork/api/router)
//
// It builds a routerRPC around r with its uplink and status rate registries,
// sets the logger and the uplink, downlink and gateway-status channel hooks on
// the embedded stream server, and registers the result on s.
func (r *router) RegisterRPC(s *grpc.Server) {
	// TODO: Monitor actual rates and configure sensible limits
	//
	// The current values are based on the following:
	// - 20 byte messages on all 6 orthogonal SFs at the same time -> ~1500 msgs/minute
	// - 8 channels at 5% utilization: 600 msgs/minute
	// - let's double that and round it to 1500/minute
	server := &routerRPC{
		router:     r,
		uplinkRate: ratelimit.NewRegistry(1500, time.Minute), // includes activations
		statusRate: ratelimit.NewRegistry(10, time.Minute),   // 10 per minute (pkt fwd default is 2 per minute)
	}

	server.SetLogger(r.Ctx)
	server.UplinkChanFunc = server.getUplink
	server.DownlinkChanFunc = server.getDownlink
	server.GatewayStatusChanFunc = server.getGatewayStatus

	pb.RegisterRouterServer(s, server)
}