/
cell.go
182 lines (148 loc) · 6.59 KB
/
cell.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package auth
import (
"fmt"
"runtime/pprof"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/cilium/cilium/pkg/auth/spire"
"github.com/cilium/cilium/pkg/datapath/types"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/hive/cell"
"github.com/cilium/cilium/pkg/hive/job"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/maps/authmap"
nodeManager "github.com/cilium/cilium/pkg/node/manager"
"github.com/cilium/cilium/pkg/policy"
"github.com/cilium/cilium/pkg/signal"
"github.com/cilium/cilium/pkg/stream"
"github.com/cilium/cilium/pkg/time"
)
// Cell provides AuthManager which is responsible for request authentication.
// It does this by registering to "auth required" signals from the signal package
// and reacting upon received signal events.
// Actual authentication gets performed by an auth handler which is
// responsible for the configured auth type on the corresponding policy.
var Cell = cell.Module(
	"auth",
	"Authenticates requests as demanded by policy",

	spire.Cell,

	// The auth manager is the main entry point which gets registered to signal map and receives auth requests.
	// In addition, it handles re-authentication and auth map garbage collection.
	cell.Provide(registerAuthManager),
	cell.ProvidePrivate(
		// Mutual auth handler provides support for mutual authentication.
		// NOTE(review): confirm the exact auth type name it registers for and
		// whether it depends on the provider from spire.Cell above.
		newMutualAuthHandler,
		// Always fail auth handler provides support for auth type "always-fail" - which always fails.
		newAlwaysFailAuthHandler,
	),
	cell.Config(config{
		MeshAuthEnabled:               true,
		MeshAuthQueueSize:             1024,
		MeshAuthGCInterval:            5 * time.Minute,
		MeshAuthSignalBackoffDuration: 1 * time.Second, // this default is based on the default TCP retransmission timeout
	}),
	cell.Config(MutualAuthConfig{}),
)
// config holds the options of the auth module. Its defaults are set in the
// cell.Config(config{...}) call of Cell, and Flags exposes every field as a
// command-line flag.
type config struct {
	// MeshAuthEnabled enables authentication processing & garbage collection
	// (flag --mesh-auth-enabled). When false, registerAuthManager logs that
	// processing is disabled and returns no AuthManager.
	MeshAuthEnabled bool
	// MeshAuthQueueSize is the buffer size of the channel that receives
	// "auth required" signals (flag --mesh-auth-queue-size).
	MeshAuthQueueSize int
	// MeshAuthGCInterval is the interval of the auth map garbage collection
	// timer job (flag --mesh-auth-gc-interval).
	MeshAuthGCInterval time.Duration
	// MeshAuthSignalBackoffDuration is the time to wait between two
	// authentication required signals in case of a cache mismatch
	// (hidden flag --mesh-auth-signal-backoff-duration).
	MeshAuthSignalBackoffDuration time.Duration
}

