Skip to content
Permalink
Browse files
GIRAPH-1230
closes #118
  • Loading branch information
dlogothetis committed Dec 16, 2019
1 parent 526f561 commit f8d017e61d66ec56b17ecf796743d6851c2f0988
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 22 deletions.
@@ -227,7 +227,7 @@
</module>
<!-- Over time, we will revise this down -->
<module name="MethodLength">
<property name="max" value="200"/>
<property name="max" value="210"/>
</module>
<module name="ParameterNumber">
<property name="max" value="8"/>
@@ -25,12 +25,15 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.log4j.Logger;


/**
* Maintains multiple channels and rotates between them. This is thread-safe.
*/
public class ChannelRotater {
/** Logger */
private static final Logger LOG = Logger.getLogger(ChannelRotater.class);
/** Index of last used channel */
private int index = 0;
/** Channel list */
@@ -73,9 +76,9 @@ public synchronized void addChannel(Channel channel) {
*/
public synchronized Channel nextChannel() {
if (channelList.isEmpty()) {
throw new IllegalArgumentException(
"nextChannel: No channels exist for hostname " +
address.getHostName());
LOG.warn("nextChannel: No channels exist for hostname " +
address.getHostName());
return null;
}

++index;
@@ -242,6 +242,11 @@ public class NettyClient {
/** How many network requests were resent because connection failed */
private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;

/**
* Keeps track of the number of reconnect failures. Once this reaches the
* value of {@link #maxConnectionFailures}, the job will fail.
*/
private int reconnectFailures = 0;

/**
* Only constructor
@@ -764,26 +769,25 @@ public void operationComplete(ChannelFuture cf) {
private Channel getNextChannel(InetSocketAddress remoteServer) {
Channel channel = addressChannelMap.get(remoteServer).nextChannel();
if (channel == null) {
throw new IllegalStateException(
"getNextChannel: No channel exists for " + remoteServer);
}

// Return this channel if it is connected
if (channel.isActive()) {
return channel;
}
LOG.warn("getNextChannel: No channel exists for " + remoteServer);
} else {
// Return this channel if it is connected
if (channel.isActive()) {
return channel;
}

// Get rid of the failed channel
if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
LOG.warn("getNextChannel: Unlikely event that the channel " +
// Get rid of the failed channel
if (addressChannelMap.get(remoteServer).removeChannel(channel)) {
LOG.warn("getNextChannel: Unlikely event that the channel " +
channel + " was already removed!");
}
if (LOG.isInfoEnabled()) {
LOG.info("getNextChannel: Fixing disconnected channel to " +
}
if (LOG.isInfoEnabled()) {
LOG.info("getNextChannel: Fixing disconnected channel to " +
remoteServer + ", open = " + channel.isOpen() + ", " +
"bound = " + channel.isRegistered());
}
}
int reconnectFailures = 0;

while (reconnectFailures < maxConnectionFailures) {
ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
try {
@@ -1205,15 +1209,14 @@ public boolean apply(RequestInfo requestInfo) {
* This listener class just dumps exception stack traces if
* something happens.
*/
private class LogOnErrorChannelFutureListener
private static class LogOnErrorChannelFutureListener
implements ChannelFutureListener {

/**
 * Invoked when the observed channel operation completes. If the operation
 * finished without success, the failure cause is logged at error level and
 * the channel is handed to checkRequestsAfterChannelFailure.
 *
 * @param future Future of the channel operation being observed
 */
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Only act on operations that have finished and did not succeed.
if (future.isDone() && !future.isSuccess()) {
// The channel's hashCode() serves as its identifier in the log message.
LOG.error("Channel failed channelId=" + future.channel().hashCode(),
future.cause());
checkRequestsAfterChannelFailure(future.channel());
}
}
}
@@ -1075,6 +1075,18 @@ public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler() {
getConf()), getJobProgressTracker());
}

/**
 * Builds an uncaught-exception handler whose fail-or-continue decision is
 * delegated to the supplied
 * {@link CheckerIfWorkerShouldFailAfterException}.
 *
 * @param checker Instance consulted to decide whether the job should fail.
 * @return Handler wrapping {@code checker} and the job progress tracker.
 */
public Thread.UncaughtExceptionHandler createUncaughtExceptionHandler(
    CheckerIfWorkerShouldFailAfterException checker) {
  final Thread.UncaughtExceptionHandler handler =
      new OverrideExceptionHandler(checker, getJobProgressTracker());
  return handler;
}

/**
 * Returns the configuration stored in this object's {@code conf} field.
 *
 * @return The configuration.
 */
public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
  return this.conf;
}
@@ -1128,6 +1140,9 @@ public OverrideExceptionHandler(
@Override
public void uncaughtException(final Thread t, final Throwable e) {
if (!checker.checkIfWorkerShouldFail(t, e)) {
LOG.error(
"uncaughtException: OverrideExceptionHandler on thread " +
t.getName() + ", msg = " + e.getMessage(), e);
return;
}
try {
@@ -1169,4 +1184,17 @@ public boolean checkIfWorkerShouldFail(Thread thread, Throwable exception) {
return true;
}
}

/**
 * Checks the message of a throwable, and checks whether it is a
 * "connection reset by peer" type of exception.
 *
 * @param throwable Throwable to inspect; may be null
 * @return True if the throwable's message starts with
 *         "Connection reset by peer", false otherwise (including when
 *         the throwable or its message is null).
 */
public static boolean isConnectionResetByPeer(Throwable throwable) {
  if (throwable == null) {
    return false;
  }
  // getMessage() returns null for throwables created without a detail
  // message; treat that as "not a connection reset" instead of letting a
  // NullPointerException escape from this check.
  String message = throwable.getMessage();
  return message != null && message.startsWith("Connection reset by peer");
}
}
@@ -121,6 +121,8 @@

import com.google.common.collect.Lists;

import static org.apache.giraph.graph.GraphTaskManager.isConnectionResetByPeer;

/**
* ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
*
@@ -217,7 +219,15 @@ public BspServiceWorker(
getGraphPartitionerFactory().createWorkerGraphPartitioner();
workerInfo = new WorkerInfo();
workerServer = new NettyWorkerServer<I, V, E>(conf, this, context,
graphTaskManager.createUncaughtExceptionHandler());
graphTaskManager.createUncaughtExceptionHandler(
(thread, throwable) -> {
// If the connection was closed by the client, then we just log
// the error, we do not fail the job, since the client will
// attempt to reconnect.
return !isConnectionResetByPeer(throwable);
}
)
);
workerInfo.setInetSocketAddress(workerServer.getMyAddress(),
workerServer.getLocalHostOrIp());
workerInfo.setTaskId(getTaskId());

0 comments on commit f8d017e

Please sign in to comment.