-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
AuthenticationProviderSasl.java
322 lines (293 loc) · 14.7 KB
/
AuthenticationProviderSasl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_CLIENT_ALLOWED_IDS;
import static org.apache.pulsar.common.sasl.SaslConstants.JAAS_SERVER_SECTION_NAME;
import static org.apache.pulsar.common.sasl.SaslConstants.KINIT_COMMAND;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_ROLE_TOKEN_EXPIRED;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_AUTH_TOKEN;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_HEADER_STATE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_CLIENT_INIT;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_COMPLETE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_NEGOTIATE;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER;
import static org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER_CHECK_TOKEN;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.security.auth.login.LoginException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.sasl.JAASCredentialsContainer;
import org.apache.pulsar.common.sasl.SaslConstants;
/**
* Authentication Provider for SASL (Simple Authentication and Security Layer).
*
* Note: This provider does not override the default implementation for
* {@link AuthenticationProvider#authenticate(AuthenticationDataSource)}. As the Javadoc for the interface's method
* indicates, this method should only be implemented when using single stage authentication. In the case of this
* provider, the authentication is multi-stage.
*/
@Slf4j
public class AuthenticationProviderSasl implements AuthenticationProvider {
// Compiled in initialize() from config.getSaslJaasClientAllowedIds(); passed to the SASL server callback
// handler and to every PulsarSaslServer created by newAuthState().
private Pattern allowedIdsPattern;
// JAAS-related settings (allowed ids regex, server section name, kinit command) copied from the broker
// configuration in initialize() and handed to the JAASCredentialsContainer.
private Map<String, String> configuration;
// Created in initialize() when not already present; closed and reset to null by close().
private JAASCredentialsContainer jaasCredentialsContainer;
// Value of config.getSaslJaasServerSectionName(); used as the JAAS login context name.
private String loginContextName;
// Authentication states for HTTP requests, keyed by state id. Built in initialize() with a maximum size
// and an expire-after-write timeout taken from the broker configuration.
private Cache<Long, AuthenticationState> authStates;
/**
 * Prepares this provider from the broker configuration.
 *
 * <p>In order: copies the JAAS settings into {@code configuration}, compiles the allowed-ids pattern,
 * creates the JAAS credentials container (only if none exists yet), loads the role-token signing secret
 * and builds the cache of HTTP authentication states.
 *
 * @param config broker configuration supplying the SASL/JAAS settings
 * @throws IOException if the allowed-ids regular expression is invalid, the JAAS login fails,
 *                     or reading the secret file fails
 * @throws IllegalArgumentException if the role-token signer secret path is blank or does not exist
 */
@Override
public void initialize(ServiceConfiguration config) throws IOException {
    final Map<String, String> jaasSettings = new HashMap<>();
    this.configuration = jaasSettings;
    final String idsRegex = config.getSaslJaasClientAllowedIds();
    jaasSettings.put(JAAS_CLIENT_ALLOWED_IDS, idsRegex);
    jaasSettings.put(JAAS_SERVER_SECTION_NAME, config.getSaslJaasServerSectionName());
    jaasSettings.put(KINIT_COMMAND, config.getKinitCommand());

    try {
        this.allowedIdsPattern = Pattern.compile(idsRegex);
    } catch (PatternSyntaxException badRegex) {
        log.error("Invalid regular expression for id {}", idsRegex, badRegex);
        throw new IOException(badRegex);
    }

    this.loginContextName = config.getSaslJaasServerSectionName();
    // An existing container is reused as-is.
    if (this.jaasCredentialsContainer == null) {
        log.info("JAAS loginContext is: {}.", this.loginContextName);
        try {
            this.jaasCredentialsContainer = new JAASCredentialsContainer(
                    this.loginContextName,
                    new PulsarSaslServer.SaslServerCallbackHandler(this.allowedIdsPattern),
                    this.configuration);
        } catch (LoginException loginFailure) {
            log.error("JAAS login in broker failed", loginFailure);
            throw new IOException(loginFailure);
        }
    }

    // The HTTP role tokens are signed with a secret that must be configured explicitly.
    final String signerSecretPath = config.getSaslJaasServerRoleTokenSignerSecretPath();
    if (StringUtils.isBlank(signerSecretPath)) {
        throw new IllegalArgumentException("saslJaasServerRoleTokenSignerSecretPath parameter is empty");
    }
    final byte[] signerSecret = readSecretFromUrl(signerSecretPath);
    this.signer = new SaslRoleTokenSigner(signerSecret);

    this.authStates = Caffeine.newBuilder()
            .maximumSize(config.getMaxInflightSaslContext())
            .expireAfterWrite(config.getInflightSaslContextExpiryMs(), TimeUnit.MILLISECONDS)
            .build();
}
/**
 * Returns {@link SaslConstants#AUTH_METHOD_NAME} as this provider's authentication method name.
 */
@Override
public String getAuthMethodName() {
return SaslConstants.AUTH_METHOD_NAME;
}
/**
 * Closes the JAAS credentials container, if one is held, and then drops the reference to it.
 * Does nothing when no container is present.
 *
 * @throws IOException declared by the interface; propagated if closing the container raises it
 */
@Override
public void close() throws IOException {
    if (jaasCredentialsContainer == null) {
        return;
    }
    jaasCredentialsContainer.close();
    jaasCredentialsContainer = null;
}
/**
 * Creates a new SASL authentication state backed by a fresh {@link PulsarSaslServer}.
 *
 * <p>The server is built from the subject held by the JAAS credentials container and the allowed-ids
 * pattern. None of the three arguments is read by this implementation.
 *
 * @param authData      not used
 * @param remoteAddress not used
 * @param sslSession    not used
 * @return a new {@code SaslAuthenticationState}
 * @throws AuthenticationException if the SASL server or state cannot be created; the original failure
 *                                 is attached as the cause
 */
@Override
public AuthenticationState newAuthState(AuthData authData,
                                        SocketAddress remoteAddress,
                                        SSLSession sslSession) throws AuthenticationException {
    try {
        PulsarSaslServer server = new PulsarSaslServer(jaasCredentialsContainer.getSubject(), allowedIdsPattern);
        return new SaslAuthenticationState(server);
    } catch (Throwable t) {
        log.error("Failed create sasl auth state", t);
        // javax.naming.AuthenticationException has no (message, cause) constructor, so attach the cause
        // explicitly; otherwise callers only see a (possibly null) message and lose the stack trace.
        AuthenticationException failure = new AuthenticationException(t.getMessage());
        failure.initCause(t);
        throw failure;
    }
}
// Used by the HTTP authentication path.
// Lifetime of an issued role token in seconds (3600 s = 1 hour); see createAuthRoleToken().
private static final long SASL_ROLE_TOKEN_LIVE_SECONDS = 3600;
// Signs and verifies HTTP role tokens. initialize() builds it from the secret read from the location
// given by config.getSaslJaasServerRoleTokenSignerSecretPath().
private SaslRoleTokenSigner signer;
/**
 * Resolves the authenticated role from the role token carried in an HTTP request.
 *
 * @param httpRequest request that may carry a signed role token in the {@code SASL_AUTH_ROLE_TOKEN} header
 * @return {@code null} when the request has no role token header;
 *         {@code SASL_AUTH_ROLE_TOKEN_EXPIRED} when the token cannot be parsed or has expired;
 *         otherwise the user role stored in the token
 * @throws AuthenticationException propagated from {@code signer.verifyAndExtract}
 *                                 (presumably when the signature does not verify)
 */
public String authRoleFromHttpRequest(HttpServletRequest httpRequest) throws AuthenticationException {
    String tokenStr = httpRequest.getHeader(SASL_AUTH_ROLE_TOKEN);
    if (tokenStr == null) {
        return null;
    }

    String unSigned = signer.verifyAndExtract(tokenStr);
    SaslRoleToken token;
    try {
        token = SaslRoleToken.parse(unSigned);
        if (log.isDebugEnabled()) {
            log.debug("server side get role token: {}, session in token:{}, session in request:{}",
                    token, token.getSession(), httpRequest.getRemoteAddr());
        }
    } catch (Exception e) {
        // An unparseable token is reported to the caller the same way as an expired one.
        log.error("token parse failed, with exception: ", e);
        return SASL_AUTH_ROLE_TOKEN_EXPIRED;
    }

    // Evaluate expiry exactly once: the token is either expired or usable, there is no third outcome.
    return token.isExpired() ? SASL_AUTH_ROLE_TOKEN_EXPIRED : token.getUserRole();
}
/**
 * Builds and signs a role token valid for {@code SASL_ROLE_TOKEN_LIVE_SECONDS} from now.
 *
 * @param role      authenticated role to embed in the token
 * @param sessionId session identifier to embed in the token
 * @return the signed token string produced by {@code signer.sign}
 */
private String createAuthRoleToken(String role, String sessionId) {
    long expireAtMs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(SASL_ROLE_TOKEN_LIVE_SECONDS);
    SaslRoleToken token = new SaslRoleToken(role, sessionId, expireAtMs);
    String signed = signer.sign(token.toString());
    if (log.isDebugEnabled()) {
        // The signed value is a bearer credential that can be replayed until it expires,
        // so it is deliberately kept out of the log.
        log.debug("create role token token: {}, role: {} session :{}, expires:{}",
                token, token.getUserRole(), token.getSession(), token.getExpires());
    }
    return signed;
}
/**
 * Loads the role-token signing secret from disk.
 *
 * <p>A value starting with {@code file:} is interpreted as a URI; any other value is treated as a plain
 * file-system path, which must exist.
 *
 * @param secretConfUrl {@code file:} URI or plain path of the secret file
 * @return the raw bytes of the file
 * @throws IOException if reading the file fails
 * @throws IllegalArgumentException if a plain path does not point to an existing file
 */
private byte[] readSecretFromUrl(String secretConfUrl) throws IOException {
    if (secretConfUrl.startsWith("file:")) {
        return Files.readAllBytes(Paths.get(URI.create(secretConfUrl)));
    }
    if (!Files.exists(Paths.get(secretConfUrl))) {
        throw new IllegalArgumentException(
                "Role token signer secret file " + secretConfUrl + " doesn't exist");
    }
    // The configured value is a plain path to an existing file.
    return Files.readAllBytes(Paths.get(secretConfUrl));
}
/**
 * Looks up the cached authentication state referenced by the request's {@code SASL_STATE_SERVER} header.
 *
 * @param request incoming HTTP request
 * @return the cached state, or {@code null} when the header is absent, is not a valid long,
 *         or no state is cached under that id
 */
private AuthenticationState getAuthState(HttpServletRequest request) {
    final String stateIdHeader = request.getHeader(SASL_STATE_SERVER);
    AuthenticationState cached = null;
    if (stateIdHeader != null) {
        try {
            final long stateId = Long.parseLong(stateIdHeader);
            cached = authStates.getIfPresent(stateId);
        } catch (NumberFormatException malformedId) {
            log.error("[{}] Wrong Id String in Token {}. e:", request.getRequestURI(),
                    stateIdHeader, malformedId);
        }
    }
    return cached;
}
/**
 * Sets the SASL marker headers on a response: {@code SaslConstants.SASL_HEADER_TYPE} is set to
 * {@code SaslConstants.SASL_TYPE_VALUE} and {@code SASL_HEADER_STATE} to the given state.
 *
 * @param response response whose headers are set
 * @param state    value written to the {@code SASL_HEADER_STATE} header
 */
private void setResponseHeaderState(HttpServletResponse response, String state) {
response.setHeader(SaslConstants.SASL_HEADER_TYPE, SaslConstants.SASL_TYPE_VALUE);
response.setHeader(SASL_HEADER_STATE, state);
}
/**
 * Performs SASL authentication for one HTTP request, writing the outcome into the response.
 *
 * <p>Behaviour by case:
 * <ul>
 *   <li>Role token present but expired or unparseable: responds 401 with the
 *       {@code SASL_AUTH_ROLE_TOKEN_EXPIRED} state and returns {@code false}.</li>
 *   <li>Role token valid and the request's state header is {@code SASL_STATE_COMPLETE}: stores the role
 *       and authentication data as request attributes and returns {@code true}.</li>
 *   <li>Role token valid and the state header is {@code SASL_STATE_SERVER_CHECK_TOKEN}: responds 200 with
 *       the {@code SASL_STATE_COMPLETE} state and returns {@code false}.</li>
 *   <li>No role token: runs one step of the SASL exchange using the {@code SASL_AUTH_TOKEN} header.
 *       On completion a signed role token is returned in the response (200); otherwise the broker's
 *       challenge is returned with a 401. Both return {@code false}.</li>
 * </ul>
 *
 * @param request  incoming HTTP request
 * @param response response to populate
 * @return {@code true} only when the request carries a valid role token and should continue down the
 *         filter chain; {@code false} when the response has already been completed here
 * @throws IllegalStateException if a valid role token comes with an unexpected state header, or if
 *                               neither a role token nor a SASL token header is present
 * @throws Exception any failure raised while verifying the token or running the SASL exchange
 */
@Override
public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
    AuthenticationState state = getAuthState(request);
    String saslAuthRoleToken = authRoleFromHttpRequest(request);
    // getHeader returns null for an absent header, so read it once and compare constant-first below
    // instead of dereferencing it.
    final String requestedState = request.getHeader(SASL_HEADER_STATE);

