Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
JIRA-1137
Browse files Browse the repository at this point in the history
closes #26
  • Loading branch information
Hassan Eslami authored and Hassan Eslami committed Mar 27, 2017
1 parent a1d546f commit 2173d87
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 29 deletions.
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.giraph.comm.netty.NettyClient;
Expand All @@ -32,6 +33,7 @@
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.utils.AdjustableSemaphore;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;

import java.util.ArrayDeque;
Expand All @@ -41,6 +43,8 @@
import java.util.Deque;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -165,6 +169,14 @@ public class CreditBasedFlowControl implements FlowControl {
*/
private final ConcurrentMap<Integer, Set<Long>> resumeRequestsId =
Maps.newConcurrentMap();
/**
* Queue of the cached requests to be sent out. The queue keeps pairs of
* (destination id, request). The thread-safe blocking queue is used here for
* the sake of simplicity. The blocking queue should be bounded (with bounds
* no more than user-defined max number of unsent/cached requests) to avoid
* excessive memory footprint.
*/
private final BlockingQueue<Pair<Integer, WritableRequest>> toBeSent;
/**
* Semaphore to control number of cached unsent requests. Maximum number of
* permits of this semaphore should be equal to MAX_NUM_OF_UNSENT_REQUESTS.
Expand All @@ -180,7 +192,7 @@ public class CreditBasedFlowControl implements FlowControl {
* @param exceptionHandler Exception handler
*/
public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
NettyClient nettyClient,
final NettyClient nettyClient,
Thread.UncaughtExceptionHandler
exceptionHandler) {
this.nettyClient = nettyClient;
Expand All @@ -189,10 +201,15 @@ public CreditBasedFlowControl(ImmutableClassesGiraphConfiguration conf,
checkState(maxOpenRequestsPerWorker < 0x4000 &&
maxOpenRequestsPerWorker > 0, "NettyClient: max number of open " +
"requests should be in range (0, " + 0x4FFF + ")");
unsentRequestPermit = new Semaphore(MAX_NUM_OF_UNSENT_REQUESTS.get(conf));
int maxUnsentRequests = MAX_NUM_OF_UNSENT_REQUESTS.get(conf);
unsentRequestPermit = new Semaphore(maxUnsentRequests);
this.toBeSent = new ArrayBlockingQueue<Pair<Integer, WritableRequest>>(
maxUnsentRequests);
unsentWaitMsecs = UNSENT_CACHE_WAIT_INTERVAL.get(conf);
waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
Thread thread = new Thread(new Runnable() {

// Thread to handle/send resume signals when necessary
ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
Expand All @@ -214,11 +231,31 @@ public void run() {
}
}
}
});
thread.setUncaughtExceptionHandler(exceptionHandler);
thread.setName("resume-sender");
thread.setDaemon(true);
thread.start();
}, "resume-sender", exceptionHandler);

// Thread to handle/send cached requests
ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
Pair<Integer, WritableRequest> pair = null;
try {
pair = toBeSent.take();
} catch (InterruptedException e) {
throw new IllegalStateException("run: failed while waiting to " +
"take an element from the request queue!", e);
}
int taskId = pair.getLeft();
WritableRequest request = pair.getRight();
nettyClient.doSend(taskId, request);
if (aggregateUnsentRequests.decrementAndGet() == 0) {
synchronized (aggregateUnsentRequests) {
aggregateUnsentRequests.notifyAll();
}
}
}
}
}, "cached-req-sender", exceptionHandler);
}

/**
Expand Down Expand Up @@ -510,13 +547,14 @@ private void trySendCachedRequests(int taskId) {
}
unsentRequestPermit.release();
// At this point, we have a request, and we reserved a credit for the
// sender client. So, we send the request to the client and update
// the state.
nettyClient.doSend(taskId, request);
if (aggregateUnsentRequests.decrementAndGet() == 0) {
synchronized (aggregateUnsentRequests) {
aggregateUnsentRequests.notifyAll();
}
// sender client. So, we put the request in a queue to be sent to the
// client.
try {
toBeSent.put(
new ImmutablePair<Integer, WritableRequest>(taskId, request));
} catch (InterruptedException e) {
throw new IllegalStateException("trySendCachedRequests: failed while" +
"waiting to put element in send queue!", e);
}
}
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.ooc.command.LoadPartitionIOCommand;
import org.apache.giraph.ooc.command.WaitIOCommand;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.worker.EdgeInputSplitsCallable;
import org.apache.giraph.worker.VertexInputSplitsCallable;
import org.apache.giraph.worker.WorkerProgress;
Expand Down Expand Up @@ -171,7 +172,7 @@ public MemoryEstimatorOracle(ImmutableClassesGiraphConfiguration conf,

final long checkMemoryInterval = CHECK_MEMORY_INTERVAL.get(conf);

Thread thread = new Thread(new Runnable() {
ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
Expand Down Expand Up @@ -211,12 +212,8 @@ public void run() {
}
}
}
});
thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
.getGraphTaskManager().createUncaughtExceptionHandler());
thread.setName("ooc-memory-checker");
thread.setDaemon(true);
thread.start();
}, "ooc-memory-checker", oocEngine.getServiceWorker().getGraphTaskManager()
.createUncaughtExceptionHandler());
}

/**
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.giraph.ooc.OutOfCoreEngine;
import org.apache.giraph.ooc.command.IOCommand;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.log4j.Logger;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -170,7 +171,7 @@ public ThresholdBasedOracle(ImmutableClassesGiraphConfiguration conf,
this.oocEngine = oocEngine;
this.lastMajorGCTime = 0;

final Thread thread = new Thread(new Runnable() {
ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
Expand Down Expand Up @@ -207,12 +208,8 @@ public void run() {
}
}
}
});
thread.setUncaughtExceptionHandler(oocEngine.getServiceWorker()
.getGraphTaskManager().createUncaughtExceptionHandler());
thread.setName("memory-checker");
thread.setDaemon(true);
thread.start();
}, "memory-checker", oocEngine.getServiceWorker().getGraphTaskManager().
createUncaughtExceptionHandler());
}

/**
Expand Down
18 changes: 18 additions & 0 deletions giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
Expand Up @@ -105,6 +105,24 @@ public static Thread startThread(Runnable runnable, String threadName) {
return thread;
}

/**
* Start thread with specified name, runnable and exception handler, and make
* it daemon
*
* @param runnable Runnable to execute
* @param threadName Name of the thread
* @param handler Exception handler
* @return Thread
*/
public static Thread startThread(Runnable runnable, String threadName,
Thread.UncaughtExceptionHandler handler) {
Thread thread = new Thread(runnable, threadName);
thread.setUncaughtExceptionHandler(handler);
thread.setDaemon(true);
thread.start();
return thread;
}

/**
* Sleep for specified milliseconds, logging and ignoring interrupted
* exceptions
Expand Down

0 comments on commit 2173d87

Please sign in to comment.