From e37efa3357d96e7831068eaec29627bd1bcc2ba0 Mon Sep 17 00:00:00 2001 From: nikolay_tikhonov Date: Thu, 23 Jul 2015 11:37:26 +0300 Subject: [PATCH] Fixed SSL bugs. Added test. --- .../apache/ignite/internal/IgniteKernal.java | 5 +- .../util/nio/ssl/BlockingSslHandler.java | 61 +++++++++++-------- .../tcp/TcpCommunicationSpi.java | 60 ++++++++++-------- .../ignite/spi/discovery/tcp/ServerImpl.java | 5 +- .../tcp/IgniteCacheSslStartStopSelfTest.java | 46 ++++++++++++++ .../IgniteCacheFailoverTestSuite.java | 4 +- 6 files changed, 125 insertions(+), 56 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8a246dc4e8be0..b7462611e1053 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2064,9 +2064,8 @@ private void ackP2pConfiguration() { private void ackSecurity() { assert log != null; - if (log.isInfoEnabled()) - log.info("Security status [authentication=" + onOff(ctx.security().enabled()) - + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']'); + U.quietAndInfo(log, "Security status [authentication=" + onOff(ctx.security().enabled()) + + ", communication encrypted=" + onOff(ctx.config().getSslContextFactory() != null) + ']'); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java index eee90d8c51696..9890efeb25550 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/BlockingSslHandler.java @@ -39,14 +39,14 @@ public class BlockingSslHandler { /** Logger. */ private IgniteLogger log; - /** */ + /** Socket channel. */ private SocketChannel ch; - /** */ - private GridFutureAdapter fut; + /** Order. */ + private final ByteOrder order; /** SSL engine. */ - private SSLEngine sslEngine; + private final SSLEngine sslEngine; /** Handshake completion flag. */ private boolean handshakeFinished; @@ -69,33 +69,38 @@ public class BlockingSslHandler { /** * @param sslEngine SSLEngine. * @param ch Socket channel. - * @param fut Future. + * @param directBuf Direct buffer flag. + * @param order Byte order. * @param log Logger. */ - public BlockingSslHandler(SSLEngine sslEngine, SocketChannel ch, GridFutureAdapter fut, - IgniteLogger log) throws SSLException { + public BlockingSslHandler(SSLEngine sslEngine, + SocketChannel ch, + boolean directBuf, + ByteOrder order, + IgniteLogger log) + throws SSLException { this.ch = ch; - this.fut = fut; this.log = log; - this.sslEngine = sslEngine; + this.order = order; // Allocate a little bit more so SSL engine would not return buffer overflow status. int netBufSize = sslEngine.getSession().getPacketBufferSize() + 50; - outNetBuf = ByteBuffer.allocate(netBufSize); - inNetBuf = ByteBuffer.allocate(netBufSize); + outNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize); + outNetBuf.order(order); // Initially buffer is empty. outNetBuf.position(0); outNetBuf.limit(0); + inNetBuf = directBuf ? ByteBuffer.allocateDirect(netBufSize) : ByteBuffer.allocate(netBufSize); + inNetBuf.order(order); + appBuf = allocateAppBuff(); handshakeStatus = sslEngine.getHandshakeStatus(); - sslEngine.setUseClientMode(true); - if (log.isDebugEnabled()) log.debug("Started SSL session [netBufSize=" + netBufSize + ", appBufSize=" + appBuf.capacity() + ']'); } @@ -122,12 +127,6 @@ public boolean handshake() throws IgniteCheckedException, SSLException { case FINISHED: { handshakeFinished = true; - if (fut != null) { - appBuf.flip(); - - fut.onDone(appBuf); - } - loop = false; break; @@ -186,6 +185,15 @@ public boolean handshake() throws IgniteCheckedException, SSLException { return handshakeFinished; } + /** + * @return Application buffer with decoded data. + */ + public ByteBuffer applicationBuffer() { + appBuf.flip(); + + return appBuf; + } + /** * Encrypts data to be written to the network. * @@ -439,27 +447,32 @@ private ByteBuffer allocateAppBuff() { int appBufSize = Math.max(sslEngine.getSession().getApplicationBufferSize() + 50, netBufSize * 2); - return ByteBuffer.allocate(appBufSize); + ByteBuffer buf = ByteBuffer.allocate(appBufSize); + buf.order(order); + + return buf; } /** * Read data from net buffer. */ - private void readFromNet() { + private void readFromNet() throws IgniteCheckedException { try { inNetBuf.clear(); - ch.read(inNetBuf); + int read = ch.read(inNetBuf); + + if (read == -1) + throw new IgniteCheckedException("Failed to read remote node ID (connection closed)."); } catch (IOException e) { - e.printStackTrace(); + throw new IgniteCheckedException("Failed to write byte to socket.", e); } } /** * Copies data from out net buffer and passes it to the underlying chain. * - * @return Nothing. * @throws GridNioException If send failed. */ private void writeNetBuffer() throws IgniteCheckedException { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 99ca2b7daa7ae..48dc52ed23650 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -2051,12 +2051,18 @@ protected GridCommunicationClient createTcpClient(ClusterNode node) throws Ignit long rcvCnt = -1; - GridTuple ssl = new GridTuple<>(); + SSLEngine sslEngine = null; try { ch.socket().connect(addr, (int)connTimeout); - rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, ssl); + if (isSslEnabled()) { + sslEngine = ignite.configuration().getSslContextFactory().create().createSSLEngine(); + + sslEngine.setUseClientMode(true); + } + + rcvCnt = safeHandshake(ch, recoveryDesc, node.id(), connTimeout0, sslEngine); if (rcvCnt == -1) return null; @@ -2072,10 +2078,9 @@ protected GridCommunicationClient createTcpClient(ClusterNode node) throws Ignit meta.put(NODE_ID_META, node.id()); if (isSslEnabled()) { - assert ssl != null; - assert ssl.get() != null; + assert sslEngine != null; - meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), ssl.get()); + meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine); } if (recoveryDesc != null) { recoveryDesc.onHandshake(rcvCnt); @@ -2211,7 +2216,7 @@ private long safeHandshake( @Nullable GridNioRecoveryDescriptor recovery, UUID rmtNodeId, long timeout, - @Nullable GridTuple ssl + @Nullable SSLEngine ssl ) throws IgniteCheckedException { HandshakeTimeoutObject obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout); @@ -2233,23 +2238,14 @@ private long safeHandshake( ByteBuffer buf; if (isSslEnabled()) { - GridFutureAdapter handFut = new GridFutureAdapter<>(); - - SSLEngine sslEngine = ignite.configuration().getSslContextFactory() - .create().createSSLEngine(); - - sslEngine.setUseClientMode(true); - - sslHnd = new BlockingSslHandler(sslEngine, ch, handFut, log); + sslHnd = new BlockingSslHandler(ssl, ch, directBuf, ByteOrder.nativeOrder(), log); if (!sslHnd.handshake()) - throw new IgniteCheckedException("SSL handshake isn't completed."); - - ssl.set(sslEngine); + throw new IgniteCheckedException("SSL handshake is not completed."); - ByteBuffer handBuff = handFut.get(); + ByteBuffer handBuff = sslHnd.applicationBuffer(); - if (handBuff.limit() < 17) { + if (handBuff.remaining() < 17) { buf = ByteBuffer.allocate(1000); int read = ch.read(buf); @@ -2338,18 +2334,30 @@ else if (log.isDebugEnabled()) buf = ByteBuffer.allocate(1000); + ByteBuffer decode = null; + buf.order(ByteOrder.nativeOrder()); - int read = ch.read(buf); + for (int i = 0; i < 9; ) { + int read = ch.read(buf); - if (read == -1) - throw new IgniteCheckedException("Failed to read remote node recovery handshake " + - "(connection closed)."); + if (read == -1) + throw new IgniteCheckedException("Failed to read remote node recovery handshake " + + "(connection closed)."); - buf.flip(); + buf.flip(); + + decode = sslHnd.decode(buf); - rcvCnt = sslHnd.decode(buf).getLong(1); - } else { + i += decode.remaining(); + + buf.flip(); + buf.compact(); + } + + rcvCnt = decode.getLong(1); + } + else { buf = ByteBuffer.allocate(9); buf.order(ByteOrder.nativeOrder()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 34f90f73145a3..68552a6032316 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -4242,10 +4242,11 @@ else if (log.isDebugEnabled()) if (log.isDebugEnabled()) U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e); - if (X.hasCause(e, SSLException.class) && spi.isSslEnabled()) + if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0()) LT.warn(log, null, "Failed to initialize connection. Not encrypted data received. " + "Missed SSL configuration on node? [sock=" + sock + ']'); - else if (X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) { + else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed()) + && !spi.isNodeStopping0()) { if (U.isMacInvalidArgumentError(e)) LT.error(log, e, "Failed to initialize connection [sock=" + sock + "]\n\t" + U.MAC_INVALID_ARG_MSG); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java new file mode 100644 index 0000000000000..9bf6caae96748 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.testframework.*; + +/** + * + */ +public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstractSelfTest { + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setSslContextFactory(GridTestUtils.sslFactory()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int keysCount() { + return 60_000; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index 80bfbf29badf3..524bfb3837d61 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -23,7 +23,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.processors.cache.distributed.replicated.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.testframework.*; import java.util.*; @@ -75,6 +75,8 @@ public static TestSuite suite(Set ignoredTests) throws Exception { suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class); suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class); + suite.addTestSuite(IgniteCacheSslStartStopSelfTest.class); + return suite; } }