-
Notifications
You must be signed in to change notification settings - Fork 299
/
io.go
268 lines (237 loc) · 10 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright © 2019 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package io
import (
"context"
"fmt"
"go.thethings.network/lorawan-stack/v3/pkg/cluster"
"go.thethings.network/lorawan-stack/v3/pkg/config"
"go.thethings.network/lorawan-stack/v3/pkg/errorcontext"
"go.thethings.network/lorawan-stack/v3/pkg/errors"
"go.thethings.network/lorawan-stack/v3/pkg/httpclient"
"go.thethings.network/lorawan-stack/v3/pkg/log"
"go.thethings.network/lorawan-stack/v3/pkg/ratelimit"
"go.thethings.network/lorawan-stack/v3/pkg/task"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
"google.golang.org/grpc"
)
// DefaultBufferSize is the default size of a subscription uplink buffer.
const DefaultBufferSize = 128
// PubSub represents the Application Server Pub/Sub capabilities to application frontends.
type PubSub interface {
// Publish publishes upstream traffic to the Application Server.
Publish(ctx context.Context, up *ttnpb.ApplicationUp) error
// Subscribe subscribes an application or integration by its identifiers to the Application Server, and returns a
// Subscription for traffic and control. If the cluster parameter is true, the subscription receives all of the
// traffic of the application. Otherwise, only traffic that was processed locally is sent.
Subscribe(ctx context.Context, protocol string, ids *ttnpb.ApplicationIdentifiers, cluster bool) (*Subscription, error)
}
// DownlinkQueueOperator represents the Application Server downlink queue operations to application frontends.
type DownlinkQueueOperator interface {
// DownlinkQueuePush pushes the given downlink messages to the end device's application downlink queue.
DownlinkQueuePush(context.Context, *ttnpb.EndDeviceIdentifiers, []*ttnpb.ApplicationDownlink) error
// DownlinkQueueReplace replaces the end device's application downlink queue with the given downlink messages.
DownlinkQueueReplace(context.Context, *ttnpb.EndDeviceIdentifiers, []*ttnpb.ApplicationDownlink) error
// DownlinkQueueList lists the application downlink queue of the given end device.
DownlinkQueueList(context.Context, *ttnpb.EndDeviceIdentifiers) ([]*ttnpb.ApplicationDownlink, error)
}
// UplinkStorage represents the Application Server uplink storage to application frontends.
type UplinkStorage interface {
// RangeUplinks ranges the application uplinks and calls the callback function, until false is returned.
RangeUplinks(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, paths []string, f func(ctx context.Context, up *ttnpb.ApplicationUplink) bool) error
}
// Cluster represents the Application Server cluster peers to application frontends.
type Cluster interface {
// GetPeers returns peers with the given role.
GetPeers(ctx context.Context, role ttnpb.ClusterRole) ([]cluster.Peer, error)
// GetPeer returns a peer with the given role, and a responsibility for the
// given identifiers. If the identifiers are nil, this function returns a random
// peer from the list that would be returned by GetPeers.
GetPeer(ctx context.Context, role ttnpb.ClusterRole, ids cluster.EntityIdentifiers) (cluster.Peer, error)
// GetPeerConn returns the gRPC client connection of a peer, if the peer is available as
// as per GetPeer.
GetPeerConn(ctx context.Context, role ttnpb.ClusterRole, ids cluster.EntityIdentifiers) (*grpc.ClientConn, error)
}
// EndDeviceRegistry represents the Application Server end device registry to application frontends.
type EndDeviceRegistry interface {
// GetEndDevice retrieves the end device from the Application Server end device registry.
// This call will be delegated to the underlying end device registry, and should not be
// used on the hot path. It exists for provisioning purposes.
GetEndDevice(ctx context.Context, ids *ttnpb.EndDeviceIdentifiers, paths []string) (*ttnpb.EndDevice, error)
}
// Server represents the Application Server to application frontends.
type Server interface {
task.Starter
httpclient.Provider
PubSub
DownlinkQueueOperator
UplinkStorage
Cluster
EndDeviceRegistry
// FromRequestContext decouples the lifetime of the provided context from the values found in the context.
FromRequestContext(context.Context) context.Context
// GetBaseConfig returns the component configuration.
GetBaseConfig(ctx context.Context) config.ServiceBase
// FillContext fills the given context.
// This method should only be used for request contexts.
FillContext(ctx context.Context) context.Context
// RateLimiter returns the rate limiter instance.
RateLimiter() ratelimit.Interface
}
// ContextualApplicationUp represents an ttnpb.ApplicationUp with its context.
type ContextualApplicationUp struct {
context.Context
*ttnpb.ApplicationUp
}
// Subscription is a subscription to an application or integration managed by a frontend.
type Subscription struct {
ctx context.Context
cancelCtx errorcontext.CancelFunc
protocol string
ids *ttnpb.ApplicationIdentifiers
upCh chan *ContextualApplicationUp
publish func(context.Context, context.Context, chan<- *ContextualApplicationUp, *ContextualApplicationUp) error
}
// SubscriptionOption is an option for a Subscription.
type SubscriptionOption interface {
// apply is unexposed in order to ensure that options
// are not applied after the Subscription has been created.
apply(*Subscription)
}
type subscriptionOptionFunc func(s *Subscription)
func (f subscriptionOptionFunc) apply(s *Subscription) { f(s) }
// WithBlocking controls if the Publish call is blocking or not.
func WithBlocking(blocking bool) SubscriptionOption {
return subscriptionOptionFunc(func(s *Subscription) {
if blocking {
s.publish = blockingPublish
} else {
s.publish = nonBlockingPublish
}
})
}
// WithBufferSize controls the size of the subscription buffer.
func WithBufferSize(bufferSize int) SubscriptionOption {
return subscriptionOptionFunc(func(s *Subscription) {
s.upCh = make(chan *ContextualApplicationUp, bufferSize)
})
}
// NewSubscription instantiates a new application or integration subscription.
func NewSubscription(ctx context.Context, protocol string, ids *ttnpb.ApplicationIdentifiers, opts ...SubscriptionOption) *Subscription {
ctx, cancelCtx := errorcontext.New(ctx)
s := &Subscription{
ctx: ctx,
cancelCtx: cancelCtx,
protocol: protocol,
ids: ids,
upCh: make(chan *ContextualApplicationUp, DefaultBufferSize),
publish: nonBlockingPublish,
}
for _, opt := range opts {
opt.apply(s)
}
return s
}
// Context returns the subscription context.
func (s *Subscription) Context() context.Context { return s.ctx }
// Disconnect marks the subscription as disconnected and cancels the context.
func (s *Subscription) Disconnect(err error) {
s.cancelCtx(err)
}
// Protocol returns the protocol used for the subscription, i.e. grpc, mqtt or http.
func (s *Subscription) Protocol() string { return s.protocol }
// ApplicationIDs returns the application identifiers, if the subscription represents any specific.
func (s *Subscription) ApplicationIDs() *ttnpb.ApplicationIdentifiers { return s.ids }
// Publish publishes an upstream message.
func (s *Subscription) Publish(ctx context.Context, up *ttnpb.ApplicationUp) error {
ctxUp := &ContextualApplicationUp{
Context: ctx,
ApplicationUp: up,
}
return s.publish(ctx, s.ctx, s.upCh, ctxUp)
}
func blockingPublish(ctx context.Context, subCtx context.Context, upCh chan<- *ContextualApplicationUp, up *ContextualApplicationUp) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-subCtx.Done():
return subCtx.Err()
case upCh <- up:
return nil
}
}
var errBufferFull = errors.DefineResourceExhausted("buffer_full", "buffer is full")
func nonBlockingPublish(ctx context.Context, subCtx context.Context, upCh chan<- *ContextualApplicationUp, up *ContextualApplicationUp) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-subCtx.Done():
return subCtx.Err()
case upCh <- up:
return nil
default:
return errBufferFull.New()
}
}
// Up returns the upstream channel.
func (s *Subscription) Up() <-chan *ContextualApplicationUp {
return s.upCh
}
// Pipe pipes the output of the Subscription to the provided handler.
func (s *Subscription) Pipe(
ctx context.Context,
ts task.Starter,
name string,
submit func(context.Context, *ttnpb.ApplicationUp) error,
) {
f := func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.ctx.Done():
return s.ctx.Err()
case up := <-s.upCh:
if err := submit(up.Context, up.ApplicationUp); err != nil {
log.FromContext(up.Context).WithError(err).Warn("Failed to submit message")
}
}
}
}
ts.StartTask(&task.Config{
Context: ctx,
ID: fmt.Sprintf("pipe_%v", name),
Func: f,
Restart: task.RestartOnFailure,
Backoff: task.DefaultBackoffConfig,
})
}
// CleanDownlinks returns a copy of the given downlink items with only the fields that can be set by the application.
func CleanDownlinks(items []*ttnpb.ApplicationDownlink) []*ttnpb.ApplicationDownlink {
res := make([]*ttnpb.ApplicationDownlink, 0, len(items))
for _, item := range items {
res = append(res, &ttnpb.ApplicationDownlink{
SessionKeyId: item.SessionKeyId, // SessionKeyID must be set when skipping application payload crypto.
FPort: item.FPort,
FCnt: item.FCnt, // FCnt must be set when skipping application payload crypto.
FrmPayload: item.FrmPayload,
DecodedPayload: item.DecodedPayload,
ClassBC: item.ClassBC,
Priority: item.Priority,
Confirmed: item.Confirmed,
CorrelationIds: item.CorrelationIds,
})
}
return res
}