events_server.go
package transport

import (
	"context"
	"fmt"
	"net"

	"github.com/Azure/azure-container-networking/npm/pkg/dataplane/dpshim"
	"github.com/Azure/azure-container-networking/npm/pkg/protos"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
	"google.golang.org/grpc/stats"
	"k8s.io/klog/v2"
)

// EventsServer contains the gRPC server and the watchdog server.
type EventsServer struct {
	ctx context.Context

	// Server is the gRPC server
	Server protos.DataplaneEventsServer

	// Watchdog is the watchdog for the gRPC server that implements the
	// gRPC stats handler interface
	Watchdog stats.Handler

	// Registrations is a map of dataplane pod addresses to their associated connection streams
	Registrations map[string]clientStreamConnection

	// port is the port the manager is listening on
	port int

	// inCh is the input channel for the manager
	inCh chan *protos.Events

	// regCh is the registration channel
	regCh chan clientStreamConnection

	// deregCh is the deregistration channel
	deregCh chan deregistrationEvent

	// errCh is the error channel
	errCh chan error

	// dp holds the dataplane instance, used for hydration calls
	dp *dpshim.DPShim
}

// NewEventsServer creates an instance of the EventsServer
func NewEventsServer(ctx context.Context, port int, dp *dpshim.DPShim) *EventsServer {
	// Create a registration channel
	regCh := make(chan clientStreamConnection, grpcMaxConcurrentStreams)

	// Create a deregistration channel
	deregCh := make(chan deregistrationEvent, grpcMaxConcurrentStreams)

	return &EventsServer{
		ctx:           ctx,
		Server:        NewServer(ctx, regCh),
		Watchdog:      NewWatchdog(deregCh),
		Registrations: make(map[string]clientStreamConnection),
		port:          port,
		inCh:          dp.OutChannel,
		errCh:         make(chan error),
		deregCh:       deregCh,
		regCh:         regCh,
		dp:            dp,
	}
}

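// Usage sketch (illustrative, not part of this package's API): wiring the
// EventsServer into a controller's startup path. How the *dpshim.DPShim is
// obtained is an assumption here; only NewEventsServer, InputChannel, and
// Start are defined in this file.
//
//	ctx := context.Background()
//	stopCh := make(chan struct{})
//	var shim *dpshim.DPShim // placeholder; obtain from the dpshim package
//	srv := NewEventsServer(ctx, 8080, shim)
//	go func() {
//		if err := srv.Start(stopCh); err != nil {
//			klog.Errorf("events server exited: %v", err)
//		}
//	}()
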
// InputChannel returns the input channel for the manager
func (m *EventsServer) InputChannel() chan *protos.Events {
	return m.inCh
}

// Start starts the events manager (gRPC server and watchdog)
func (m *EventsServer) Start(stopCh <-chan struct{}) error {
	klog.Info("Starting transport manager")
	if err := m.start(stopCh); err != nil {
		klog.Errorf("Failed to start transport manager: %v", err)
		return err
	}
	return nil
}

func (m *EventsServer) start(stopCh <-chan struct{}) error {
	if err := m.handle(); err != nil {
		return fmt.Errorf("failed to start transport manager handlers: %w", err)
	}

	for {
		select {
		case client := <-m.regCh:
			// (TODO) Hydration is a very expensive event, so we want to make sure
			// that pagination is done for large clusters. In case of a daemon restart in a large cluster,
			// we should be able to hydrate daemons in multiple phases:
			// 1. 1st-level IPSets
			// 2. Nested IPSets
			// 3. Network Policies
			// Within the same category we will have to paginate.
			klog.Infof("Registering remote client %s", client)
			m.Registrations[client.String()] = client
			event, err := m.dp.HydrateClients()
			if err != nil {
				klog.Errorf("Failed to hydrate client %s: %v", client, err)
			}
			// (TODO) The hydration event takes a lock on the whole DPShim instance, essentially blocking the
			// controllers from receiving any new events or servicing existing daemons.
			// So we will need to add a buffering mechanism that waits until either N daemons are buffered
			// or S milliseconds have elapsed, and then sends the hydration event to all the buffered daemons.
			go func() {
				klog.Infof("Hydrating remote client %s", client)
				if err := client.stream.SendMsg(event); err != nil {
					klog.Errorf("Failed to hydrate client %s: %v", client, err)
				}
			}()
		case ev := <-m.deregCh:
			// (TODO) A heartbeat for each daemon should also be added alongside the watchdog to monitor
			// daemon restarts; if that fails, we will need to delete the client.
			klog.Infof("Deregistering remote client %s", ev.remoteAddr)
			if v, ok := m.Registrations[ev.remoteAddr]; ok {
				if v.timestamp <= ev.timestamp {
					klog.Infof("Deregistering remote client %s", ev.remoteAddr)
					delete(m.Registrations, ev.remoteAddr)
				} else {
					klog.Info("Ignoring stale deregistration event")
				}
			}
		case msg := <-m.inCh:
			klog.Infof("######## Received event to broadcast ######")
			for clientName, client := range m.Registrations {
				// (TODO) Should we call SendMsg per client in a separate goroutine?
				klog.Infof("######## Servicing the event to %s ######", clientName)
				if err := client.stream.SendMsg(msg); err != nil {
					// (TODO) What happens if a portion of the clients fails?
					// There should be a mechanism to retry the failed clients.
					klog.Errorf("Failed to send message to client %s: %v", client, err)
				}
			}
		case <-m.ctx.Done():
			klog.Info("Context done. Stopping transport manager")
			return nil
		case err := <-m.errCh:
			klog.Errorf("Error in transport manager: %v", err)
			return err
		case <-stopCh:
			klog.Info("Received message on stop channel. Stopping transport manager")
			return nil
		}
	}
}

func (m *EventsServer) handle() error {
	klog.Infof("Starting transport manager listener on port %v", m.port)
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", m.port))
	if err != nil {
		return fmt.Errorf("failed to handle server connections: %w", err)
	}

	// Load the server TLS certificates
	creds, err := serverTLSCreds()
	if err != nil {
		return fmt.Errorf("failed to load TLS certificates: %w", err)
	}

	opts := []grpc.ServerOption{
		grpc.Creds(creds),
		grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
		grpc.StatsHandler(m.Watchdog),
	}

	server := grpc.NewServer(opts...)
	protos.RegisterDataplaneEventsServer(
		server,
		m.Server,
	)

	// Register the reflection service on the gRPC server.
	// This is useful for debugging and testing with grpcurl and other CLI tools.
	reflection.Register(server)

	klog.Info("Starting transport manager server")

	// Start the gRPC server in the background
	go func() {
		if err := server.Serve(lis); err != nil {
			m.errCh <- fmt.Errorf("failed to start gRPC server: %w", err)
		}
	}()

	return nil
}