Skip to content

Commit

Permalink
ZOOKEEPER-3172: Quorum TLS - fix port unification to allow rolling up…
Browse files Browse the repository at this point in the history
…grades
  • Loading branch information
ivmaykov committed Oct 25, 2018
1 parent b8b687a commit 119baf7
Show file tree
Hide file tree
Showing 12 changed files with 1,394 additions and 145 deletions.
Expand Up @@ -34,6 +34,8 @@
import javax.net.ssl.X509ExtendedTrustManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -82,6 +84,8 @@ public abstract class X509Util {
"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256"
};

public static final int DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS = 5000;

/**
* This enum represents the file type of a KeyStore or TrustStore. Currently, JKS (java keystore) and PEM types
* are supported.
Expand Down Expand Up @@ -135,6 +139,7 @@ public static StoreFileType fromPropertyValue(String prop) {
private String sslHostnameVerificationEnabledProperty = getConfigPrefix() + "hostnameVerification";
private String sslCrlEnabledProperty = getConfigPrefix() + "crl";
private String sslOcspEnabledProperty = getConfigPrefix() + "ocsp";
private String sslHandshakeDetectionTimeoutMillisProperty = getConfigPrefix() + "handshakeDetectionTimeoutMillis";

private String[] cipherSuites;

Expand Down Expand Up @@ -196,6 +201,16 @@ public String getSslOcspEnabledProperty() {
return sslOcspEnabledProperty;
}

/**
* Returns the config property key that controls the amount of time, in milliseconds, that the first
* UnifiedServerSocket read operation will block for when trying to detect the client mode (TLS or PLAINTEXT).
*
* @return the config property key.
*/
public String getSslHandshakeDetectionTimeoutMillisProperty() {
return sslHandshakeDetectionTimeoutMillisProperty;
}

