/
SslEngineBuilder.java
314 lines (280 loc) · 13 KB
/
SslEngineBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.ssl;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.config.types.Password;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManagerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
public class SslEngineBuilder {
private static final Logger log = LoggerFactory.getLogger(SslEngineBuilder.class);
private final Map<String, ?> configs;
private final String protocol;
private final String provider;
private final String kmfAlgorithm;
private final String tmfAlgorithm;
private final SecurityStore keystore;
private final SecurityStore truststore;
private final String[] cipherSuites;
private final String[] enabledProtocols;
private final SecureRandom secureRandomImplementation;
private final SSLContext sslContext;
private final SslClientAuth sslClientAuth;
@SuppressWarnings("unchecked")
SslEngineBuilder(Map<String, ?> configs) {
this.configs = Collections.unmodifiableMap(configs);
this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
if (cipherSuitesList != null && !cipherSuitesList.isEmpty()) {
this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
} else {
this.cipherSuites = null;
}
List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
if (enabledProtocolsList != null && !enabledProtocolsList.isEmpty()) {
this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
} else {
this.enabledProtocols = null;
}
this.secureRandomImplementation = createSecureRandom((String)
configs.get(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG));
this.sslClientAuth = createSslClientAuth((String) configs.get(
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
this.kmfAlgorithm = (String) configs.get(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
this.tmfAlgorithm = (String) configs.get(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
this.keystore = createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
(Password) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
(Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
this.truststore = createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
(Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
this.sslContext = createSSLContext();
}
private static SslClientAuth createSslClientAuth(String key) {
SslClientAuth auth = SslClientAuth.forConfig(key);
if (auth != null) {
return auth;
}
log.warn("Unrecognized client authentication configuration {}. Falling " +
"back to NONE. Recognized client authentication configurations are {}.",
key, String.join(", ", SslClientAuth.VALUES.stream().
map(a -> a.name()).collect(Collectors.toList())));
return SslClientAuth.NONE;
}
private static SecureRandom createSecureRandom(String key) {
if (key == null) {
return null;
}
try {
return SecureRandom.getInstance(key);
} catch (GeneralSecurityException e) {
throw new KafkaException(e);
}
}
private SSLContext createSSLContext() {
try {
SSLContext sslContext;
if (provider != null)
sslContext = SSLContext.getInstance(protocol, provider);
else
sslContext = SSLContext.getInstance(protocol);
KeyManager[] keyManagers = null;
if (keystore != null || kmfAlgorithm != null) {
String kmfAlgorithm = this.kmfAlgorithm != null ?
this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
if (keystore != null) {
KeyStore ks = keystore.load();
Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password;
kmf.init(ks, keyPassword.value().toCharArray());
} else {
kmf.init(null, null);
}
keyManagers = kmf.getKeyManagers();
}
String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm();
TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
KeyStore ts = truststore == null ? null : truststore.load();
tmf.init(ts);
sslContext.init(keyManagers, tmf.getTrustManagers(), this.secureRandomImplementation);
log.debug("Created SSL context with keystore {}, truststore {}", keystore, truststore);
return sslContext;
} catch (Exception e) {
throw new KafkaException(e);
}
}
private static SecurityStore createKeystore(String type, String path, Password password, Password keyPassword) {
if (path == null && password != null) {
throw new KafkaException("SSL key store is not specified, but key store password is specified.");
} else if (path != null && password == null) {
throw new KafkaException("SSL key store is specified, but key store password is not specified.");
} else if (path != null && password != null) {
return new SecurityStore(type, path, password, keyPassword);
} else
return null; // path == null, clients may use this path with brokers that don't require client auth
}
private static SecurityStore createTruststore(String type, String path, Password password) {
if (path == null && password != null) {
throw new KafkaException("SSL trust store is not specified, but trust store password is specified.");
} else if (path != null) {
return new SecurityStore(type, path, password, null);
} else
return null;
}
@SuppressWarnings("unchecked")
Map<String, Object> configs() {
return (Map<String, Object>) configs;
}
public SecurityStore keystore() {
return keystore;
}
public SecurityStore truststore() {
return truststore;
}
/**
* Create a new SSLEngine object.
*
* @param mode Whether to use client or server mode.
* @param peerHost The peer host to use. This is used in client mode if endpoint validation is enabled.
* @param peerPort The peer port to use. This is a hint and not used for validation.
* @param endpointIdentification Endpoint identification algorithm for client mode.
* @return The new SSLEngine.
*/
public SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
if (mode == Mode.SERVER) {
sslEngine.setUseClientMode(false);
switch (sslClientAuth) {
case REQUIRED:
sslEngine.setNeedClientAuth(true);
break;
case REQUESTED:
sslEngine.setWantClientAuth(true);
break;
case NONE:
break;
}
sslEngine.setUseClientMode(false);
} else {
sslEngine.setUseClientMode(true);
SSLParameters sslParams = sslEngine.getSSLParameters();
// SSLParameters#setEndpointIdentificationAlgorithm enables endpoint validation
// only in client mode. Hence, validation is enabled only for clients.
sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
sslEngine.setSSLParameters(sslParams);
}
return sslEngine;
}
public SSLContext sslContext() {
return sslContext;
}
/**
* Returns true if this SslEngineBuilder needs to be rebuilt.
*
* @param nextConfigs The configuration we want to use.
* @return True only if this builder should be rebuilt.
*/
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
if (!nextConfigs.equals(configs)) {
return true;
}
if (truststore != null && truststore.modified()) {
return true;
}
if (keystore != null && keystore.modified()) {
return true;
}
return false;
}
// package access for testing
static class SecurityStore {
private final String type;
private final String path;
private final Password password;
private final Password keyPassword;
private final Long fileLastModifiedMs;
SecurityStore(String type, String path, Password password, Password keyPassword) {
Objects.requireNonNull(type, "type must not be null");
this.type = type;
this.path = path;
this.password = password;
this.keyPassword = keyPassword;
fileLastModifiedMs = lastModifiedMs(path);
}
/**
* Loads this keystore
* @return the keystore
* @throws KafkaException if the file could not be read or if the keystore could not be loaded
* using the specified configs (e.g. if the password or keystore type is invalid)
*/
KeyStore load() {
try (InputStream in = Files.newInputStream(Paths.get(path))) {
KeyStore ks = KeyStore.getInstance(type);
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
}
}
private Long lastModifiedMs(String path) {
try {
return Files.getLastModifiedTime(Paths.get(path)).toMillis();
} catch (IOException e) {
log.error("Modification time of key store could not be obtained: " + path, e);
return null;
}
}
boolean modified() {
Long modifiedMs = lastModifiedMs(path);
return modifiedMs != null && !Objects.equals(modifiedMs, this.fileLastModifiedMs);
}
@Override
public String toString() {
return "SecurityStore(" +
"path=" + path +
", modificationTime=" + (fileLastModifiedMs == null ? null : new Date(fileLastModifiedMs)) + ")";
}
}
}