Permalink
Browse files

[REM3-141] Fixes to make cancellation of IoFuture<ConnectdStreamChann…

…el> and IoFuture<ConnectedSslStreamChannel> work properly.

NioXnioWorker.connectTcpStream should check if the IoFuture<ConnectedStreamChannel> has been canceled. In case future is cancelled, tcp channel should be closed.
Plus, JsseXnioSsl.connectSsl must connect the Ssl channel future with the underlying non-ssl future, to make cancellation work on both, if one of them is cancelled.
Finally, JsseConnectedSslStreamChannel must handle the scenario where the other end is cancelled prior to creation of a corresponding JsseConnectedSslStreamChannel.
Without an SSLEngine on the client side, no ssl message will be wrapped to the server ssl channel.
  • Loading branch information...
1 parent 7a720be commit 82a4420c55a1281a635c94c98e34765d3d0068c6 @fl4via fl4via committed Feb 14, 2012
@@ -393,7 +393,8 @@ private boolean handleHandshake(SSLEngineResult result, boolean write) throws IO
final ByteBuffer buffer = receiveBuffer.getResource();
final ByteBuffer unwrappedBuffer = readBuffer.getResource();
synchronized (getReadLock()) {
- if (handleUnwrapResult(result = unwrap(buffer, unwrappedBuffer)) >= 0) { // FIXME what if the unwrap return buffer overflow???
+ int unwrapResult = handleUnwrapResult(result = unwrap(buffer, unwrappedBuffer));
+ if (unwrapResult >= 0) { // FIXME what if the unwrap return buffer overflow???
// have we made some progress?
if(result.getHandshakeStatus() != HandshakeStatus.NEED_UNWRAP || result.bytesConsumed() > 0) {
if (result.bytesProduced() > 0 || buffer.hasRemaining()) {
@@ -406,6 +407,10 @@ private boolean handleHandshake(SSLEngineResult result, boolean write) throws IO
// no point in proceeding, we're stuck until the user reads anyway
setWriteRequiresRead();
return false;
+ } else if (unwrapResult == -1 && result.getHandshakeStatus() == HandshakeStatus.NEED_UNWRAP) {
+ // connection has been closed by peer prior to handshake finished
+ this.close();
+ throw new ClosedChannelException();
}
}
continue;
@@ -33,6 +33,7 @@
import org.xnio.BufferAllocator;
import org.xnio.ByteBufferSlicePool;
+import org.xnio.Cancellable;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.FutureResult;
@@ -115,11 +116,14 @@ public SSLContext getSslContext() {
public IoFuture<ConnectedSslStreamChannel> connectSsl(final XnioWorker worker, final InetSocketAddress bindAddress, final InetSocketAddress destination, final ChannelListener<? super ConnectedSslStreamChannel> openListener, final ChannelListener<? super BoundChannel> bindListener, final OptionMap optionMap) {
final FutureResult<ConnectedSslStreamChannel> futureResult = new FutureResult<ConnectedSslStreamChannel>(IoUtils.directExecutor());
- worker.connectStream(bindAddress, destination, new ChannelListener<ConnectedStreamChannel>() {
+ final IoFuture<ConnectedStreamChannel> connectedChannelFuture = worker.connectStream(bindAddress, destination, new ChannelListener<ConnectedStreamChannel>() {
public void handleEvent(final ConnectedStreamChannel tcpChannel) {
final ConnectedSslStreamChannel channel = createSslConnectedStreamChannel(sslContext, tcpChannel, optionMap);
- futureResult.setResult(channel);
- ChannelListeners.invokeChannelListener(channel, openListener);
+ if (!futureResult.setResult(channel)) {
+ IoUtils.safeClose(channel);
+ } else {
+ ChannelListeners.invokeChannelListener(channel, openListener);
+ }
}
}, bindListener, optionMap).addNotifier(new IoFuture.HandlingNotifier<ConnectedStreamChannel, FutureResult<ConnectedSslStreamChannel>>() {
public void handleCancelled(final FutureResult<ConnectedSslStreamChannel> result) {
@@ -130,6 +134,20 @@ public void handleFailed(final IOException exception, final FutureResult<Connect
result.setException(exception);
}
}, futureResult);
+ futureResult.getIoFuture().addNotifier(new IoFuture.HandlingNotifier<ConnectedStreamChannel, IoFuture<ConnectedStreamChannel>>() {
+ public void handleCancelled(final IoFuture<ConnectedStreamChannel> result) {
+ result.cancel();
+ }
+ }, connectedChannelFuture);
+ futureResult.addCancelHandler(new Cancellable() {
+
+ @Override
+ public Cancellable cancel() {
+ futureResult.setCancelled();
+ return this;
+ }
+
+ });
return futureResult.getIoFuture();
}
@@ -322,9 +322,13 @@ public void handleEvent(final NioTcpChannel channel) {
if (socketChannel.finishConnect()) {
connectHandle.suspend();
connectHandle.getHandlerSetter().set(null);
- futureResult.setResult(tcpChannel);
- //noinspection unchecked
- ChannelListeners.invokeChannelListener(tcpChannel, openListener);
+ if (!futureResult.setResult(tcpChannel)) {
+ // if futureResult is canceled, close channel
+ IoUtils.safeClose(channel);
+ } else {
+ //noinspection unchecked
+ ChannelListeners.invokeChannelListener(tcpChannel, openListener);
+ }
}
} catch (IOException e) {
IoUtils.safeClose(channel);
@@ -339,7 +343,7 @@ public String toString() {
futureResult.addCancelHandler(new Cancellable() {
public Cancellable cancel() {
if (futureResult.setCancelled()) {
- IoUtils.safeClose(channel);
+ IoUtils.safeClose(tcpChannel);
}
return this;
}

0 comments on commit 82a4420

Please sign in to comment.