public SSLContext getDefaultSSLContext() throws X509Exception.SSLContextException {
SSLContext result = defaultSSLContext.get();
if (result == null) {
Expand All @@ -218,6 +233,31 @@ private SSLContext createSSLContext() throws SSLContextException {
return createSSLContext(config);
}

/**
* Returns the max amount of time, in milliseconds, that the first UnifiedServerSocket read() operation should
* block for when trying to detect the client mode (TLS or PLAINTEXT).
* Defaults to {@link X509Util#DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS}.
*
* @return the handshake detection timeout, in milliseconds.
*/
public int getSslHandshakeTimeoutMillis() {
String propertyString = System.getProperty(getSslHandshakeDetectionTimeoutMillisProperty());
int result;
if (propertyString == null) {
result = DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS;
} else {
result = Integer.parseInt(propertyString);
if (result < 1) {
// Timeout of 0 is not allowed, since an infinite timeout can permanently lock up an
// accept() thread.
LOG.warn("Invalid value for " + getSslHandshakeDetectionTimeoutMillisProperty() + ": " + result +
", using the default value of " + DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS);
result = DEFAULT_HANDSHAKE_DETECTION_TIMEOUT_MILLIS;
}
}
return result;
}

public SSLContext createSSLContext(ZKConfig config) throws SSLContextException {
KeyManager[] keyManagers = null;
TrustManager[] trustManagers = null;
Expand Down Expand Up @@ -427,14 +467,22 @@ public static X509TrustManager createTrustManager(String trustStoreLocation, Str
public SSLSocket createSSLSocket() throws X509Exception, IOException {
SSLSocket sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket();
configureSSLSocket(sslSocket);

sslSocket.setUseClientMode(true);
return sslSocket;
}

public SSLSocket createSSLSocket(Socket socket) throws X509Exception, IOException {
SSLSocket sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket(socket, null, socket.getPort(), true);
public SSLSocket createSSLSocket(Socket socket, byte[] pushbackBytes) throws X509Exception, IOException {
SSLSocket sslSocket;
if (pushbackBytes != null && pushbackBytes.length > 0) {
sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket(
socket, new ByteArrayInputStream(pushbackBytes), true);
} else {
sslSocket = (SSLSocket) getDefaultSSLContext().getSocketFactory().createSocket(
socket, null, socket.getPort(), true);
}
configureSSLSocket(sslSocket);

sslSocket.setUseClientMode(false);
sslSocket.setNeedClientAuth(true);
return sslSocket;
}

Expand Down
Expand Up @@ -130,6 +130,8 @@ private void putSSLProperties(X509Util x509Util) {
System.getProperty(x509Util.getSslCrlEnabledProperty()));
properties.put(x509Util.getSslOcspEnabledProperty(),
System.getProperty(x509Util.getSslOcspEnabledProperty()));
properties.put(x509Util.getSslHandshakeDetectionTimeoutMillisProperty(),
System.getProperty(x509Util.getSslHandshakeDetectionTimeoutMillisProperty()));
}

/**
Expand Down
Expand Up @@ -42,7 +42,6 @@
import javax.security.sasl.SaslException;

import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.FinalRequestProcessor;
Expand Down Expand Up @@ -234,21 +233,16 @@ public boolean isQuorumSynced(QuorumVerifier qv) {

private final ServerSocket ss;

Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException, X509Exception {
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new BufferStats();
try {
if (self.shouldUsePortUnification()) {
if (self.shouldUsePortUnification() || self.isSslQuorum()) {
boolean allowInsecureConnection = self.shouldUsePortUnification();
if (self.getQuorumListenOnAllIPs()) {
ss = new UnifiedServerSocket(new QuorumX509Util(), self.getQuorumAddress().getPort());
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection, self.getQuorumAddress().getPort());
} else {
ss = new UnifiedServerSocket(new QuorumX509Util());
}
} else if (self.isSslQuorum()) {
if (self.getQuorumListenOnAllIPs()) {
ss = new QuorumX509Util().createSSLServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new QuorumX509Util().createSSLServerSocket();
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
}
} else {
if (self.getQuorumListenOnAllIPs()) {
Expand All @@ -261,9 +255,6 @@ public boolean isQuorumSynced(QuorumVerifier qv) {
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
} catch (X509Exception e) {
LOG.error("Failed to setup ssl server socket", e);
throw e;
} catch (BindException e) {
if (self.getQuorumListenOnAllIPs()) {
LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
Expand Down Expand Up @@ -399,8 +390,10 @@ public LearnerCnxAcceptor() {
public void run() {
try {
while (!stop) {
try{
Socket s = ss.accept();
Socket s = null;
boolean error = false;
try {
s = ss.accept();

// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
Expand All @@ -412,6 +405,7 @@ public void run() {
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
error = true;
if (stop) {
LOG.info("exception while shutting down acceptor: "
+ e);
Expand All @@ -425,6 +419,19 @@ public void run() {
}
} catch (SaslException e){
LOG.error("Exception while connecting to quorum learner", e);
error = true;
} catch (Exception e) {
error = true;
throw e;
} finally {
// Don't leak sockets on errors
if (error && s != null && !s.isClosed()) {
try {
s.close();
} catch (IOException e) {
LOG.warn("Error closing socket", e);
}
}
}
}
} catch (Exception e) {
Expand Down
Expand Up @@ -38,9 +38,7 @@
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.ExitCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,8 +72,6 @@ static class PacketInFlight {

protected Socket sock;

protected X509Util x509Util;

/**
* Socket getter
* @return
Expand Down Expand Up @@ -304,10 +300,7 @@ protected void connectToLeader(InetSocketAddress addr, String hostname)
private Socket createSocket() throws X509Exception, IOException {
Socket sock;
if (self.isSslQuorum()) {
if (x509Util == null) {
x509Util = new QuorumX509Util();
}
sock = x509Util.createSSLSocket();
sock = self.getX509Util().createSSLSocket();
} else {
sock = new Socket();
}
Expand Down
Expand Up @@ -18,32 +18,47 @@

package org.apache.zookeeper.server.quorum;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.PushbackInputStream;
import java.net.Socket;
import java.net.SocketImpl;

public class PrependableSocket extends Socket {

private SequenceInputStream sequenceInputStream;
private PushbackInputStream pushbackInputStream;

public PrependableSocket(SocketImpl base) throws IOException {
super(base);
}

@Override
public InputStream getInputStream() throws IOException {
if (sequenceInputStream == null) {
if (pushbackInputStream == null) {
return super.getInputStream();
}

return sequenceInputStream;
return pushbackInputStream;
}

public void prependToInputStream(byte[] bytes) throws IOException {
sequenceInputStream = new SequenceInputStream(new ByteArrayInputStream(bytes), getInputStream());
/**
* Prepend some bytes that have already been read back to the socket's input stream. Note that this method can be
* called at most once with a non-0 length per socket instance.
* @param bytes the bytes to prepend.
* @param offset offset in the byte array to start at.
* @param length number of bytes to prepend.
* @throws IOException if this method was already called on the socket instance, or if super.getInputStream() throws.
*/
public void prependToInputStream(byte[] bytes, int offset, int length) throws IOException {
if (length == 0) {
return; // nothing to prepend
}
if (pushbackInputStream != null) {
throw new IOException("prependToInputStream() called more than once");
}
PushbackInputStream pushbackInputStream = new PushbackInputStream(getInputStream(), length);
pushbackInputStream.unread(bytes, offset, length);
this.pushbackInputStream = pushbackInputStream;
}

}
Expand Up @@ -47,9 +47,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.util.ConfigUtils;
Expand Down Expand Up @@ -175,9 +173,6 @@ public class QuorumCnxManager {
*/
private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");


private X509Util x509Util;

static public class Message {
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
Expand Down Expand Up @@ -291,8 +286,6 @@ public QuorumCnxManager(QuorumPeer self,
// Starts listener thread that waits for connection requests
listener = new Listener();
listener.setName("QuorumPeerListener");

x509Util = new QuorumX509Util();
}

private void initializeAuth(final long mySid,
Expand Down Expand Up @@ -655,17 +648,18 @@ synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr
try {
LOG.debug("Opening channel to server " + sid);
if (self.isSslQuorum()) {
SSLSocket sslSock = x509Util.createSSLSocket();
setSockOpts(sslSock);
sslSock.connect(electionAddr, cnxTO);
sslSock.startHandshake();
sock = sslSock;
} else {
sock = new Socket();
setSockOpts(sock);
sock.connect(electionAddr, cnxTO);
}
LOG.debug("Connected to server " + sid);
SSLSocket sslSock = self.getX509Util().createSSLSocket();
setSockOpts(sslSock);
sslSock.connect(electionAddr, cnxTO);
sslSock.startHandshake();
sock = sslSock;
} else {
sock = new Socket();
setSockOpts(sock);
sock.connect(electionAddr, cnxTO);

}
LOG.debug("Connected to server " + sid);
// Sends connection request asynchronously if the quorum
// sasl authentication is enabled. This is required because
// sasl server authentication process may take few seconds to
Expand Down Expand Up @@ -876,9 +870,9 @@ public void run() {
while((!shutdown) && (numRetries < 3)){
try {
if (self.shouldUsePortUnification()) {
ss = new UnifiedServerSocket(x509Util);
ss = new UnifiedServerSocket(self.getX509Util(), true);
} else if (self.isSslQuorum()) {
ss = x509Util.createSSLServerSocket();
ss = new UnifiedServerSocket(self.getX509Util(), false);
} else {
ss = new ServerSocket();
}
Expand Down Expand Up @@ -920,7 +914,7 @@ public void run() {
+ "see ZOOKEEPER-2836");
}
}
} catch (IOException|X509Exception e) {
} catch (IOException e) {
if (shutdown) {
break;
}
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.apache.zookeeper.KeeperException.BadArgumentsException;
import org.apache.zookeeper.common.AtomicFileWritingIdiom;
import org.apache.zookeeper.common.AtomicFileWritingIdiom.WriterStatement;
import org.apache.zookeeper.common.QuorumX509Util;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.jmx.MBeanRegistry;
Expand Down Expand Up @@ -479,6 +480,12 @@ public boolean shouldUsePortUnification() {
return shouldUsePortUnification;
}

private final QuorumX509Util x509Util;

QuorumX509Util getX509Util() {
return x509Util;
}

/**
* This is who I think the leader currently is.
*/
Expand Down Expand Up @@ -801,6 +808,7 @@ public QuorumPeer() throws SaslException {
quorumStats = new QuorumStats(this);
jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
adminServer = AdminServerFactory.createAdminServer();
x509Util = new QuorumX509Util();
initialize();
}

Expand Down

0 comments on commit 119baf7

Please sign in to comment.