GIRAPH-1228
closes #114
dlogothetis committed Nov 19, 2019
1 parent 13da0eb commit acc1f71af03dec6f1a1c83fb0fcce08451e140ea
Showing 7 changed files with 32 additions and 50 deletions.
@@ -18,6 +18,7 @@

package org.apache.giraph.comm.netty;

+import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.CreditBasedFlowControl;
import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.flow_control.NoOpFlowControl;
@@ -252,9 +253,9 @@ public class NettyClient {
* terminate job.
*/
public NettyClient(Mapper<?, ?, ?, ?>.Context context,
-final ImmutableClassesGiraphConfiguration conf,
-TaskInfo myTaskInfo,
-final Thread.UncaughtExceptionHandler exceptionHandler) {
+final ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo,
+final Thread.UncaughtExceptionHandler exceptionHandler) {

this.context = context;
this.myTaskInfo = myTaskInfo;
this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);
@@ -280,16 +281,13 @@ public NettyClient(Mapper<?, ?, ?, ?>.Context context,

initialiseCounters();
networkRequestsResentForTimeout =
-new GiraphHadoopCounter(context.getCounter(
-NETTY_COUNTERS_GROUP,
+new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
networkRequestsResentForChannelFailure =
-new GiraphHadoopCounter(context.getCounter(
-NETTY_COUNTERS_GROUP,
+new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
networkRequestsResentForConnectionFailure =
-new GiraphHadoopCounter(context.getCounter(
-NETTY_COUNTERS_GROUP,
+new GiraphHadoopCounter(context.getCounter(NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));

maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
@@ -343,6 +341,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (conf.authenticate()) {
LOG.info("Using Netty with authentication.");

PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
new FlushConsolidationHandler(FlushConsolidationHandler
.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
handlerToUseExecutionGroup, executionGroup, ch);
// Our pipeline starts with just byteCounter, and then we use
// addLast() to incrementally add pipeline elements, so that we
// can name them for identification for removal or replacement
@@ -394,6 +396,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
} else {
LOG.info("Using Netty without authentication.");
/*end[HADOOP_NON_SECURE]*/
PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
new FlushConsolidationHandler(FlushConsolidationHandler
.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("clientInboundByteCounter",
inboundByteCounter, handlerToUseExecutionGroup,
executionGroup, ch);
@@ -864,13 +870,17 @@ public Long doSend(int destTaskId, WritableRequest request) {
}

/**
-* Write request to a channel for its destination
+* Write request to a channel for its destination.
*
+* Whenever we write to the channel, we also call flush, but we have added a
+* {@link FlushConsolidationHandler} in the pipeline, which batches the
+* flushes.
+*
* @param requestInfo Request info
*/
private void writeRequestToChannel(RequestInfo requestInfo) {
Channel channel = getNextChannel(requestInfo.getDestinationAddress());
-ChannelFuture writeFuture = channel.write(requestInfo.getRequest());
+ChannelFuture writeFuture = channel.writeAndFlush(requestInfo.getRequest());
requestInfo.setWriteFuture(writeFuture);
writeFuture.addListener(logErrorListener);
}
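The new javadoc above captures the core of the client-side change: every request is written with writeAndFlush(), and a FlushConsolidationHandler installed at the head of the pipeline batches the resulting flushes. A minimal sketch of that pattern with plain Netty 4.1 (hypothetical FlushConsolidationExample class, not Giraph's PipelineUtils wiring):

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;

/** Hypothetical example, not part of this commit. */
public class FlushConsolidationExample extends ChannelInitializer<SocketChannel> {
  @Override
  protected void initChannel(SocketChannel ch) {
    // First handler in the pipeline: intercepts flush() calls and coalesces up
    // to DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES (256) of them into one real flush.
    ch.pipeline().addLast("flushConsolidation",
        new FlushConsolidationHandler(
            FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true));
    // ... remaining handlers (byte counters, codecs, request handlers) follow ...
  }

  /** Senders can then flush per request; the handler batches the actual flushes. */
  static ChannelFuture send(Channel channel, Object request) {
    return channel.writeAndFlush(request);
  }
}
```

The `true` flag mirrors the calls added in this commit: consolidation is applied even when no read is in progress, with the pending flush scheduled on the channel's event loop.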
@@ -18,6 +18,7 @@

package org.apache.giraph.comm.netty;

+import io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.giraph.comm.flow_control.FlowControl;
/*if_not[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.netty.handler.AuthorizeServerHandler;
@@ -257,6 +258,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
// pipeline components SaslServerHandler and ResponseEncoder are
// removed, leaving the pipeline the same as in the non-authenticated
// configuration except for the presence of the Authorize component.
PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
new FlushConsolidationHandler(FlushConsolidationHandler
.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
if (conf.doCompression()) {
@@ -307,6 +312,10 @@ public void channelActive(ChannelHandlerContext ctx)
ctx.fireChannelActive();
}
});
PipelineUtils.addLastWithExecutorCheck("flushConsolidation",
new FlushConsolidationHandler(FlushConsolidationHandler
.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true),
handlerToUseExecutionGroup, executionGroup, ch);
PipelineUtils.addLastWithExecutorCheck("serverInboundByteCounter",
inByteCounter, handlerToUseExecutionGroup, executionGroup, ch);
if (conf.doCompression()) {
@@ -20,7 +20,6 @@

import org.apache.giraph.comm.flow_control.FlowControl;
import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.time.SystemTime;
@@ -32,8 +31,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

-import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;

/**
@@ -64,10 +61,6 @@
private long startProcessingNanoseconds = -1;
/** Handler for uncaught exceptions */
private final Thread.UncaughtExceptionHandler exceptionHandler;
-/** Whether it is the first time reading/handling a request*/
-private final AtomicBoolean firstRead = new AtomicBoolean(true);
-/** Cached value for NETTY_AUTO_READ configuration option */
-private final boolean nettyAutoRead;

/**
* Constructor
@@ -86,7 +79,6 @@ public RequestServerHandler(
closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
this.myTaskInfo = myTaskInfo;
this.exceptionHandler = exceptionHandler;
-this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf);
}

@Override
@@ -141,24 +133,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
flowControl.calculateResponse(alreadyDone, request.getClientId());
buffer.writeInt(signal);
ctx.write(buffer);
-// NettyServer is bootstrapped with auto-read set to true by default. After
-// the first request is processed, we set auto-read to false. This prevents
-// netty from reading requests continuously and putting them in off-heap
-// memory. Instead, we will call `read` on requests one by one, so that the
-// lower level transport layer handles the congestion if the rate of
-// incoming requests is more than the available processing capability.
-if (!nettyAutoRead && firstRead.compareAndSet(true, false)) {
-ctx.channel().config().setAutoRead(false);
-}
}

-@Override
-public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
-if (!nettyAutoRead) {
-ctx.read();
-} else {
-super.channelReadComplete(ctx);
-}
-}

/**
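The two removed blocks above are the Netty-level manual-read logic: after the first request, auto-read was switched off and each channelReadComplete() pulled the next read explicitly so the transport layer could push back on fast senders. For reference, a standalone sketch of that now-deleted pattern (hypothetical ManualReadExample handler, not Giraph code); with this commit the server stays on Netty's default auto-read behaviour:

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/** Hypothetical example of the pattern removed by this commit. */
public class ManualReadExample extends ChannelInboundHandlerAdapter {
  // The handler runs on a single event-loop thread, so a plain field suffices here.
  private boolean firstRead = true;

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    if (firstRead) {
      firstRead = false;
      // Stop Netty from reading ahead and buffering incoming requests off-heap.
      ctx.channel().config().setAutoRead(false);
    }
    ctx.fireChannelRead(msg);
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) {
    // With auto-read off, explicitly request the next read only when ready,
    // so TCP flow control throttles senders that outpace request processing.
    ctx.read();
  }
}
```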
@@ -132,8 +132,6 @@ public void configure(GiraphConfiguration conf) {
StaticFlowControl.MAX_NUMBER_OF_OPEN_REQUESTS.setIfUnset(conf, 100);
// Pooled allocator in netty is faster
GiraphConstants.NETTY_USE_POOLED_ALLOCATOR.setIfUnset(conf, true);
-// Turning off auto read is faster
-GiraphConstants.NETTY_AUTO_READ.setIfUnset(conf, false);

// Synchronize full gc calls across workers
MemoryObserver.USE_MEMORY_OBSERVER.setIfUnset(conf, true);
@@ -653,15 +653,6 @@ public interface GiraphConstants {
new StrConfOption("giraph.nettyCompressionAlgorithm", "",
"Which compression algorithm to use in netty");

-/**
-* Whether netty should pro-actively read requests and feed them to its
-* processing pipeline
-*/
-BooleanConfOption NETTY_AUTO_READ =
-new BooleanConfOption("giraph.nettyAutoRead", true,
-"Whether netty should pro-actively read requests and feed them to " +
-"its processing pipeline");

/** Max resolve address attempts */
IntConfOption MAX_RESOLVE_ADDRESS_ATTEMPTS =
new IntConfOption("giraph.maxResolveAddressAttempts", 5,
@@ -1355,7 +1355,7 @@ public MessageToByteEncoder getNettyCompressionEncoder() {
public ByteToMessageDecoder getNettyCompressionDecoder() {
switch (GiraphConstants.NETTY_COMPRESSION_ALGORITHM.get(this)) {
case "SNAPPY":
-return new SnappyFramedDecoder(true);
+return new SnappyFramedDecoder();
case "INFLATE":
return new JdkZlibDecoder();
default:
@@ -350,7 +350,7 @@ under the License.
<dep.log4j.version>1.2.17</dep.log4j.version>
<dep.mockito.version>1.9.5</dep.mockito.version>
<!-- note: old version of netty is required by hadoop_facebook for tests to succeed -->
-<dep.netty.version>4.0.14.Final</dep.netty.version>
+<dep.netty.version>4.1.36.Final</dep.netty.version>
<dep.oldnetty.version>3.2.2.Final</dep.oldnetty.version>
<dep.objenesis.version>2.2</dep.objenesis.version>
<dep.openhft-compiler.version>2.2.1</dep.openhft-compiler.version>
