diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java index 9531152239..687911c52e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -90,6 +91,8 @@ public class StatisticManager implements IStatisticManager { protected Map processInfosThatHaveDoneWork = new ConcurrentHashMap(); + private Map> baseChannelStatsInMemory = new LinkedHashMap>(); + public StatisticManager(IParameterService parameterService, INodeService nodeService, IConfigurationService configurationService, IStatisticService statisticsService, IClusterService clusterService) { @@ -458,6 +461,10 @@ public void incrementTriggersCreatedCount(long count) { } protected void saveAdditionalStats(Date endTime, ChannelStats stats) { + if (baseChannelStatsInMemory.get(endTime) == null) { + baseChannelStatsInMemory.put(endTime, new HashMap()); + } + baseChannelStatsInMemory.get(endTime).put(stats.getChannelId(), stats); } public void flush() { @@ -490,6 +497,45 @@ public void flush() { } } + int rowsLoaded = 0; + int rowsSent = 0; + + for (Map.Entry> entry : baseChannelStatsInMemory.entrySet()) { + + for (Map.Entry channelEntry : entry.getValue().entrySet()) { + if (!channelEntry.getKey().equals("config") && !channelEntry.getKey().equals("heartbeat")) { + rowsLoaded += channelEntry.getValue().getDataLoaded(); + rowsSent += channelEntry.getValue().getDataSent(); + } + } + if (rowsLoaded > 0 || rowsSent > 0) { + log.debug("==================================="); + log.debug("Date: " + entry.getKey()); + log.debug("Rows Out: " + rowsSent); + log.debug("Rows In: " + rowsLoaded); + log.debug("==================================="); + } + } + + if (hostStats != null) { + hostStatsLock.acquireUninterruptibly(NUMBER_OF_PERMITS); + try { + if (recordStatistics) { + if (hostStats.getNodeId().equals(UNKNOWN)) { + Node node = nodeService.getCachedIdentity(); + if (node != null) { + hostStats.setNodeId(node.getNodeId()); + } + } + hostStats.setEndTime(new Date()); + statisticService.save(hostStats); + } + hostStats = null; + } finally { + hostStatsLock.release(NUMBER_OF_PERMITS); + } + } + if (jobStats != null) { List toFlush = null; jobStatsLock.acquireUninterruptibly(NUMBER_OF_PERMITS);