Navigation Menu

Skip to content

Commit

Permalink
0003423: Node host stats not working
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Feb 15, 2018
1 parent e9e094a commit 1ef5c85
Showing 1 changed file with 46 additions and 0 deletions.
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -90,6 +91,8 @@ public class StatisticManager implements IStatisticManager {

protected Map<ProcessInfoKey, ProcessInfo> processInfosThatHaveDoneWork = new ConcurrentHashMap<ProcessInfoKey, ProcessInfo>();

private Map<Date, Map<String, ChannelStats>> baseChannelStatsInMemory = new LinkedHashMap<Date, Map<String, ChannelStats>>();

public StatisticManager(IParameterService parameterService, INodeService nodeService,
IConfigurationService configurationService, IStatisticService statisticsService,
IClusterService clusterService) {
Expand Down Expand Up @@ -458,6 +461,10 @@ public void incrementTriggersCreatedCount(long count) {
}

protected void saveAdditionalStats(Date endTime, ChannelStats stats) {
if (baseChannelStatsInMemory.get(endTime) == null) {
baseChannelStatsInMemory.put(endTime, new HashMap<String, ChannelStats>());
}
baseChannelStatsInMemory.get(endTime).put(stats.getChannelId(), stats);
}

public void flush() {
Expand Down Expand Up @@ -490,6 +497,45 @@ public void flush() {
}
}

int rowsLoaded = 0;
int rowsSent = 0;

for (Map.Entry<Date, Map<String, ChannelStats>> entry : baseChannelStatsInMemory.entrySet()) {

for (Map.Entry<String, ChannelStats> channelEntry : entry.getValue().entrySet()) {
if (!channelEntry.getKey().equals("config") && !channelEntry.getKey().equals("heartbeat")) {
rowsLoaded += channelEntry.getValue().getDataLoaded();
rowsSent += channelEntry.getValue().getDataSent();
}
}
if (rowsLoaded > 0 || rowsSent > 0) {
log.debug("===================================");
log.debug("Date: " + entry.getKey());
log.debug("Rows Out: " + rowsSent);
log.debug("Rows In: " + rowsLoaded);
log.debug("===================================");
}
}

if (hostStats != null) {
hostStatsLock.acquireUninterruptibly(NUMBER_OF_PERMITS);
try {
if (recordStatistics) {
if (hostStats.getNodeId().equals(UNKNOWN)) {
Node node = nodeService.getCachedIdentity();
if (node != null) {
hostStats.setNodeId(node.getNodeId());
}
}
hostStats.setEndTime(new Date());
statisticService.save(hostStats);
}
hostStats = null;
} finally {
hostStatsLock.release(NUMBER_OF_PERMITS);
}
}

if (jobStats != null) {
List<JobStats> toFlush = null;
jobStatsLock.acquireUninterruptibly(NUMBER_OF_PERMITS);
Expand Down

0 comments on commit 1ef5c85

Please sign in to comment.