-
Notifications
You must be signed in to change notification settings - Fork 241
/
events_client.go
100 lines (87 loc) · 2.63 KB
/
events_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package transport
import (
"context"
"fmt"
"github.com/Azure/azure-container-networking/npm/pkg/protos"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
)
// EventsClient wraps the generated DataplaneEvents gRPC client and forwards
// the events it receives from the server onto a Go channel.
type EventsClient struct {
	// ctx is the context handed to NewEventsClient; Start passes it to the
	// receive loop.
	ctx context.Context

	// DataplaneEventsClient is the embedded generated gRPC client; its
	// Connect method is used to open the event stream.
	protos.DataplaneEventsClient

	// pod is the pod name sent to the server as part of the Connect metadata.
	pod string

	// node is the node name sent to the server as part of the Connect metadata.
	node string

	// serverAddr is the address that was dialed when the client was created.
	serverAddr string

	// outCh carries events received from the server to the consumer
	// (see EventsChannel).
	outCh chan *protos.Events
}
// ErrPodNodeNameNil is returned by NewEventsClient when the pod name or the
// node name is empty.
var ErrPodNodeNameNil = fmt.Errorf("pod and node name must be set")

// ErrAddressNil is returned by NewEventsClient when the server address is
// empty.
var ErrAddressNil = fmt.Errorf("address must be set")
// NewEventsClient validates its arguments, dials the gRPC server at addr and
// returns an EventsClient bound to the resulting connection.
//
// It returns ErrPodNodeNameNil when pod or node is empty, ErrAddressNil when
// addr is empty, and a wrapped dial error when the connection cannot be
// created. The returned client owns a fresh, unbuffered events channel.
func NewEventsClient(ctx context.Context, pod, node, addr string) (*EventsClient, error) {
	// Argument checks, in the same order the errors are reported to callers.
	switch {
	case pod == "" || node == "":
		return nil, ErrPodNodeNameNil
	case addr == "":
		return nil, ErrAddressNil
	}

	klog.Infof("Connecting to NPM controller gRPC server at address %s\n", addr)

	// TODO: make this secure; the connection currently uses insecure
	// transport credentials. No blocking dial option is passed here.
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	conn, dialErr := grpc.DialContext(ctx, addr, dialOpts...)
	if dialErr != nil {
		return nil, fmt.Errorf("failed to dial %s: %w", addr, dialErr)
	}

	client := &EventsClient{
		ctx:                   ctx,
		DataplaneEventsClient: protos.NewDataplaneEventsClient(conn),
		pod:                   pod,
		node:                  node,
		serverAddr:            addr,
		outCh:                 make(chan *protos.Events),
	}
	return client, nil
}
// EventsChannel returns the channel on which the client delivers the events
// it receives from the server.
func (c *EventsClient) EventsChannel() chan *protos.Events {
	events := c.outCh
	return events
}
// Start launches the client's receive loop in a new goroutine, using the
// context the client was created with, and returns immediately. stopCh is
// handed to the loop as its stop signal. Start itself always returns nil.
func (c *EventsClient) Start(stopCh <-chan struct{}) error {
	loopCtx := c.ctx
	go func() {
		// The loop's result is deliberately dropped: nothing is waiting on
		// this goroutine to report it to.
		_ = c.run(loopCtx, stopCh)
	}()
	return nil
}
// run is the client's receive loop. It opens a Connect stream to the
// DataplaneEvents server, identifying itself with the client's pod and node
// names, and forwards every received event to outCh. When Recv fails, the
// current stream is dropped and a new one is opened on the next iteration.
//
// The loop ends when:
//   - ctx is done: an error wrapping ctx.Err() is returned;
//   - stopCh fires: nil is returned;
//   - opening a stream fails: the wrapped error is returned.
//
// Every exit path is logged, because Start runs this method in a goroutine
// and discards the returned error.
//
// NOTE(review): stopCh is only observed between Recv calls and while an event
// is being delivered. A Recv that is blocked waiting for the server is
// presumably only interrupted by cancelling ctx — confirm that callers cancel
// ctx on shutdown.
func (c *EventsClient) run(ctx context.Context, stopCh <-chan struct{}) error {
	var stream protos.DataplaneEvents_ConnectClient

	clientMetadata := &protos.DatapathPodMetadata{
		PodName:  c.pod,
		NodeName: c.node,
	}

	for {
		// Non-blocking check for a shutdown request before doing more work.
		select {
		case <-ctx.Done():
			klog.Errorf("recevied done event on context channel: %v", ctx.Err())
			return fmt.Errorf("recevied done event on context channel: %w", ctx.Err())
		case <-stopCh:
			klog.Info("Received message on stop channel. Stopping transport client")
			return nil
		default:
		}

		if stream == nil {
			klog.Info("Reconnecting to gRPC server controller")
			var err error
			// WaitForReady makes the call wait for the connection to become
			// ready instead of failing fast while the server is unavailable.
			stream, err = c.Connect(ctx, clientMetadata, grpc.WaitForReady(true))
			if err != nil {
				// Log here: the returned error is not observed when this
				// method runs in the goroutine started by Start.
				klog.Errorf("failed to connect to dataplane events server: %v", err)
				return fmt.Errorf("failed to connect to dataplane events server: %w", err)
			}
		}

		event, recvErr := stream.Recv()
		if recvErr != nil {
			klog.Errorf("failed to receive event: %v", recvErr)
			// Drop the broken stream so the next iteration reconnects.
			stream = nil
			continue
		}

		klog.Infof("### Received event: %v", event)

		// outCh is unbuffered: deliver the event, but do not block forever if
		// the consumer has gone away while a shutdown is requested.
		select {
		case c.outCh <- event:
		case <-ctx.Done():
			klog.Errorf("recevied done event on context channel: %v", ctx.Err())
			return fmt.Errorf("recevied done event on context channel: %w", ctx.Err())
		case <-stopCh:
			klog.Info("Received message on stop channel. Stopping transport client")
			return nil
		}
	}
}