-
Notifications
You must be signed in to change notification settings - Fork 310
/
grpc_nsgs.go
101 lines (93 loc) · 4.08 KB
/
grpc_nsgs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright © 2019 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gatewayserver
import (
"context"
"fmt"
"github.com/mohae/deepcopy"
clusterauth "go.thethings.network/lorawan-stack/v3/pkg/auth/cluster"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/events"
"go.thethings.network/lorawan-stack/v3/pkg/gatewayserver/io"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
"go.thethings.network/lorawan-stack/v3/pkg/unique"
)
// Errors produced by ScheduleDownlink.
var (
// errNotTxRequest is an invalid-argument error returned when the downlink message carries no Tx request.
errNotTxRequest = errors.DefineInvalidArgument("not_tx_request", "downlink message is not a Tx request")
// errSchedulePath is recorded for a single downlink path on which scheduling failed; it is created with the
// underlying cause and the `gateway_uid` attribute of the gateway that was tried.
errSchedulePath = errors.Define("schedule_path", "failed to schedule on path `{gateway_uid}`")
// errSchedule is the aborted error returned when none of the downlink paths could be used; the per-path errors are
// attached to it as details.
errSchedule = errors.DefineAborted("schedule", "failed to schedule")
// errUplinkToken is a corruption error recorded for a downlink path whose uplink token cannot be parsed.
errUplinkToken = errors.DefineCorruption("uplink_token", "uplink token is not generated by this server")
)
// ScheduleDownlink handles a request to transmit a downlink message through one of the gateways referenced by the
// downlink paths of the message's Tx request.
//
// The call must pass cluster authorization and the message must carry a Tx request; otherwise an error is returned
// before any path is considered. The paths are tried in the order given and the first connection that accepts the
// message wins: correlation IDs are appended to down, the send is registered, and the response reports the chosen
// path together with the delay, Rx1 and Rx2 values returned by the connection.
//
// A path is skipped when its uplink token cannot be parsed, when no connection is found for its gateway, or when the
// connection fails to schedule the message, for instance because the requested parameters are invalid for the
// gateway's frequency plan or because scheduling conflicts or regional limitations such as duty-cycle and dwell time
// leave no transmission window. If every path is skipped, the returned error carries the per-path errors as details.
func (gs *GatewayServer) ScheduleDownlink(ctx context.Context, down *ttnpb.DownlinkMessage) (*ttnpb.ScheduleDownlinkResponse, error) {
	if authErr := clusterauth.Authorized(ctx); authErr != nil {
		return nil, authErr
	}
	txReq := down.GetRequest()
	if txReq == nil {
		return nil, errNotTxRequest.New()
	}

	logger := log.FromContext(ctx)
	var failures []errors.ErrorDetails
	for _, candidate := range txReq.DownlinkPaths {
		// Work out which gateway this path points at.
		var gtwIDs ttnpb.GatewayIdentifiers
		switch via := candidate.Path.(type) {
		case *ttnpb.DownlinkPath_UplinkToken:
			token, err := io.ParseUplinkToken(via.UplinkToken)
			if err != nil {
				// The parse error is deliberately not attached: uplink tokens are opaque to the Network Server.
				failures = append(failures, errUplinkToken.New())
				continue
			}
			gtwIDs = token.GatewayIdentifiers
		case *ttnpb.DownlinkPath_Fixed:
			gtwIDs = via.Fixed.GatewayIdentifiers
		default:
			panic(fmt.Sprintf("proto: unexpected type %T in oneof", candidate.Path))
		}

		gtwUID := unique.ID(ctx, gtwIDs)
		gtwConn, connected := gs.GetConnection(ctx, gtwIDs)
		if !connected {
			failures = append(failures, errNotConnected.WithAttributes("gateway_uid", gtwUID))
			continue
		}

		// Hand the connection its own copy of the message, stripped of the downlink paths so that these are not
		// leaked to the gateway.
		msg := deepcopy.Copy(down).(*ttnpb.DownlinkMessage)
		msg.GetRequest().DownlinkPaths = nil

		rx1, rx2, delay, err := gtwConn.ScheduleDown(candidate, msg)
		if err == nil {
			// First path that schedules successfully: add the connection's correlation IDs to the context, append
			// the resulting set to the caller's message, register the send and report the result.
			eventCtx := events.ContextWithCorrelationID(ctx, events.CorrelationIDsFromContext(gtwConn.Context())...)
			down.CorrelationIDs = append(down.CorrelationIDs, events.CorrelationIDsFromContext(eventCtx)...)
			registerSendDownlink(eventCtx, gtwConn.Gateway(), down, gtwConn.Frontend().Protocol())
			return &ttnpb.ScheduleDownlinkResponse{
				Delay:        delay,
				DownlinkPath: candidate,
				Rx1:          rx1,
				Rx2:          rx2,
			}, nil
		}

		logger.WithField("gateway_uid", gtwUID).WithError(err).Debug("Failed to schedule on path")
		failures = append(failures, errSchedulePath.WithCause(err).WithAttributes("gateway_uid", gtwUID))
	}

	// No path worked: convert the collected errors and attach them to the returned error.
	details := make([]*ttnpb.ErrorDetails, len(failures))
	for i, failure := range failures {
		details[i] = ttnpb.ErrorDetailsToProto(failure)
	}
	return nil, errSchedule.WithDetails(&ttnpb.ScheduleDownlinkErrorDetails{
		PathErrors: details,
	})
}