-
Notifications
You must be signed in to change notification settings - Fork 51
/
envoyauthorizerenforcer.go
352 lines (308 loc) · 11.9 KB
/
envoyauthorizerenforcer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
package envoyauthorizer
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"sync"
"time"
"go.aporeto.io/enforcerd/trireme-lib/collector"
"go.aporeto.io/enforcerd/trireme-lib/common"
"go.aporeto.io/enforcerd/trireme-lib/controller/constants"
"go.aporeto.io/enforcerd/trireme-lib/controller/internal/enforcer/applicationproxy/serviceregistry"
enforcerconstants "go.aporeto.io/enforcerd/trireme-lib/controller/internal/enforcer/constants"
"go.aporeto.io/enforcerd/trireme-lib/controller/internal/enforcer/envoyauthorizer/envoyproxy"
"go.aporeto.io/enforcerd/trireme-lib/controller/internal/enforcer/metadata"
"go.aporeto.io/enforcerd/trireme-lib/controller/pkg/ebpf"
"go.aporeto.io/enforcerd/trireme-lib/controller/pkg/fqconfig"
"go.aporeto.io/enforcerd/trireme-lib/controller/pkg/packettracing"
"go.aporeto.io/enforcerd/trireme-lib/controller/pkg/pucontext"
"go.aporeto.io/enforcerd/trireme-lib/controller/pkg/secrets"
"go.aporeto.io/enforcerd/trireme-lib/controller/runtime"
"go.aporeto.io/enforcerd/trireme-lib/policy"
"go.aporeto.io/enforcerd/trireme-lib/utils/cache"
"go.uber.org/zap"
)
// Enforcer implements the Enforcer interface as an envoy authorizer
// and starts envoy external authz filter gRPC servers for enforcement.
type Enforcer struct {
mode constants.ModeType
collector collector.EventCollector
externalIPCacheTimeout time.Duration
secrets secrets.Secrets
tokenIssuer common.ServiceTokenIssuer
puContexts cache.DataStore
clients cache.DataStore
systemCAPool *x509.CertPool
metadata *metadata.Client
sync.RWMutex
}
// envoyAuthzServers, envoy servers used my enforcer
type envoyServers struct {
ingress *envoyproxy.AuthServer
egress *envoyproxy.AuthServer
sds *envoyproxy.SdsServer
}
// NewEnvoyAuthorizerEnforcer creates a new envoy authorizer
func NewEnvoyAuthorizerEnforcer(mode constants.ModeType, eventCollector collector.EventCollector, externalIPCacheTimeout time.Duration, secrets secrets.Secrets, tokenIssuer common.ServiceTokenIssuer) (*Enforcer, error) {
// abort if this is not the right mode
if mode != constants.RemoteContainerEnvoyAuthorizer && mode != constants.LocalEnvoyAuthorizer {
return nil, fmt.Errorf("enforcer mode type must be either RemoteContainerEnvoyAuthorizer or LocalEnvoyAuthorizer, got: %d", mode)
}
zap.L().Info("Creating Envoy Authorizer Enforcer")
// same logic as in the nfqdatapath
if externalIPCacheTimeout <= 0 {
var err error
externalIPCacheTimeout, err = time.ParseDuration(enforcerconstants.DefaultExternalIPTimeout)
if err != nil {
externalIPCacheTimeout = time.Second
}
}
// same logic as in app proxy
systemPool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
if ok := systemPool.AppendCertsFromPEM(secrets.CertAuthority()); !ok {
return nil, fmt.Errorf("error while adding provided CA")
}
// TODO: systemPool needs the same treatment as the AppProxy and a `processCertificateUpdates` and `expandCAPool` implementation as well
return &Enforcer{
mode: mode,
collector: eventCollector,
externalIPCacheTimeout: externalIPCacheTimeout,
secrets: secrets,
tokenIssuer: tokenIssuer,
puContexts: cache.NewCache("puContexts"),
clients: cache.NewCache("clients"),
// auth: apiauth.New(puContexts, registry, secrets),
// metadata: metadata.NewClient(puContext, registry, tokenIssuer),
}, nil
}
// Secrets implements the LockedSecrets
func (e *Enforcer) Secrets() (secrets.Secrets, func()) {
e.RLock()
return e.secrets, e.RUnlock
}
// Enforce starts enforcing policies for the given policy.PUInfo.
// here we do the following:
// 1. create a new PU always and instantiate a new apiAuth, as we want to be as stateless as possible.
// 2. create a PUcontext as this will be used in auth code.
// 3. If envoy servers are not present then create all 3 envoy servers.
// 4. If the servers are already present under policy update then update the service certs.
func (e *Enforcer) Enforce(ctx context.Context, contextID string, puInfo *policy.PUInfo) error {
e.Lock()
defer e.Unlock()
zap.L().Debug("Enforce for the envoy for pu", zap.String("puID", contextID))
// here we 1st need to create a PuContext, as the PU context will derive the
// serviceCtxt which will be used by the authorizer to determine the policyInfo.
pu, err := pucontext.NewPU(contextID, puInfo, nil, e.externalIPCacheTimeout)
if err != nil {
return fmt.Errorf("error creating new pu: %s", err)
}
// Add the puContext to the cache as we need to later while serving the requests.
e.puContexts.AddOrUpdate(contextID, pu)
sctx, err := serviceregistry.Instance().Register(contextID, puInfo, pu, e.secrets)
if err != nil {
return fmt.Errorf("policy conflicts detected: %s", err)
}
caPool := e.expandCAPool(sctx.RootCA)
// now instantiate the apiAuth and metadata
// create a new server if it doesn't exist yet
if _, err := e.clients.Get(contextID); err != nil {
zap.L().Debug("creating new auth and sds servers", zap.String("puID", contextID))
ingressServer, err := envoyproxy.NewExtAuthzServer(contextID, e.puContexts, e.collector, envoyproxy.IngressDirection, e.secrets, e.tokenIssuer)
if err != nil {
zap.L().Error("Cannot create and run IngressServer", zap.Error(err))
return err
}
egressServer, err := envoyproxy.NewExtAuthzServer(contextID, e.puContexts, e.collector, envoyproxy.EgressDirection, e.secrets, e.tokenIssuer)
if err != nil {
zap.L().Error("Cannot create and run EgressServer", zap.Error(err))
ingressServer.Stop()
return err
}
sdsServer, err := envoyproxy.NewSdsServer(contextID, puInfo, caPool, e.secrets)
if err != nil {
zap.L().Error("Cannot create and run SdsServer", zap.Error(err))
return err
}
// Add the EnvoyServers to our cache
if err := e.clients.Add(contextID, &envoyServers{ingress: ingressServer, egress: egressServer, sds: sdsServer}); err != nil {
ingressServer.Stop()
egressServer.Stop()
sdsServer.Stop()
return err
}
} else {
// we have this client already, this is only a policy update
zap.L().Debug("handling policy update for envoy servers", zap.String("puID", contextID))
// For updates we need to update the certificates if we have new ones. Otherwise
// we return. There is nothing else to do in case of policy update.
// this required for the Envoy servers.
if c, cerr := e.clients.Get(contextID); cerr == nil {
_, perr := e.processCertificateUpdates(puInfo, c.(*envoyServers), caPool)
if perr != nil {
zap.L().Error("unable to update certificates for services", zap.Error(perr))
return perr
}
return nil
}
}
return nil
}
// processCertificateUpdates processes the certificate information and updates
// the servers.
func (e *Enforcer) processCertificateUpdates(puInfo *policy.PUInfo, server *envoyServers, caPool *x509.CertPool) (bool, error) {
// If there are certificates provided, we will need to update them for the
// services. If the certificates are nil, we ignore them.
certPEM, keyPEM, caPEM := puInfo.Policy.ServiceCertificates()
if certPEM == "" || keyPEM == "" {
return false, nil
}
// Process any updates on the cert pool
if caPEM != "" {
if !caPool.AppendCertsFromPEM([]byte(caPEM)) {
zap.L().Warn("Failed to add Services CA")
}
}
// Create the TLS certificate
tlsCert, err := tls.X509KeyPair([]byte(certPEM), []byte(keyPEM))
if err != nil {
return false, fmt.Errorf("Invalid certificates: %s", err)
}
// Here update the enforcer secrets because we are using the LockedSecrets.
// Also, send a update event to the SDS server so it can send a new cert to the envoy Sidecar.
// // update all the server certs, the Write lock has already been acquired by the Enforce function, so no need to lock again.
server.ingress.UpdateSecrets(&tlsCert, caPool, e.secrets, certPEM, keyPEM)
server.egress.UpdateSecrets(&tlsCert, caPool, e.secrets, certPEM, keyPEM)
server.sds.UpdateSecrets(&tlsCert, caPool, e.secrets, certPEM, keyPEM)
if e.metadata != nil {
e.metadata.UpdateSecrets([]byte(certPEM), []byte(keyPEM))
}
return true, nil
}
func (e *Enforcer) expandCAPool(externalCAs [][]byte) *x509.CertPool {
systemPool, err := x509.SystemCertPool()
if err != nil {
zap.L().Error("cannot process system pool", zap.Error(err))
return e.systemCAPool
}
if ok := systemPool.AppendCertsFromPEM(e.secrets.CertAuthority()); !ok {
zap.L().Error("cannot appen system CA", zap.Error(err))
return e.systemCAPool
}
for _, ca := range externalCAs {
if ok := systemPool.AppendCertsFromPEM(ca); !ok {
zap.L().Error("cannot append external service ca", zap.String("CA", string(ca)))
}
}
return systemPool
}
// Unenforce stops enforcing policy for the given IP.
func (e *Enforcer) Unenforce(ctx context.Context, contextID string) error {
e.Lock()
defer e.Unlock()
// stop the authz servers
rawAuthzServers, err := e.clients.Get(contextID)
if err != nil {
return err
}
server := rawAuthzServers.(*envoyServers)
shutdownCtx, shutdownCtxCancel := context.WithTimeout(ctx, time.Second*10)
defer shutdownCtxCancel()
var wg sync.WaitGroup
shutdownCh := make(chan struct{})
wg.Add(3)
go func() {
server.ingress.GracefulStop()
wg.Done()
}()
go func() {
server.egress.GracefulStop()
wg.Done()
}()
go func() {
server.sds.GracefulStop()
wg.Done()
}()
go func() {
wg.Wait()
shutdownCh <- struct{}{}
}()
select {
case <-shutdownCtx.Done():
zap.L().Warn("Graceful shutdown of envoy server did not finish in time. Shutting down hard now...", zap.String("puID", contextID), zap.Error(shutdownCtx.Err()))
var wg sync.WaitGroup
wg.Add(3)
go func() {
server.ingress.Stop()
wg.Done()
}()
go func() {
server.egress.Stop()
wg.Done()
}()
go func() {
server.sds.Stop()
wg.Done()
}()
wg.Wait()
case <-shutdownCh:
}
if err := e.puContexts.RemoveWithDelay(contextID, 10*time.Second); err != nil {
zap.L().Debug("Unable to remove PU context from cache", zap.String("puID", contextID), zap.Error(err))
}
return nil
}
// UpdateSecrets -- updates the secrets of running enforcers managed by trireme. Remote enforcers will get the secret updates with the next policy push
func (e *Enforcer) UpdateSecrets(secrets secrets.Secrets) error {
e.Lock()
defer e.Unlock()
e.secrets = secrets
return nil
}
// SetTargetNetworks is unimplemented in the envoy authorizer
func (e *Enforcer) SetTargetNetworks(cfg *runtime.Configuration) error {
return nil
}
// SetLogLevel is unimplemented in the envoy authorizer
func (e *Enforcer) SetLogLevel(level constants.LogLevel) error {
return nil
}
// CleanUp is unimplemented in the envoy authorizer
func (e *Enforcer) CleanUp() error {
return nil
}
// Run is unimplemented in the envoy authorizer
func (e *Enforcer) Run(ctx context.Context) error {
return nil
}
// GetBPFObject is unimplemented in the envoy authorizer
func (e *Enforcer) GetBPFObject() ebpf.BPFModule {
return nil
}
// GetServiceMeshType is unimplemented in the envoy authorizer
func (e *Enforcer) GetServiceMeshType() policy.ServiceMesh {
return policy.None
}
// GetFilterQueue is unimplemented in the envoy authorizer
func (e *Enforcer) GetFilterQueue() fqconfig.FilterQueue {
return nil
}
// EnableDatapathPacketTracing is unimplemented in the envoy authorizer
func (e *Enforcer) EnableDatapathPacketTracing(ctx context.Context, contextID string, direction packettracing.TracingDirection, interval time.Duration) error {
return nil
}
// EnableIPTablesPacketTracing is unimplemented in the envoy authorizer
func (e *Enforcer) EnableIPTablesPacketTracing(ctx context.Context, contextID string, interval time.Duration) error {
return nil
}
// Ping is unimplemented in the envoy authorizer
func (e *Enforcer) Ping(ctx context.Context, contextID string, pingConfig *policy.PingConfig) error {
return nil
}
// DebugCollect is unimplemented in the envoy authorizer
func (e *Enforcer) DebugCollect(ctx context.Context, contextID string, debugConfig *policy.DebugConfig) error {
return nil
}