Skip to content

Commit

Permalink
Fixed SSL bugs. Added test.
Browse files Browse the repository at this point in the history
  • Loading branch information
niktikhonov committed Jul 23, 2015
1 parent c5dc492 commit e37efa3
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 56 deletions.
Expand Up @@ -2064,9 +2064,8 @@ private void ackP2pConfiguration() {
private void ackSecurity() { private void ackSecurity() {
assert log != null; assert log != null;


if (log.isInfoEnabled()) U.quietAndInfo(log, "Security status [authentication=" + onOff(ctx.security().enabled())
log.info("Security status [authentication=" + onOff(ctx.security().enabled()) + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']');
+ ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']');
} }


/** /**
Expand Down
Expand Up @@ -39,14 +39,14 @@ public class BlockingSslHandler {
/** Logger. */ /** Logger. */
private IgniteLogger log; private IgniteLogger log;


/** */ /** Socket channel. */
private SocketChannel ch; private SocketChannel ch;


/** */ /** Order. */
private GridFutureAdapter<ByteBuffer> fut; private final ByteOrder order;


/** SSL engine. */ /** SSL engine. */
private SSLEngine sslEngine; private final SSLEngine sslEngine;


/** Handshake completion flag. */ /** Handshake completion flag. */
private boolean handshakeFinished; private boolean handshakeFinished;
Expand All @@ -69,33 +69,38 @@ public class BlockingSslHandler {
/** /**
* @param sslEngine SSLEngine. * @param sslEngine SSLEngine.
* @param ch Socket channel. * @param ch Socket channel.
* @param fut Future. * @param directBuf Direct buffer flag.
* @param order Byte order.
* @param log Logger. * @param log Logger.
*/ */
public BlockingSslHandler(SSLEngine sslEngine, SocketChannel ch, GridFutureAdapter<ByteBuffer> fut, public BlockingSslHandler(SSLEngine sslEngine,
IgniteLogger log) throws SSLException { SocketChannel ch,
boolean directBuf,
ByteOrder order,
IgniteLogger log)
throws SSLException {
this.ch = ch; this.ch = ch;
this.fut = fut;
this.log = log; this.log = log;

this.sslEngine = sslEngine; this.sslEngine = sslEngine;
this.order = order;


// Allocate a little bit more so SSL engine would not return buffer overflow status. // Allocate a little bit more so SSL engine would not return buffer overflow status.
int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50; int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50;


outNetBuf = ByteBuffer.allocate(netBufSize); outNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
inNetBuf = ByteBuffer.allocate(netBufSize); outNetBuf.order(order);


// Initially buffer is empty. // Initially buffer is empty.
outNetBuf.position(0); outNetBuf.position(0);
outNetBuf.limit(0); outNetBuf.limit(0);


inNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize);
inNetBuf.order(order);

appBuf = allocateAppBuff(); appBuf = allocateAppBuff();


handshakeStatus = sslEngine.getHandshakeStatus(); handshakeStatus = sslEngine.getHandshakeStatus();


sslEngine.setUseClientMode(true);

if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBuf.capacity() + ']'); log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBuf.capacity() + ']');
} }
Expand All @@ -122,12 +127,6 @@ public boolean handshake() throws IgniteCheckedException, SSLException {
case FINISHED: { case FINISHED: {
handshakeFinished = true; handshakeFinished = true;


if (fut != null) {
appBuf.flip();

fut.onDone(appBuf);
}

loop = false; loop = false;


break; break;
Expand Down Expand Up @@ -186,6 +185,15 @@ public boolean handshake() throws IgniteCheckedException, SSLException {
return handshakeFinished; return handshakeFinished;
} }


/**
* @return Application buffer with decoded data.
*/
public ByteBuffer applicationBuffer() {
appBuf.flip();

return appBuf;
}

/** /**
* Encrypts data to be written to the network. * Encrypts data to be written to the network.
* *
Expand Down Expand Up @@ -439,27 +447,32 @@ private ByteBuffer allocateAppBuff() {


int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2); int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2);


return ByteBuffer.allocate(appBufSize); ByteBuffer buf = ByteBuffer.allocate(appBufSize);
buf.order(order);

return buf;
} }


/** /**
* Read data from net buffer. * Read data from net buffer.
*/ */
private void readFromNet() { private void readFromNet() throws IgniteCheckedException {
try { try {
inNetBuf.clear(); inNetBuf.clear();


ch.read(inNetBuf); int read = ch.read(inNetBuf);

if (read == -1)
throw new IgniteCheckedException("Failed to read remote node ID (connection closed).");
} }
catch (IOException e) { catch (IOException e) {
e.printStackTrace(); throw new IgniteCheckedException("Failed to write byte to socket.", e);
} }
} }


/** /**
* Copies data from out net buffer and passes it to the underlying chain. * Copies data from out net buffer and passes it to the underlying chain.
* *
* @return Nothing.
* @throws GridNioException If send failed. * @throws GridNioException If send failed.
*/ */
private void writeNetBuffer() throws IgniteCheckedException { private void writeNetBuffer() throws IgniteCheckedException {
Expand Down
Expand Up @@ -2051,12 +2051,18 @@ protected GridCommunicationClient createTcpClient(ClusterNode node) throws Ignit


long rcvCnt = -1; long rcvCnt = -1;


GridTuple<SSLEngine> ssl = new GridTuple<>(); SSLEngine sslEngine = null;


try { try {
ch.socket().connect(addr, (int)connTimeout); ch.socket().connect(addr, (int)connTimeout);


rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, ssl); if (isSslEnabled()) {
sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine();

sslEngine.setUseClientMode(true);
}

rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, sslEngine);


if (rcvCnt == -1) if (rcvCnt == -1)
return null; return null;
Expand All @@ -2072,10 +2078,9 @@ protected GridCommunicationClient createTcpClient(ClusterNode node) throws Ignit
meta.put(NODE_ID_META, node.id()); meta.put(NODE_ID_META, node.id());


if (isSslEnabled()) { if (isSslEnabled()) {
assert ssl != null; assert sslEngine != null;
assert ssl.get() != null;


meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ssl.get()); meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine);
} }
if (recoveryDesc != null) { if (recoveryDesc != null) {
recoveryDesc.onHandshake(rcvCnt); recoveryDesc.onHandshake(rcvCnt);
Expand Down Expand Up @@ -2211,7 +2216,7 @@ private <T> long safeHandshake(
@Nullable GridNioRecoveryDescriptor recovery, @Nullable GridNioRecoveryDescriptor recovery,
UUID rmtNodeId, UUID rmtNodeId,
long timeout, long timeout,
@Nullable GridTuple<SSLEngine> ssl @Nullable SSLEngine ssl
) throws IgniteCheckedException { ) throws IgniteCheckedException {
HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);


Expand All @@ -2233,23 +2238,14 @@ private <T> long safeHandshake(
ByteBuffer buf; ByteBuffer buf;


if (isSslEnabled()) { if (isSslEnabled()) {
GridFutureAdapter<ByteBuffer> handFut = new GridFutureAdapter<>(); sslHnd = new BlockingSslHandler(ssl, ch, directBuf, ByteOrder.nativeOrder(), log);

SSLEngine sslEngine = ignite.configuration().getSslContextFactory()
.create().createSSLEngine();

sslEngine.setUseClientMode(true);

sslHnd = new BlockingSslHandler(sslEngine, ch, handFut, log);


if (!sslHnd.handshake()) if (!sslHnd.handshake())
throw new IgniteCheckedException("SSL handshake isn't completed."); throw new IgniteCheckedException("SSL handshake is not completed.");

ssl.set(sslEngine);


ByteBuffer handBuff = handFut.get(); ByteBuffer handBuff = sslHnd.applicationBuffer();


if (handBuff.limit() < 17) { if (handBuff.remaining() < 17) {
buf = ByteBuffer.allocate(1000); buf = ByteBuffer.allocate(1000);


int read = ch.read(buf); int read = ch.read(buf);
Expand Down Expand Up @@ -2338,18 +2334,30 @@ else if (log.isDebugEnabled())


buf = ByteBuffer.allocate(1000); buf = ByteBuffer.allocate(1000);


ByteBuffer decode = null;

buf.order(ByteOrder.nativeOrder()); buf.order(ByteOrder.nativeOrder());


int read = ch.read(buf); for (int i = 0; i < 9; ) {
int read = ch.read(buf);


if (read == -1) if (read == -1)
throw new IgniteCheckedException("Failed to read remote node recovery handshake " + throw new IgniteCheckedException("Failed to read remote node recovery handshake " +
"(connection closed)."); "(connection closed).");


buf.flip(); buf.flip();

decode = sslHnd.decode(buf);


rcvCnt = sslHnd.decode(buf).getLong(1); i += decode.remaining();
} else {
buf.flip();
buf.compact();
}

rcvCnt = decode.getLong(1);
}
else {
buf = ByteBuffer.allocate(9); buf = ByteBuffer.allocate(9);


buf.order(ByteOrder.nativeOrder()); buf.order(ByteOrder.nativeOrder());
Expand Down
Expand Up @@ -4242,10 +4242,11 @@ else if (log.isDebugEnabled())
if (log.isDebugEnabled()) if (log.isDebugEnabled())
U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);


if (X.hasCause(e, SSLException.class) && spi.isSslEnabled()) if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
LT.warn(log, null, "Failed to initialize connection. Not encrypted data received. " + LT.warn(log, null, "Failed to initialize connection. Not encrypted data received. " +
"Missed SSL configuration on node? [sock=" + sock + ']'); "Missed SSL configuration on node? [sock=" + sock + ']');
else if (X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) { else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
&& !spi.isNodeStopping0()) {
if (U.isMacInvalidArgumentError(e)) if (U.isMacInvalidArgumentError(e))
LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" + LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" +
U.MAC_INVALID_ARG_MSG); U.MAC_INVALID_ARG_MSG);
Expand Down
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.communication.tcp;

import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.testframework.*;

/**
*
*/
public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstractSelfTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);

cfg.setSslContextFactory(GridTestUtils.sslFactory());

return cfg;
}

/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
return CacheAtomicityMode.ATOMIC;
}

/** {@inheritDoc} */
@Override protected int keysCount() {
return 60_000;
}
}
Expand Up @@ -23,7 +23,7 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.distributed.replicated.*; import org.apache.ignite.spi.communication.tcp.*;
import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.*;


import java.util.*; import java.util.*;
Expand Down Expand Up @@ -75,6 +75,8 @@ public static TestSuite suite(Set<Class> ignoredTests) throws Exception {
suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class); suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class);
suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class); suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class);


suite.addTestSuite(IgniteCacheSslStartStopSelfTest.class);

return suite; return suite;
} }
} }

0 comments on commit e37efa3

Please sign in to comment.