Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@
import org.apache.storm.thrift.protocol.TProtocol;
import org.apache.storm.thrift.server.TServer;
import org.apache.storm.thrift.server.TThreadPoolServer;
import org.apache.storm.thrift.transport.TSSLTransportFactory;
import org.apache.storm.thrift.transport.TServerSocket;
import org.apache.storm.thrift.transport.TSocket;
import org.apache.storm.thrift.transport.TTransport;
import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.utils.ExtendedThreadPoolExecutor;
import org.apache.storm.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TlsTransportPlugin implements ITransportPlugin {
private static final Logger LOG = LoggerFactory.getLogger(TlsTransportPlugin.class);
private static final String ANONYMOUS_PRINCIPAL_NAME = "CN=ANONYMOUS";
protected ThriftConnectionType type;
protected Map<String, Object> conf;
private int port;
Expand All @@ -71,22 +70,13 @@ public TServer getServer(TProcessor processor) throws IOException, TTransportExc
int configuredPort = type.getPort(conf);
Integer socketTimeout = type.getSocketTimeOut(conf);

TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters();
if (type.getServerKeyStorePath(conf) != null && type.getServerKeyStorePassword(conf) != null) {
params.setKeyStore(type.getServerKeyStorePath(conf), type.getServerKeyStorePassword(conf), null,
SecurityUtils.inferKeyStoreTypeFromPath(type.getServerKeyStorePath(conf)));
} else {
if (type.getServerKeyStorePath(conf) == null || type.getServerKeyStorePassword(conf) == null) {
throw new IllegalArgumentException("The server keystore is not configured properly");
}

if (type.isClientAuthRequired(conf)) {
if (type.getServerTrustStorePath(conf) != null && type.getServerTrustStorePassword(conf) != null) {
params.setTrustStore(type.getServerTrustStorePath(conf), type.getServerTrustStorePassword(conf), null,
SecurityUtils.inferKeyStoreTypeFromPath(type.getServerTrustStorePath(conf)));
params.requireClientAuth(true);
} else {
throw new IllegalArgumentException("The server truststore is not configured properly");
}
if (type.isClientAuthRequired(conf)
&& (type.getServerTrustStorePath(conf) == null || type.getServerTrustStorePassword(conf) == null)) {
throw new IllegalArgumentException("The server truststore is not configured properly");
}

int clientTimeout = (socketTimeout == null ? 0 : socketTimeout);
Expand Down Expand Up @@ -152,15 +142,15 @@ public void process(final TProtocol inProt, final TProtocol outProt) throws TExc
TSocket tsocket = (TSocket) trans;
SSLSocket socket = (SSLSocket) tsocket.getSocket();

String principalName = "CN=ANONYMOUS";
String principalName = ANONYMOUS_PRINCIPAL_NAME;
try {
for (X509Certificate cert: socket.getSession().getPeerCertificateChain()) {
Principal principal = cert.getSubjectDN();
principalName = principal.getName();
break;
}
} catch (SSLPeerUnverifiedException e) {
LOG.debug("Client cert is not verified. Set principalName={}.", principalName, e);
LOG.warn("Client cert is not verified. Set principalName={}.", principalName, e);
}
LOG.debug("principalName : {} ", principalName);
ReqContext reqContext = ReqContext.context();
Expand Down
Loading