Skip to content
Permalink
Browse files
GIRAPH-1251
closes #150
  • Loading branch information
atanu1991 authored and dlogothetis committed Jun 3, 2021
1 parent 8ba2fea commit 2c63aa2321a570f7dfcc659f565d18afb63a2e3c
Show file tree
Hide file tree
Showing 10 changed files with 606 additions and 6 deletions.
@@ -227,7 +227,7 @@
</module>
<!-- Over time, we will revised this down -->
<module name="MethodLength">
<property name="max" value="210"/>
<property name="max" value="220"/>
</module>
<module name="ParameterNumber">
<property name="max" value="8"/>
@@ -241,6 +241,8 @@ public class NettyClient {
private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
/** How many network requests were resent because connection failed */
private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;
/** Netty SSL Handler class */
private final NettySSLHandler nettySSLHandler;

/**
* Keeps track of the number of reconnect failures. Once this exceeds the
@@ -325,6 +327,12 @@ public NettyClient(Mapper<?, ?, ?, ?>.Context context,
executionGroup = null;
}

if (conf.sslAuthenticate()) {
nettySSLHandler = new NettySSLHandler(true, conf);
} else {
nettySSLHandler = null;
}

workerGroup = new NioEventLoopGroup(maxPoolSize,
ThreadUtils.createThreadFactory(
"netty-client-worker-%d", exceptionHandler));
@@ -395,11 +403,19 @@ protected void initChannel(SocketChannel ch) throws Exception {
new SaslClientHandler(conf), handlerToUseExecutionGroup,
executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("response-handler",
new ResponseClientHandler(NettyClient.this, conf),
new ResponseClientHandler(
NettyClient.this, conf, exceptionHandler),
handlerToUseExecutionGroup, executionGroup, ch);
} else {
LOG.info("Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/

if (conf.sslAuthenticate()) {
PipelineUtils.addLastWithExecutorCheck("sslHandler",
nettySSLHandler.getSslHandler(ch.alloc()),
handlerToUseExecutionGroup, executionGroup, ch);
}

PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
new FlushConsolidationHandler(FlushConsolidationHandler
.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
@@ -429,8 +445,11 @@ protected void initChannel(SocketChannel ch) throws Exception {
PipelineUtils.addLastWithExecutorCheck("request-encoder",
new RequestEncoder(conf), handlerToUseExecutionGroup,
executionGroup, ch);
// ResponseClientHandler is the last handler in channel pipeline
// It handles the SSL Exception in a special way
PipelineUtils.addLastWithExecutorCheck("response-handler",
new ResponseClientHandler(NettyClient.this, conf),
new ResponseClientHandler(
NettyClient.this, conf, exceptionHandler),
handlerToUseExecutionGroup, executionGroup, ch);

/*if_not[HADOOP_NON_SECURE]*/
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.giraph.comm.netty;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.log4j.Logger;

import javax.net.ssl.SSLException;

/**
* Class to handle all SSL related logic. This class creates the SSL Context,
* sets up SSL handler and the interface for custom SSL event handling. When
* SSL_ENCRYPT is set to true, an object of this class is created and the
* getSslHandler function is called to get the SSL handler and add it to the
* Netty channel pipeline. This class also adds an event listener function to
* on handshake complete event and calls the custom
* {@link SSLEventHandler} handleOnSslHandshakeComplete function, if defined.
*/
public class NettySSLHandler
{
/** Class Logger */
private static final Logger LOG = Logger.getLogger(NettySSLHandler.class);
/** Client or Server */
private boolean isClient;
/** Giraph Configuration */
private ImmutableClassesGiraphConfiguration conf;
/** SSL Event Handler interface */
private SSLEventHandler sslEventHandler;
/** SSL Context object */
private SslContext sslContext;

/**
* Constructor
*
* This is called before the Netty Channel is setup. This initializes the
* SslContext object, which will be used by the SSL Handler. Since this
* class is instantiated once for every Netty client and server, creating
* the SslContext ensures that we create it only once and re-use it to
* create the SSL handlers (getSslHandler) instead of creating the
* SslContext for every call to getSslHandler.
*
* @param isClient Client/server for which the ssl handler
* needs to be created
* @param conf configuration object
*/
public NettySSLHandler(
boolean isClient,
ImmutableClassesGiraphConfiguration conf) {
this.isClient = isClient;
this.conf = conf;
this.sslEventHandler = conf.createSSLEventHandler();
try {
this.sslContext = new SSLConfig.Builder(
this.isClient, this.conf, SSLConfig.VerifyMode.VERIFY_REQ_CLIENT_CERT)
.build()
.buildSslContext();
} catch (SSLException e) {
LOG.error("Failed to build SSLConfig object " + e.getCause());
throw new IllegalStateException(e);
}
}

/**
* Build the Client or server SSL Context, create new SSL handler,
* add a listener function to onSslHandshakeComplete and return
*
* @param allocator ByteBufAllocator of the channel
*
* @return The SSL Handler object
*/
public SslHandler getSslHandler(ByteBufAllocator allocator)
{
SslHandler handler = this.sslContext.newHandler(allocator);
handler.handshakeFuture().addListener(
f -> onSslHandshakeComplete(f, handler));
return handler;
}

/**
* Build the Client or server SSL Context, create new SSL handler,
* add a listener function to onSslHandshakeComplete and return
*
* @param future Future object to be notified once handshake completes
* @param sslHandler SslHandler object
*
* @throws Exception
*/
private void onSslHandshakeComplete(
Future<? super Channel> future,
SslHandler sslHandler) throws Exception
{
// If no custom SSL Event Handler is defined, return
if (sslEventHandler == null) {
return;
}
try {
sslEventHandler.handleOnSslHandshakeComplete(
future, sslHandler, isClient);
// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
// If there is any exception from onSslHandshakeComplete
// Cast it to SSLException and propagate it down the channel
LOG.error("Error in handleOnSslHandshakeComplete: " + e.getMessage());
Channel ch = (Channel) future.getNow();
ch.pipeline().fireExceptionCaught(new SSLException(e));
}
LOG.debug("onSslHandshakeComplete succeeded");
}
}
@@ -127,7 +127,8 @@ public class NettyServer {
private final String handlerToUseExecutionGroup;
/** Handles all uncaught exceptions in netty threads */
private final Thread.UncaughtExceptionHandler exceptionHandler;

/** Netty SSL Handler class */
private final NettySSLHandler nettySSLHandler;

/**
* Constructor for creating the server
@@ -192,6 +193,12 @@ public NettyServer(ImmutableClassesGiraphConfiguration conf,
} else {
executionGroup = null;
}

if (conf.sslAuthenticate()) {
nettySSLHandler = new NettySSLHandler(false, conf);
} else {
nettySSLHandler = null;
}
}

/*if_not[HADOOP_NON_SECURE]*/
@@ -311,6 +318,13 @@ protected void initChannel(SocketChannel ch) throws Exception {
} else {
LOG.info("start: Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/

if (conf.sslAuthenticate()) {
PipelineUtils.addLastWithExecutorCheck("sslHandler",
nettySSLHandler.getSslHandler(ch.alloc()),
handlerToUseExecutionGroup, executionGroup, ch);
}

// Store all connected channels in order to ensure that we can close
// them on stop(), or else stop() may hang waiting for the
// connections to close on their own

0 comments on commit 2c63aa2

Please sign in to comment.