-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
manager.go
224 lines (185 loc) · 6.03 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package auth
import (
"context"
"fmt"
"net"
"time"
"github.com/sirupsen/logrus"
"github.com/cilium/cilium/pkg/auth/certs"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/maps/authmap"
"github.com/cilium/cilium/pkg/policy"
)
// signalAuthKey is the key type used in the signalmap. It must mirror
// struct auth_key in the datapath.
type signalAuthKey authmap.AuthKey
// String renders the key using only the name of its auth type. Leaving the
// identities and node ID out keeps the value low-cardinality, which makes it
// suitable as a metrics label.
func (key signalAuthKey) String() string {
	authType := policy.AuthType(key.AuthType)
	return authType.String()
}
// authManager reacts to auth-required signals and certificate rotation events
// by running the auth handler registered for the requested auth type and
// recording the resulting expiration in the auth map.
type authManager struct {
// logger receives the debug and warning messages emitted by the manager.
logger logrus.FieldLogger
// ipCache resolves the remote node ID of an auth key to a node IP.
ipCache ipCache
// authHandlers holds the registered handlers indexed by their auth type.
authHandlers map[policy.AuthType]authHandler
// authmap stores the authentication state; it is read via Get/All and
// written via Update.
authmap authMap

// mutex guards pending.
mutex lock.Mutex
// pending is the set of keys for which an authentication is in flight.
pending map[authKey]struct{}

// handleAuthenticationFunc is invoked for every auth request and for every
// re-authentication caused by a certificate rotation. newAuthManager sets
// it to handleAuthentication.
// NOTE(review): presumably a seam to substitute the behaviour in tests - confirm.
handleAuthenticationFunc func(a *authManager, k authKey, reAuth bool)
}
// ipCache is the set of interactions the auth manager performs with the IPCache.
type ipCache interface {
// GetNodeIP returns the IP of the node with the given node ID. The auth
// manager treats an empty string as "node IP not available".
GetNodeIP(uint16) string
// GetNodeID returns the node ID for the given node IP together with a flag
// reporting whether such an ID exists.
// NOTE(review): not called in the visible part of this file - confirm users.
GetNodeID(nodeIP net.IP) (nodeID uint16, exists bool)
}
// authHandler is responsible for handling authentication for a specific auth type.
type authHandler interface {
// authenticate performs the authentication described by the request. On
// success the response carries the expiration time that the auth manager
// writes to the auth map.
authenticate(*authRequest) (*authResponse, error)
// authType returns the auth type served by this handler; the auth manager
// uses it as the lookup key and allows only one handler per type.
authType() policy.AuthType
// subscribeToRotatedIdentities returns a receive-only channel of
// certificate rotation events.
// NOTE(review): the consumer of this channel is outside this file section.
subscribeToRotatedIdentities() <-chan certs.CertificateRotationEvent
}
// authRequest is the input handed to an authHandler. The auth manager fills it
// from the auth key and the node IP resolved through the ipCache.
type authRequest struct {
// localIdentity is the local identity taken from the auth key.
localIdentity identity.NumericIdentity
// remoteIdentity is the remote identity taken from the auth key.
remoteIdentity identity.NumericIdentity
// remoteNodeIP is the IP resolved for the auth key's remote node ID.
remoteNodeIP string
}
// authResponse is the result of a successful authentication by an authHandler.
type authResponse struct {
// expirationTime is stored in the auth map as the expiration of the
// authenticated entry.
expirationTime time.Time
}
// newAuthManager builds an authManager from the given dependencies.
//
// The handlers are indexed by their auth type; nil entries in authHandlers are
// skipped. An error is returned when two handlers report the same auth type.
// The returned manager starts with an empty pending set and uses
// handleAuthentication to process requests.
func newAuthManager(logger logrus.FieldLogger, authHandlers []authHandler, authmap authMap, ipCache ipCache) (*authManager, error) {
	handlersByType := make(map[policy.AuthType]authHandler)

	for _, handler := range authHandlers {
		if handler == nil {
			continue
		}

		authType := handler.authType()
		if _, duplicate := handlersByType[authType]; duplicate {
			return nil, fmt.Errorf("multiple handlers for auth type: %s", authType)
		}
		handlersByType[authType] = handler
	}

	manager := &authManager{
		logger:                   logger,
		ipCache:                  ipCache,
		authHandlers:             handlersByType,
		authmap:                  authmap,
		pending:                  make(map[authKey]struct{}),
		handleAuthenticationFunc: handleAuthentication,
	}
	return manager, nil
}
// handleAuthRequest receives an auth-required signal, converts the signal key
// into an authKey and passes it to handleAuthenticationFunc as a regular
// (non re-auth) request. With the default handleAuthentication the actual work
// runs in its own goroutine. The context is unused and the returned error is
// always nil.
func (a *authManager) handleAuthRequest(_ context.Context, key signalAuthKey) error {
	requested := authKey{
		localIdentity:  identity.NumericIdentity(key.LocalIdentity),
		remoteIdentity: identity.NumericIdentity(key.RemoteIdentity),
		remoteNodeID:   key.RemoteNodeID,
		authType:       policy.AuthType(key.AuthType),
	}

	scopedLog := a.logger.WithField("key", requested)
	scopedLog.Debug("Handle authentication request")

	a.handleAuthenticationFunc(a, requested, false)

	return nil
}
// handleCertificateRotationEvent triggers a re-authentication for every auth
// map entry whose local or remote identity matches the identity of the rotated
// certificate. It returns an error only when the auth map entries cannot be
// listed. The context is unused.
func (a *authManager) handleCertificateRotationEvent(_ context.Context, event certs.CertificateRotationEvent) error {
	rotated := event.Identity

	a.logger.WithField("identity", rotated).Debug("Handle certificate rotation event")

	entries, err := a.authmap.All()
	if err != nil {
		return fmt.Errorf("failed to get all auth map entries: %w", err)
	}

	for entryKey := range entries {
		// Entries that do not involve the rotated identity are left alone.
		if entryKey.localIdentity != rotated && entryKey.remoteIdentity != rotated {
			continue
		}
		a.handleAuthenticationFunc(a, entryKey, true)
	}

	return nil
}
// handleAuthentication starts an asynchronous authentication for k unless one
// is already pending for the same key, in which case it does nothing.
//
// The work runs in a new goroutine that clears the pending marker when it
// finishes. When reAuth is false, the auth map is consulted first and the
// authentication is skipped if an unexpired entry already exists. When reAuth
// is true, the authentication is always performed. Failures are logged as
// warnings and are not reported to the caller.
func handleAuthentication(a *authManager, k authKey, reAuth bool) {
	if !a.markPendingAuth(k) {
		// Another goroutine is already authenticating this key.
		return
	}

	go func(key authKey) {
		defer a.clearPendingAuth(key)

		if !reAuth {
			// The authmap may have been updated after the datapath issued
			// the auth-required signal, so verify that authentication is
			// still needed.
			info, err := a.authmap.Get(key)
			if err == nil && time.Now().Before(info.expiration) {
				a.logger.WithField("key", key).Debug("Already authenticated, skipping authentication")
				return
			}
		}

		err := a.authenticate(key)
		if err == nil {
			return
		}
		a.logger.WithError(err).WithField("key", key).Warning("Failed to authenticate request")
	}(k)
}
// markPendingAuth tries to claim the given key for authentication. It returns
// true and records the key as pending when no authentication was pending for
// it; it returns false and changes nothing when one already is. The pending
// set is accessed under the manager's mutex.
func (a *authManager) markPendingAuth(key authKey) bool {
	a.mutex.Lock()
	defer a.mutex.Unlock()

	_, alreadyPending := a.pending[key]
	if !alreadyPending {
		a.pending[key] = struct{}{}
	}
	return !alreadyPending
}
// clearPendingAuth removes the key from the pending set, marking its
// authentication as finished. The removal happens under the manager's mutex.
func (a *authManager) clearPendingAuth(key authKey) {
	a.mutex.Lock()
	delete(a.pending, key)
	a.mutex.Unlock()
}
// authenticate performs the authentication for the given key and stores the
// result in the auth map.
//
// It looks up the handler registered for key.authType, resolves the remote
// node ID to a node IP through the ipCache, runs the handler and writes the
// returned expiration time to the auth map.
//
// An error is returned when no handler is registered for the auth type, when
// no IP is known for the remote node, when the handler fails or returns no
// response, or when the auth map cannot be updated.
func (a *authManager) authenticate(key authKey) error {
	a.logger.
		WithField("auth_type", key.authType).
		WithField("local_identity", key.localIdentity).
		WithField("remote_identity", key.remoteIdentity).
		Debug("Policy is requiring authentication")

	// Authenticate according to the requested auth type
	h, ok := a.authHandlers[key.authType]
	if !ok {
		return fmt.Errorf("unknown requested auth type: %s", key.authType)
	}

	nodeIP := a.ipCache.GetNodeIP(key.remoteNodeID)
	if nodeIP == "" {
		return fmt.Errorf("remote node IP not available for node ID %d", key.remoteNodeID)
	}

	authReq := &authRequest{
		localIdentity:  key.localIdentity,
		remoteIdentity: key.remoteIdentity,
		remoteNodeIP:   nodeIP,
	}

	authResp, err := h.authenticate(authReq)
	if err != nil {
		return fmt.Errorf("failed to authenticate with auth type %s: %w", key.authType, err)
	}
	// This method runs in a goroutine started by handleAuthentication, so a
	// nil dereference here would crash the whole process. Report a handler
	// that returns neither a response nor an error as a failure instead.
	if authResp == nil {
		return fmt.Errorf("auth handler for auth type %s returned no response", key.authType)
	}

	if err = a.updateAuthMap(key, authResp.expirationTime); err != nil {
		return fmt.Errorf("failed to update BPF map in datapath: %w", err)
	}

	a.logger.
		WithField("auth_type", key.authType).
		WithField("local_identity", key.localIdentity).
		WithField("remote_identity", key.remoteIdentity).
		WithField("remote_node_ip", nodeIP).
		Debug("Successfully authenticated")

	return nil
}
// updateAuthMap writes an entry for key with the given expiration time to the
// auth map. A failed write is returned wrapped with additional context.
func (a *authManager) updateAuthMap(key authKey, expirationTime time.Time) error {
	err := a.authmap.Update(key, authInfo{expiration: expirationTime})
	if err != nil {
		return fmt.Errorf("failed to write auth information to BPF map: %w", err)
	}

	return nil
}