Skip to content
Permalink
Browse files
GIRAPH-1227
closes #110
  • Loading branch information
aanchal204 authored and dlogothetis committed Nov 12, 2019
1 parent 19f76a3 commit 13da0ebb032035880adb65ef0a9bd0efc0511499
Show file tree
Hide file tree
Showing 24 changed files with 905 additions and 48 deletions.
@@ -23,6 +23,8 @@
import org.apache.giraph.block_app.framework.api.StatusReporter;
import org.apache.giraph.block_app.framework.internal.BlockMasterLogic.TimeStatsPerEvent;

import org.apache.giraph.counters.CustomCounter;
import org.apache.giraph.counters.CustomCounters;
import org.apache.hadoop.mapreduce.Mapper;

/** Utility class for Blocks Framework related counters */
@@ -49,6 +51,9 @@ public static void setStageCounters(
for (Field field : fields) {
try {
long value = field.getLong(stage);
String counterName = prefix + field.getName();
CustomCounters.addCustomCounter(GROUP, counterName,
CustomCounter.Aggregation.SUM);
reporter.getCounter(
GROUP, prefix + field.getName()).setValue(value);

@@ -67,11 +72,12 @@ public static void setMasterTimeCounter(
long millis, StatusReporter reporter,
TimeStatsPerEvent timeStats) {
String name = masterPiece.getPiece().toString();
reporter.getCounter(
GROUP + " Master Timers",
String.format(
"In %6.1f %s (s)", superstep - 0.5, name)
).setValue(millis / 1000);
String groupName = GROUP + " Master Timers";
String counterName = String.format(
"In %6.1f %s (s)", superstep - 0.5, name);
CustomCounters.addCustomCounter(groupName, counterName,
CustomCounter.Aggregation.SUM);
reporter.getCounter(groupName, counterName).setValue(millis / 1000);
timeStats.inc(name, millis);
}

@@ -80,10 +86,11 @@ public static void setWorkerTimeCounter(
long millis, StatusReporter reporter,
TimeStatsPerEvent timeStats) {
String name = workerPieces.toStringShort();
reporter.getCounter(
GROUP + " Worker Timers",
String.format("In %6d %s (s)", superstep, name)
).setValue(millis / 1000);
String groupName = GROUP + " Worker Timers";
String counterName = String.format("In %6d %s (s)", superstep, name);
CustomCounters.addCustomCounter(groupName, counterName,
CustomCounter.Aggregation.SUM);
reporter.getCounter(groupName, counterName).setValue(millis / 1000);
timeStats.inc(name, millis);
}

@@ -27,6 +27,8 @@
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.MessageClasses;
import org.apache.giraph.counters.CustomCounter;
import org.apache.giraph.counters.CustomCounters;
import org.apache.giraph.factories.DefaultMessageValueFactory;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.types.NoMessage;
@@ -51,6 +53,16 @@
private static final
String NEXT_WORKER_PIECES = "giraph.blocks.next_worker_pieces";

/** Passed worker stats counter group */
private static final String PASSED_WORKER_STATS_GROUP = "PassedWorker Stats";

/** Total serialised size counter name */
private static final String TOTAL_SERIALISED_SIZE_NAME =
"total serialized size";

/** Split parts counter name */
private static final String SPLIT_PARTS_NAME = "split parts";

private final PairedPieceAndStage<S> receiver;
private final PairedPieceAndStage<S> sender;
private final BlockApiHandle blockApiHandle;
@@ -135,11 +147,14 @@ public static <S> void setNextWorkerPieces(

LOG.info("Next worker piece - total serialized size: " + data.length +
", split into " + splittedData.size());
master.getContext().getCounter(
"PassedWorker Stats", "total serialized size")
CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
TOTAL_SERIALISED_SIZE_NAME, CustomCounter.Aggregation.SUM);
master.getContext().getCounter(PASSED_WORKER_STATS_GROUP,
TOTAL_SERIALISED_SIZE_NAME)
.increment(data.length);
master.getContext().getCounter(
"PassedWorker Stats", "split parts")
CustomCounters.addCustomCounter(PASSED_WORKER_STATS_GROUP,
SPLIT_PARTS_NAME, CustomCounter.Aggregation.SUM);
master.getContext().getCounter(PASSED_WORKER_STATS_GROUP, SPLIT_PARTS_NAME)
.increment(splittedData.size());

master.broadcast(NEXT_WORKER_PIECES, new IntWritable(splittedData.size()));
@@ -92,6 +92,10 @@
public static final String MASTER_ELECTION_DIR = "/_masterElectionDir";
/** Superstep scope */
public static final String SUPERSTEP_DIR = "/_superstepDir";
/** Counter sub directory */
public static final String COUNTERS_DIR = "/_counters";
/** Metrics sub directory */
public static final String METRICS_DIR = "/_metrics";
/** Healthy workers register here. */
public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir";
/** Unhealthy workers register here. */
@@ -178,6 +182,9 @@
private final BspEvent masterElectionChildrenChanged;
/** Cleaned up directory children changed*/
private final BspEvent cleanedUpChildrenChanged;
/** Event to synchronize when workers have written their counters to the
* zookeeper*/
private final BspEvent writtenCountersToZK;
/** Registered list of BspEvents */
private final List<BspEvent> registeredBspEvents =
new ArrayList<BspEvent>();
@@ -223,6 +230,7 @@ public BspService(
this.superstepFinished = new PredicateLock(context);
this.masterElectionChildrenChanged = new PredicateLock(context);
this.cleanedUpChildrenChanged = new PredicateLock(context);
this.writtenCountersToZK = new PredicateLock(context);

registerBspEvent(connectedEvent);
registerBspEvent(workerHealthRegistrationChanged);
@@ -232,6 +240,7 @@ public BspService(
registerBspEvent(superstepFinished);
registerBspEvent(masterElectionChildrenChanged);
registerBspEvent(cleanedUpChildrenChanged);
registerBspEvent(writtenCountersToZK);

this.context = context;
this.graphTaskManager = graphTaskManager;
@@ -248,8 +257,10 @@ public BspService(
this.graphPartitionerFactory = conf.createGraphPartitioner();

basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId;
getContext().getCounter(GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP,
basePath);
if (LOG.isInfoEnabled()) {
LOG.info(String.format("%s: %s",
GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP, basePath));
}
masterJobStatePath = basePath + MASTER_JOB_STATE_NODE;
inputSplitsWorkerDonePath = basePath + INPUT_SPLITS_WORKER_DONE_DIR;
inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
@@ -421,15 +432,31 @@ public final String getWorkerWroteCheckpointPath(long attempt,

/**
* Generate the worker "finished" directory path for a
* superstep
* superstep, for storing the superstep-related metrics
*
* @param attempt application attempt number
* @param superstep superstep to use
* @return directory path based on the a superstep
*/
public final String getWorkerMetricsFinishedPath(
long attempt, long superstep) {
return applicationAttemptsPath + "/" + attempt +
SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR + METRICS_DIR;
}

/**
* Generate the worker "finished" directory path for a
* superstep, for storing the superstep-related counters
*
* @param attempt application attempt number
* @param superstep superstep to use
* @return directory path based on the a superstep
*/
public final String getWorkerFinishedPath(long attempt, long superstep) {
public final String getWorkerCountersFinishedPath(
long attempt, long superstep) {
return applicationAttemptsPath + "/" + attempt +
SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR;
SUPERSTEP_DIR + "/" + superstep +
WORKER_FINISHED_DIR + COUNTERS_DIR;
}

/**
@@ -585,6 +612,10 @@ public final BspEvent getCleanedUpChildrenChangedEvent() {
return cleanedUpChildrenChanged;
}

public final BspEvent getWrittenCountersToZKEvent() {
return writtenCountersToZK;
}

/**
* Get the master commanded job state as a JSONObject. Also sets the
* watches to see if the master commanded job state changes.
@@ -911,6 +942,11 @@ public final void process(WatchedEvent event) {
}
cleanedUpChildrenChanged.signal();
eventProcessed = true;
} else if (event.getPath().endsWith(COUNTERS_DIR) &&
event.getType() == EventType.NodeChildrenChanged) {
LOG.info("process: writtenCountersToZK signaled");
getWrittenCountersToZKEvent().signal();
eventProcessed = true;
}

if (!(processEvent(event)) && (!eventProcessed)) {
@@ -180,4 +180,10 @@ void setJobState(ApplicationState state,
*/
void cleanup(SuperstepState superstepState)
throws IOException, InterruptedException;

/**
* Add the Giraph Timers to thirft counter struct, and send to the job client
*
*/
void addGiraphTimersAndSendCounters();
}
@@ -256,4 +256,19 @@ void cleanup(FinishedSuperstepStats finishedSuperstepStats)
*/
void addressesAndPartitionsReceived(
AddressesAndPartitionsWritable addressesAndPartitions);

/**
* Store the counter values in the zookeeper after every superstep
* and also after all supersteps are done. This is called before closing
* the zookeeper. We need to call this method after calling cleanup on the
* worker, since some counters are updated during cleanup
* @param allSuperstepsDone boolean value whether all the supersteps
* are completed
*/
void storeCountersInZooKeeper(boolean allSuperstepsDone);

/**
* Close zookeeper
*/
void closeZooKeeper();
}
@@ -60,8 +60,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -150,6 +153,9 @@ public class NettyClient {

/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
/** Netty related counter names */
private static Map<String, Set<String>> COUNTER_GROUP_AND_NAMES =
new HashMap<>();
/** Context used to report progress */
private final Mapper<?, ?, ?, ?>.Context context;
/** Client bootstrap */
@@ -235,6 +241,7 @@ public class NettyClient {
/** How many network requests were resent because connection failed */
private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;


/**
* Only constructor
*
@@ -271,6 +278,7 @@ public NettyClient(Mapper<?, ?, ?, ?>.Context context,
flowControl = new NoOpFlowControl(this);
}

initialiseCounters();
networkRequestsResentForTimeout =
new GiraphHadoopCounter(context.getCounter(
NETTY_COUNTERS_GROUP,
@@ -442,6 +450,23 @@ public void run() {
}, "open-requests-observer");
}

/**
* Put the Netty-related counters in a single map which will be accessed
* from the worker/master
*/
private void initialiseCounters() {
Set<String> counters = COUNTER_GROUP_AND_NAMES.getOrDefault(
NETTY_COUNTERS_GROUP, new HashSet<>());
counters.add(NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME);
counters.add(NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME);
counters.add(NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME);
COUNTER_GROUP_AND_NAMES.put(NETTY_COUNTERS_GROUP, counters);
}

public static Map<String, Set<String>> getCounterGroupsAndNames() {
return COUNTER_GROUP_AND_NAMES;
}

/**
* Whether master task is involved in the communication with a given client
*
@@ -1088,6 +1088,10 @@ public int getMaxMasterSuperstepWaitMsecs() {
return MAX_MASTER_SUPERSTEP_WAIT_MSECS.get(this);
}

public int getMaxCounterWaitMsecs() {
return MAX_COUNTER_WAIT_MSECS.get(this);
}

/**
* Set the maximum milliseconds to wait before giving up trying to get the
* minimum number of workers before a superstep.
@@ -687,6 +687,16 @@ public interface GiraphConstants {
"Maximum milliseconds to wait before giving up trying to get the " +
"minimum number of workers before a superstep (int).");

/**
* Maximum milliseconds to wait before giving up waiting for the workers to
* write the counters to the Zookeeper after a superstep
*/
IntConfOption MAX_COUNTER_WAIT_MSECS = new IntConfOption(
"giraph.maxCounterWaitMsecs", MINUTES.toMillis(2),
"Maximum milliseconds to wait before giving up waiting for" +
"the workers to write their counters to the " +
"zookeeper after a superstep");

/** Milliseconds for a request to complete (or else resend) */
IntConfOption MAX_REQUEST_MILLISECONDS =
new IntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10),

0 comments on commit 13da0eb

Please sign in to comment.