Skip to content
Permalink
Browse files
GIRAPH-799
closes #117
  • Loading branch information
aanchal204 authored and dlogothetis committed Dec 9, 2019
1 parent 8c6c587 commit 526f5619e6b115ad8db1af245fd4736125dd5c37
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 16 deletions.
@@ -183,7 +183,10 @@ void cleanup(SuperstepState superstepState)

/**
* Add the Giraph Timers to thirft counter struct, and send to the job client
* Counters include the Giraph Timers for setup, initialise, shutdown, total,
* and time for the given superstep
* @param superstep superstep for which the GiraphTimer will be sent
*
*/
void addGiraphTimersAndSendCounters();
void addGiraphTimersAndSendCounters(long superstep);
}
@@ -19,6 +19,7 @@
package org.apache.giraph.counters;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -73,11 +74,16 @@ public static void addCustomCounter(String groupName, String counterName,

/**
* Get the unique counter group and names
* This will also clear the counters list, to avoid duplicate
* counters from the previous superstep from being sent to the
* zookeeper again
*
* @return Map of unique counter names
*/
public static Set<CustomCounter> getCustomCounters() {
return COUNTER_NAMES;
public static Set<CustomCounter> getAndClearCustomCounters() {
Set<CustomCounter> counterNamesCopy = new HashSet<>(COUNTER_NAMES);
COUNTER_NAMES.clear();
return counterNamesCopy;
}

/**
@@ -184,23 +184,26 @@ public Iterator<GiraphHadoopCounter> iterator() {
}

/**
* Get a map of counter names and values
*
* Get a map of counter names and values for the given superstep
* Counters include Setup, Initialise, Shutdown, Total, and time for
* the given superstep
* @param superstep superstep for which to fetch the GiraphTimer
* @return Map of counter names and values
*/
public List<CustomCounter> getCounterList() {
public List<CustomCounter> getCounterList(long superstep) {
List<CustomCounter> countersList = new ArrayList<>();
for (GiraphHadoopCounter counter: jobCounters) {
CustomCounter customCounter = new CustomCounter(
GROUP_NAME, counter.getName(),
CustomCounter.Aggregation.SUM, counter.getValue());
countersList.add(customCounter);
}
for (Map.Entry<Long, GiraphHadoopCounter> entry :
superstepMsec.entrySet()) {
GiraphHadoopCounter giraphHadoopCounter = superstepMsec.get(superstep);
if (giraphHadoopCounter != null) {
CustomCounter customCounter = new CustomCounter(
GROUP_NAME, entry.getValue().getName(),
CustomCounter.Aggregation.SUM, entry.getValue().getValue());
GROUP_NAME, giraphHadoopCounter.getName(),
CustomCounter.Aggregation.SUM,
giraphHadoopCounter.getValue());
countersList.add(customCounter);
}
return countersList;
@@ -1678,7 +1678,6 @@ public int compare(WorkerInfo wi1, WorkerInfo wi2) {
// are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
aggregateCountersFromWorkersAndMaster();
addGiraphTimersAndSendCounters();
if (masterCompute.isHalted() ||
(globalStats.getFinishedVertexCount() ==
globalStats.getVertexCount() &&
@@ -1927,7 +1926,7 @@ private void aggregateCountersFromWorkersAndMaster() {
// we should not add them again here.
Counter counter;
Set<CustomCounter> masterCounterNames =
CustomCounters.getCustomCounters();
CustomCounters.getAndClearCustomCounters();
for (CustomCounter customCounter : masterCounterNames) {
String groupName = customCounter.getGroupName();
String counterName = customCounter.getCounterName();
@@ -1961,12 +1960,13 @@ private void aggregateCountersFromWorkersAndMaster() {
* the time required for shutdown and cleanup
* This will fetch the final Giraph Timers, and send all the counters
* to the job client
* @param superstep superstep for which the GiraphTimer will be sent
*
*/
public void addGiraphTimersAndSendCounters() {
public void addGiraphTimersAndSendCounters(long superstep) {
List<CustomCounter> giraphCounters =
giraphCountersThriftStruct.getCounters();
giraphCounters.addAll(GiraphTimers.getInstance().getCounterList());
giraphCounters.addAll(GiraphTimers.getInstance().getCounterList(superstep));
giraphCountersThriftStruct.setCounters(giraphCounters);
getJobProgressTracker().sendMasterCounters(giraphCountersThriftStruct);
}
@@ -147,6 +147,7 @@ public void run() {
GiraphTimers.getInstance().getSuperstepMs(cachedSuperstep,
computationName).increment(superstepMillis);
}
bspServiceMaster.addGiraphTimersAndSendCounters(cachedSuperstep);

bspServiceMaster.postSuperstep();

@@ -191,7 +192,8 @@ public void run() {
GiraphTimers.getInstance().getTotalMs().
increment(System.currentTimeMillis() - initializeMillis);
}
bspServiceMaster.addGiraphTimersAndSendCounters();
bspServiceMaster.addGiraphTimersAndSendCounters(
bspServiceMaster.getSuperstep());
bspServiceMaster.postApplication();
// CHECKSTYLE: stop IllegalCatchCheck
} catch (Exception e) {
@@ -1251,7 +1251,7 @@ public void cleanup(FinishedSuperstepStats finishedSuperstepStats)
*/
public void storeCountersInZooKeeper(boolean allSuperstepsDone) {
Set<CustomCounter> additionalCounters =
CustomCounters.getCustomCounters();
CustomCounters.getAndClearCustomCounters();

JSONArray jsonCounters = new JSONArray();
Mapper.Context context = getContext();

0 comments on commit 526f561

Please sign in to comment.