// Flags registers one command-line flag per config field on flags, using the
// receiver's field values as the flag defaults.
func (r config) Flags(flags *pflag.FlagSet) {
	flags.Bool("mesh-auth-enabled", r.MeshAuthEnabled, "Enable authentication processing & garbage collection (beta)")
	flags.Int("mesh-auth-queue-size", r.MeshAuthQueueSize, "Queue size for the auth manager")
	flags.Duration("mesh-auth-gc-interval", r.MeshAuthGCInterval, "Interval in which auth entries are attempted to be garbage collected")
	flags.Duration("mesh-auth-signal-backoff-duration", r.MeshAuthSignalBackoffDuration, "Time to wait between two authentication required signals in case of a cache mismatch")
	// MarkHidden only fails when the named flag is not defined; it is defined
	// directly above, so the error is deliberately ignored.
	_ = flags.MarkHidden("mesh-auth-signal-backoff-duration")
}
// authManagerParams is the parameter object injected into registerAuthManager.
// Embedding cell.In lets the hive fill each field from its container.
//
// NOTE(review): keep the field order stable; dependency resolution order can
// influence the order in which lifecycle hooks are appended.
type authManagerParams struct {
	cell.In

	Logger      logrus.FieldLogger
	Lifecycle   cell.Lifecycle
	JobRegistry job.Registry
	// Scope is passed to JobRegistry.NewGroup when the auth job group is created.
	Scope  cell.Scope
	Config config
	// AuthMap is the map wrapped by the auth map writer/cache.
	AuthMap authmap.Map
	// AuthHandlers collects every handler provided into the "authHandlers"
	// value group (see authHandlerResult).
	AuthHandlers  []authHandler `group:"authHandlers"`
	SignalManager signal.SignalManager
	NodeIDHandler types.NodeIDHandler
	// IdentityChanges feeds the "auth gc-identity-events" observer job.
	IdentityChanges stream.Observable[cache.IdentityChange]
	NodeManager     nodeManager.NodeManager
	EndpointManager endpointmanager.EndpointManager
	PolicyRepo      *policy.Repository
}
// registerAuthManager builds the AuthManager together with its supporting
// components (auth map writer and cache, auth map garbage collector) and
// registers them with the lifecycle and a dedicated "auth" job group.
//
// If mesh authentication is disabled in the config, it only logs that fact and
// returns (nil, nil). Construction or job-registration failures are returned
// wrapped, with a nil manager.
func registerAuthManager(params authManagerParams) (*AuthManager, error) {
	cfg, logger := params.Config, params.Logger

	if !cfg.MeshAuthEnabled {
		logger.Info("Authentication processing is disabled")
		return nil, nil
	}

	// The cache sits in front of the map writer; the manager and the garbage
	// collector both work through that cache.
	authCache := newAuthMapCache(logger, newAuthMapWriter(logger, params.AuthMap))

	manager, err := newAuthManager(
		logger,
		params.AuthHandlers,
		authCache,
		params.NodeIDHandler,
		cfg.MeshAuthSignalBackoffDuration,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create auth manager: %w", err)
	}

	collector := newAuthMapGC(logger, authCache, params.NodeIDHandler, params.PolicyRepo)

	// Restore the cache on start. This hook is appended before the job group.
	restoreHook := cell.Hook{
		OnStart: func(cell.HookContext) error {
			if restoreErr := authCache.restoreCache(); restoreErr != nil {
				return fmt.Errorf("failed to restore auth map cache: %w", restoreErr)
			}
			return nil
		},
	}
	params.Lifecycle.Append(restoreHook)

	group := params.JobRegistry.NewGroup(
		params.Scope,
		job.WithLogger(logger),
		job.WithPprofLabels(pprof.Labels("cell", "auth")),
	)

	err = registerSignalAuthenticationJob(group, manager, params.SignalManager, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to register signal authentication job: %w", err)
	}
	registerReAuthenticationJob(group, manager, params.AuthHandlers)
	registerGCJobs(group, params.Lifecycle, collector, cfg, params.NodeManager, params.EndpointManager, params.IdentityChanges)

	// The job group is appended to the lifecycle last, after every hook above.
	params.Lifecycle.Append(group)

	return manager, nil
}
// registerReAuthenticationJob adds an "auth re-authentication" observer job to
// jobGroup for every auth handler that exposes a rotated-identities channel.
// Events from that channel are handled by mgr.handleCertificateRotationEvent.
// Nil handlers, and handlers whose subscription returns a nil channel, are
// skipped.
func registerReAuthenticationJob(jobGroup job.Group, mgr *AuthManager, authHandlers []authHandler) {
	for _, ah := range authHandlers {
		if ah == nil {
			continue
		}
		// Subscribe exactly once per handler, so the channel that is
		// nil-checked is the same channel the observer consumes. Subscribing
		// twice could, depending on the handler implementation, register a
		// second subscriber and leave the first channel without a reader.
		rotated := ah.subscribeToRotatedIdentities()
		if rotated == nil {
			continue
		}
		jobGroup.Add(job.Observer("auth re-authentication", mgr.handleCertificateRotationEvent, stream.FromChannel(rotated)))
	}
}
// registerSignalAuthenticationJob connects a buffered channel to the signal
// manager for signal.SignalAuthRequired events. It then adds an
// "auth request-authentication" observer job that passes every received
// signalAuthKey to mgr.handleAuthRequest.
//
// The channel buffer size is config.MeshAuthQueueSize. A negative size is
// rejected with an error, because make would otherwise panic at runtime. An
// error is also returned when the handler cannot be registered.
func registerSignalAuthenticationJob(jobGroup job.Group, mgr *AuthManager, sm signal.SignalManager, config config) error {
	if config.MeshAuthQueueSize < 0 {
		return fmt.Errorf("invalid mesh-auth-queue-size %d: must not be negative", config.MeshAuthQueueSize)
	}

	signalChannel := make(chan signalAuthKey, config.MeshAuthQueueSize)

	// RegisterHandler registers signalChannel with SignalManager, but flow of events
	// starts later during the OnStart hook of the SignalManager
	if err := sm.RegisterHandler(signal.ChannelHandler(signalChannel), signal.SignalAuthRequired); err != nil {
		return fmt.Errorf("failed to set up signal channel for datapath authentication required events: %w", err)
	}

	jobGroup.Add(job.Observer("auth request-authentication", mgr.handleAuthRequest, stream.FromChannel(signalChannel)))
	return nil
}
// registerGCJobs wires the auth map garbage collector into the lifecycle and
// into jobGroup.
//
// A lifecycle hook subscribes the collector to node and endpoint events on
// start and unsubscribes it from both managers on stop. Two jobs are then added
// to jobGroup:
//   - an observer that feeds identity changes to mapGC.handleIdentityChange
//   - a timer that runs mapGC.cleanup every cfg.MeshAuthGCInterval
func registerGCJobs(jobGroup job.Group, lifecycle cell.Lifecycle, mapGC *authMapGarbageCollector, cfg config, nodeMgr nodeManager.NodeManager, endpointMgr endpointmanager.EndpointManager, identityChanges stream.Observable[cache.IdentityChange]) {
	subscriptionHook := cell.Hook{
		OnStart: func(cell.HookContext) error {
			mapGC.subscribeToNodeEvents(nodeMgr)
			mapGC.subscribeToEndpointEvents(endpointMgr)
			return nil
		},
		OnStop: func(cell.HookContext) error {
			nodeMgr.Unsubscribe(mapGC)
			endpointMgr.Unsubscribe(mapGC)
			return nil
		},
	}
	lifecycle.Append(subscriptionHook)

	identityObserver := job.Observer("auth gc-identity-events", mapGC.handleIdentityChange, identityChanges)
	jobGroup.Add(identityObserver)

	cleanupTimer := job.Timer("auth gc-cleanup", mapGC.cleanup, cfg.MeshAuthGCInterval)
	jobGroup.Add(cleanupTimer)
}
// authHandlerResult is an output struct for providing an authHandler to the
// hive. Embedding cell.Out together with the `group:"authHandlers"` tag adds
// AuthHandler to the "authHandlers" value group, which is consumed as
// authManagerParams.AuthHandlers.
// NOTE(review): presumably returned by the handler constructors registered in
// Cell (newMutualAuthHandler, newAlwaysFailAuthHandler) — confirm.
type authHandlerResult struct {
	cell.Out

	AuthHandler authHandler `group:"authHandlers"`
}