This repository has been archived by the owner on Aug 18, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
handler.go
51 lines (44 loc) · 1.59 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Copyright © 2017 The Things Network
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.
package monitorclient
import (
"context"
"github.com/TheThingsNetwork/go-utils/grpc/streambuffer"
"github.com/TheThingsNetwork/go-utils/grpc/ttnctx"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// HandlerClient creates a new client for Handler monitoring
func (m *MonitorClient) HandlerClient(ctx context.Context, opts ...grpc.CallOption) Stream {
c := new(componentClient)
c.log = m.log.WithField("Component", "Handler")
if md, ok := metadata.FromOutgoingContext(ctx); ok {
if id, err := ttnctx.IDFromMetadata(md); err == nil {
c.log = c.log.WithField("ID", id)
}
}
c.setup = func() {
var sessionCtx context.Context
sessionCtx, c.cancel = context.WithCancel(ctx)
for name, cli := range m.clients {
cli := cli // shadow cli; we're using it in the setup funcs below
uplink := streambuffer.New(m.bufferSize, func() (grpc.ClientStream, error) {
return cli.HandlerUplink(sessionCtx, opts...)
})
c.uplink = append(c.uplink, uplink)
go c.run(name, "Uplink", uplink)
downlink := streambuffer.New(m.bufferSize, func() (grpc.ClientStream, error) {
return cli.HandlerDownlink(sessionCtx, opts...)
})
c.downlink = append(c.downlink, downlink)
go c.run(name, "Downlink", downlink)
status := streambuffer.New(m.bufferSize, func() (grpc.ClientStream, error) {
return cli.HandlerStatus(sessionCtx, opts...)
})
c.status = append(c.status, status)
go c.run(name, "Status", status)
}
}
c.Open()
return c
}