diff --git a/symmetric-assemble/.DS_Store b/symmetric-assemble/.DS_Store new file mode 100644 index 0000000000..10ccb6857a Binary files /dev/null and b/symmetric-assemble/.DS_Store differ diff --git a/symmetric-assemble/src/asciidoc/images/.DS_Store b/symmetric-assemble/src/asciidoc/images/.DS_Store new file mode 100644 index 0000000000..ee109d63f8 Binary files /dev/null and b/symmetric-assemble/src/asciidoc/images/.DS_Store differ diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java index d9880df594..7ab173c780 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java @@ -1,5 +1,5 @@ /** - * Licensed to JumpMind Inc under one or more contributor +] * Licensed to JumpMind Inc under one or more contributor * license agreements. See the NOTICE file distributed * with this work for additional information regarding * copyright ownership. JumpMind Inc licenses this file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatchSummary.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatchSummary.java index e917765e6b..18fe6056b8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatchSummary.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatchSummary.java @@ -39,6 +39,7 @@ public class IncomingBatchSummary implements Serializable { private String channel; private String sqlMessage; private long errorBatchId; + private float averageRowsPerMilli; public String getNodeId() { return nodeId; @@ -112,6 +113,12 @@ public void setErrorBatchId(long errorBatchId) { this.errorBatchId = errorBatchId; } - + public float getAverageRowsPerMilli() { + return averageRowsPerMilli; + } + + public void setAverageRowsPerMilli(float averageRowsPerMilli) { + this.averageRowsPerMilli = averageRowsPerMilli; + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatchSummary.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatchSummary.java index aa107187aa..98f8f93da7 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatchSummary.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatchSummary.java @@ -41,6 +41,7 @@ public class OutgoingBatchSummary implements Serializable { private long errorBatchId; private long totalBytes; private long totalMillis; + private float averageRowsPerMilli; public String getNodeId() { return nodeId; @@ -129,4 +130,14 @@ public long getTotalMillis() { public void setTotalMillis(long totalMillis) { this.totalMillis = totalMillis; } + + public float getAverageRowsPerMilli() { + return averageRowsPerMilli; + } + + public void setAverageRowsPerMilli(float averageRowsPerMilli) { + this.averageRowsPerMilli = averageRowsPerMilli; + } + + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 687ec9417a..627b9457bb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -97,6 +97,8 @@ public interface IOutgoingBatchService { public List findOutgoingBatchSummary(OutgoingBatch.Status ... statuses); public List findOutgoingBatchSummaryByChannel(OutgoingBatch.Status ... statuses); + + public Map getNodeThroughputByChannel(); public int countOutgoingBatches(List nodeIds, List channels, List statuses, List loads); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java index bf52efebcd..1a95c04788 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java @@ -42,6 +42,8 @@ public interface IStatisticService { public void save(JobStats stats); public TreeMap> getChannelStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes); + + public TreeMap> getNodeStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes); public TreeMap getHostStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 189586f470..1a34952963 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -829,7 +829,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe synchronized (lock) { if (!isPreviouslyExtracted(currentBatch)) { - currentBatch.setExtractCount(currentBatch.getExtractCount() + 1); + if (currentBatch.getExtractStartTime() == null) { currentBatch.setExtractStartTime(new Date()); } @@ -852,6 +852,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe extractTimeInMs = System.currentTimeMillis() - ts; Statistics stats = getExtractStats(writer); transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS); + currentBatch.setExtractCount(stats.get(DataWriterStatisticConstants.STATEMENTCOUNT)); extractTimeInMs = extractTimeInMs - transformTimeInMs; byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index efc1be7bae..86ee3edf06 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -609,6 +609,30 @@ public List findOutgoingBatchSummaryByChannel(Status... st return sqlTemplateDirty.query(sql, new OutgoingBatchSummaryChannelMapper(), args); } + public Map getNodeThroughputByChannel() { + String sql = getSql("getNodeThroughputByChannelSql"); + NodeThroughputMapper mapper = new NodeThroughputMapper(); + + List temp = sqlTemplateDirty.query(sql, mapper); + return mapper.getThroughputMap(); + } + + private class NodeThroughputMapper implements ISqlRowMapper { + Map throughputMap = new HashMap(); + + @Override + public Object mapRow(Row row) { + throughputMap.put(row.getString("node_id") + "-" + row.getString("channel_id") + "-" + row.get("direction"), row.getFloat("rows_per_milli")); + return null; + } + + public Map getThroughputMap() { + return throughputMap; + } + + + } + public Set getActiveLoads(String sourceNodeId) { Set loads = new HashSet(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 57f18a73f9..553b3fa9f1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -218,6 +218,19 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + " last_update_hostname, current_timestamp, create_time, 'copy' from $(outgoing_batch) where node_id=? and channel_id=? and batch_id > ?) "); + putSql("getNodeThroughputByChannelSql", "select node_id, channel_id, direction, rows_per_milli from ( " + + " select node_id, channel_id, 'outgoing' as direction, " + + " sum(byte_count) / avg(datediff(ms, create_time, last_update_time)) as rows_per_milli " + + " from sym_outgoing_batch where status = 'OK' and sent_count = 1 " + + " group by node_id, channel_id order by node_id " + + " ) " + + " union all " + + " select * from ( " + + " select node_id, channel_id, 'incoming' as direction, " + + " sum(statement_count) / avg(datediff(ms, create_time, last_update_time)) as rows_per_milli " + + " from sym_incoming_batch where status = 'OK' " + + " group by node_id, channel_id order by node_id " + + " )"); } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java index e036001800..4b26fd96d5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java @@ -37,6 +37,7 @@ import org.jumpmind.symmetric.statistic.HostStats; import org.jumpmind.symmetric.statistic.HostStatsByPeriodMap; import org.jumpmind.symmetric.statistic.JobStats; +import org.jumpmind.symmetric.statistic.NodeStatsByPeriodMap; /** * @see IStatisticService @@ -87,6 +88,13 @@ public TreeMap> getChannelStatsForPeriod(Date st return new ChannelStatsByPeriodMap(start, end, list, periodSizeInMinutes); } + public TreeMap> getNodeStatsForPeriod(Date start, Date end, + String nodeId, int periodSizeInMinutes) { + List list = sqlTemplateDirty.query(getSql("selectNodeStatsSql"), + new ChannelStatsMapper(), start, end, nodeId); + return new NodeStatsByPeriodMap(start, end, list, periodSizeInMinutes); + } + public void save(HostStats stats) { sqlTemplate.update( getSql("insertHostStatsSql"), @@ -139,8 +147,12 @@ class ChannelStatsMapper implements ISqlRowMapper { public ChannelStats mapRow(Row rs) { ChannelStats stats = new ChannelStats(); stats.setNodeId(rs.getString("node_id")); - stats.setHostName(rs.getString("host_name")); - stats.setChannelId(rs.getString("channel_id")); + try { + stats.setHostName(rs.getString("host_name")); + stats.setChannelId(rs.getString("channel_id")); + } + catch (Exception e) { + } stats.setStartTime(truncateToMinutes(rs.getDateTime("start_time"))); stats.setEndTime(truncateToMinutes(rs.getDateTime("end_time"))); stats.setDataRouted(rs.getLong("data_routed")); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java index abc1c7fdb6..aef01c8ee7 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java @@ -46,6 +46,21 @@ public StatisticServiceSqlMap(IDatabasePlatform platform, Map re " data_loaded, data_bytes_loaded, data_loaded_errors " + " from $(node_host_channel_stats) " + " where start_time >= ? and end_time <= ? and node_id=? order by start_time asc " ); + + putSql("selectNodeStatsSql", "" + +"select node_id, start_time, end_time, " + +" sum(data_routed) as data_routed, sum(data_unrouted) as data_unrouted, " + +" sum(data_event_inserted) as data_event_inserted, sum(data_extracted) as data_extracted," + +" sum(data_bytes_extracted) as data_bytes_extracted, " + +" sum(data_extracted_errors) as data_extracted_errors, sum(data_sent) as data_sent, " + +" sum(data_bytes_sent) as data_bytes_sent, sum(data_sent_errors) as data_sent_errors, " + +" sum(data_loaded) as data_loaded, sum(data_bytes_loaded) as data_bytes_loaded, " + +" sum(data_loaded_errors) as data_loaded_errors " + +" from sym_node_host_channel_stats " + +" where start_time >= ? and end_time <= ? and node_id=? " + +" and channel_id not in ('heartbeat', 'config') " + +" group by node_id, start_time, end_time " + +" order by start_time asc "); putSql("insertHostStatsSql" ,"" + "insert into $(node_host_stats) " + diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/NodeStatsByPeriodMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/NodeStatsByPeriodMap.java new file mode 100644 index 0000000000..e84a26e19b --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/NodeStatsByPeriodMap.java @@ -0,0 +1,37 @@ +package org.jumpmind.symmetric.statistic; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class NodeStatsByPeriodMap extends AbstractStatsByPeriodMap,ChannelStats> { + + private static final long serialVersionUID = 1L; + + public NodeStatsByPeriodMap(Date start, Date end, List list, + int periodSizeInMinutes) { + super(start, end, list, periodSizeInMinutes); + } + + @Override + protected void add(Date periodStart, ChannelStats stat) { + Map map = get(periodStart); + if (map == null) { + map = new HashMap(); + put(periodStart, map); + } + ChannelStats existing = map.get(stat.getNodeId()); + if (existing == null) { + map.put(stat.getNodeId(), stat); + } else { + existing.add(stat); + } + } + + @Override + protected void addBlank(Date periodStart) { + put(periodStart, new HashMap()); + + } +}