    // role token exist
    if (saslAuthRoleToken != null) {
        // role token expired, send role token expired to client.
        if (saslAuthRoleToken.equalsIgnoreCase(SASL_AUTH_ROLE_TOKEN_EXPIRED)) {
            setResponseHeaderState(response, SASL_AUTH_ROLE_TOKEN_EXPIRED);
            response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Role token expired");
            if (log.isDebugEnabled()) {
                log.debug("[{}] Server side role token expired: {}", request.getRequestURI(), saslAuthRoleToken);
            }
            return false;
        }

        // role token OK to use,
        // if request is ask for role token verify, send auth complete to client
        // if request is a real request with valid role token, pass this request down.
        if (SASL_STATE_COMPLETE.equalsIgnoreCase(requestedState)) {
            request.setAttribute(AuthenticatedRoleAttributeName, saslAuthRoleToken);
            request.setAttribute(AuthenticatedDataAttributeName,
                    new AuthenticationDataHttps(request));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Server side role token OK to go on: {}", request.getRequestURI(),
                        saslAuthRoleToken);
            }
            return true;
        } else {
            checkState(SASL_STATE_SERVER_CHECK_TOKEN.equalsIgnoreCase(requestedState),
                    "Request carries a role token but a missing or unexpected SASL state header.");
            setResponseHeaderState(response, SASL_STATE_COMPLETE);
            response.setHeader(SASL_STATE_SERVER, request.getHeader(SASL_STATE_SERVER));
            response.setStatus(HttpServletResponse.SC_OK);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Server side role token verified success: {}", request.getRequestURI(),
                        saslAuthRoleToken);
            }
            return false;
        }
    } else {
        // no role token, do sasl auth
        // need new authState: either nothing is cached for this request or the client restarts the exchange
        if (state == null || SASL_STATE_CLIENT_INIT.equalsIgnoreCase(requestedState)) {
            state = newAuthState(null, null, null);
            authStates.put(state.getStateId(), state);
        }
        final String clientToken = request.getHeader(SASL_AUTH_TOKEN);
        checkState(clientToken != null,
                "Header token should exist if no role token.");

        // do the sasl auth
        AuthData clientData = AuthData.of(Base64.getDecoder().decode(clientToken));
        AuthData brokerData = state.authenticate(clientData);

        // authentication has completed, it has get the auth role.
        if (state.isComplete()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] SASL server authentication complete, send OK to client.", request.getRequestURI());
            }
            String authRole = state.getAuthRole();
            String authToken = createAuthRoleToken(authRole, String.valueOf(state.getStateId()));
            response.setHeader(SASL_AUTH_ROLE_TOKEN, authToken);

            // auth request complete, return OK, wait for a new real request to come.
            response.setHeader(SASL_STATE_SERVER, String.valueOf(state.getStateId()));
            setResponseHeaderState(response, SASL_STATE_COMPLETE);
            response.setStatus(HttpServletResponse.SC_OK);

            // auth completed, no need to keep authState
            authStates.invalidate(state.getStateId());
            return false;
        } else {
            // auth not complete
            if (log.isDebugEnabled()) {
                log.debug("[{}] SASL server authentication not complete, send {} back to client.",
                        request.getRequestURI(), HttpServletResponse.SC_UNAUTHORIZED);
            }
            setResponseHeaderState(response, SASL_STATE_NEGOTIATE);
            response.setHeader(SASL_STATE_SERVER, String.valueOf(state.getStateId()));
            response.setHeader(SASL_AUTH_TOKEN, Base64.getEncoder().encodeToString(brokerData.getBytes()));
            response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "SASL Authentication not complete.");
            return false;
        }
    }
}
}