
Commit

cleanup
jumpmind-josh committed Nov 28, 2016
1 parent 6e3e501 commit 32419e1
Showing 13 changed files with 129 additions and 5 deletions.
Binary file added symmetric-assemble/.DS_Store
Binary file not shown.
@@ -1,5 +1,5 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
@@ -39,6 +39,7 @@ public class IncomingBatchSummary implements Serializable {
private String channel;
private String sqlMessage;
private long errorBatchId;
private float averageRowsPerMilli;

public String getNodeId() {
return nodeId;
@@ -112,6 +113,12 @@ public void setErrorBatchId(long errorBatchId) {
this.errorBatchId = errorBatchId;
}

public float getAverageRowsPerMilli() {
return averageRowsPerMilli;
}

public void setAverageRowsPerMilli(float averageRowsPerMilli) {
this.averageRowsPerMilli = averageRowsPerMilli;
}

}
@@ -41,6 +41,7 @@ public class OutgoingBatchSummary implements Serializable {
private long errorBatchId;
private long totalBytes;
private long totalMillis;
private float averageRowsPerMilli;

public String getNodeId() {
return nodeId;
@@ -129,4 +130,14 @@ public long getTotalMillis() {
public void setTotalMillis(long totalMillis) {
this.totalMillis = totalMillis;
}

public float getAverageRowsPerMilli() {
return averageRowsPerMilli;
}

public void setAverageRowsPerMilli(float averageRowsPerMilli) {
this.averageRowsPerMilli = averageRowsPerMilli;
}

}
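
Both summary classes gain the same averageRowsPerMilli property. For illustration only (not part of this commit; the figures and the org.jumpmind.symmetric.model import are assumptions), a caller might derive the value from totals the summary already tracks:

import org.jumpmind.symmetric.model.OutgoingBatchSummary;

public class AverageRowsExample {
    public static void main(String[] args) {
        // Hypothetical figures: 10,000 rows sent over 2,500 ms -> 4 rows/ms.
        OutgoingBatchSummary summary = new OutgoingBatchSummary();
        summary.setTotalMillis(2500);
        summary.setAverageRowsPerMilli(10000f / summary.getTotalMillis());
        System.out.println(summary.getAverageRowsPerMilli()); // prints 4.0
    }
}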
@@ -97,6 +97,8 @@ public interface IOutgoingBatchService {
public List<OutgoingBatchSummary> findOutgoingBatchSummary(OutgoingBatch.Status ... statuses);

public List<OutgoingBatchSummary> findOutgoingBatchSummaryByChannel(OutgoingBatch.Status ... statuses);

public Map<String, Float> getNodeThroughputByChannel();

public int countOutgoingBatches(List<String> nodeIds, List<String> channels,
List<OutgoingBatch.Status> statuses, List<String> loads);
@@ -42,6 +42,8 @@ public interface IStatisticService {
public void save(JobStats stats);

public TreeMap<Date, Map<String, ChannelStats>> getChannelStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes);

public TreeMap<Date, Map<String, ChannelStats>> getNodeStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes);

public TreeMap<Date, HostStats> getHostStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes);

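For illustration, a minimal usage sketch of the new node-level stats call, assuming a wired-up IStatisticService; the node id "001" and the one-hour window are placeholders, and the heartbeat/config exclusion comes from selectNodeStatsSql further down:

import java.util.Date;
import java.util.Map;
import java.util.TreeMap;

import org.jumpmind.symmetric.service.IStatisticService;
import org.jumpmind.symmetric.statistic.ChannelStats;

public class NodeStatsExample {
    static void printNodeStats(IStatisticService service) {
        Date end = new Date();
        Date start = new Date(end.getTime() - 60L * 60 * 1000); // last hour
        // One bucket per 5 minutes, keyed by node id; heartbeat and config
        // channels are excluded by selectNodeStatsSql.
        TreeMap<Date, Map<String, ChannelStats>> byPeriod =
                service.getNodeStatsForPeriod(start, end, "001", 5);
        for (Map.Entry<Date, Map<String, ChannelStats>> bucket : byPeriod.entrySet()) {
            System.out.println(bucket.getKey() + " -> " + bucket.getValue().keySet());
        }
    }
}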
@@ -829,7 +829,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe

synchronized (lock) {
if (!isPreviouslyExtracted(currentBatch)) {
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);

if (currentBatch.getExtractStartTime() == null) {
currentBatch.setExtractStartTime(new Date());
}
@@ -852,6 +852,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
extractTimeInMs = System.currentTimeMillis() - ts;
Statistics stats = getExtractStats(writer);
transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS);
currentBatch.setExtractCount(stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
extractTimeInMs = extractTimeInMs - transformTimeInMs;
byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
}
@@ -609,6 +609,30 @@ public List<OutgoingBatchSummary> findOutgoingBatchSummaryByChannel(Status... st
return sqlTemplateDirty.query(sql, new OutgoingBatchSummaryChannelMapper(), args);
}

public Map<String, Float> getNodeThroughputByChannel() {
String sql = getSql("getNodeThroughputByChannelSql");
NodeThroughputMapper mapper = new NodeThroughputMapper();

// The mapper collects rows into a map as a side effect; the query's returned list is discarded.
sqlTemplateDirty.query(sql, mapper);
return mapper.getThroughputMap();
}

private class NodeThroughputMapper implements ISqlRowMapper<Object> {
Map<String, Float> throughputMap = new HashMap<String, Float>();

@Override
public Object mapRow(Row row) {
// Key format: "<node_id>-<channel_id>-<direction>"
throughputMap.put(row.getString("node_id") + "-" + row.getString("channel_id") + "-" + row.getString("direction"), row.getFloat("rows_per_milli"));
return null;
}

public Map<String, Float> getThroughputMap() {
return throughputMap;
}

}

public Set<Long> getActiveLoads(String sourceNodeId) {
Set<Long> loads = new HashSet<Long>();

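For illustration, a sketch of consuming the throughput map using the key format the mapper builds ("<node_id>-<channel_id>-<direction>"); the node and channel ids are placeholders. Note that the outgoing branch of getNodeThroughputByChannelSql below sums byte_count, so the outgoing figure is effectively bytes per millisecond despite the column name:

import java.util.Map;

import org.jumpmind.symmetric.service.IOutgoingBatchService;

public class ThroughputLookupExample {
    static void printThroughput(IOutgoingBatchService service) {
        Map<String, Float> throughput = service.getNodeThroughputByChannel();
        // Keys follow "<node_id>-<channel_id>-<direction>", e.g. "001-reload-outgoing".
        Float outgoing = throughput.get("001-reload-outgoing");
        Float incoming = throughput.get("001-reload-incoming");
        System.out.println("outgoing=" + outgoing + ", incoming=" + incoming);
    }
}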
@@ -218,6 +218,19 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
+ " last_update_hostname, current_timestamp, create_time, 'copy' from $(outgoing_batch) where node_id=? and channel_id=? and batch_id > ?) ");


putSql("getNodeThroughputByChannelSql", "select node_id, channel_id, direction, rows_per_milli from ( "
+ " select node_id, channel_id, 'outgoing' as direction, "
+ " sum(byte_count) / avg(datediff(ms, create_time, last_update_time)) as rows_per_milli "
+ " from sym_outgoing_batch where status = 'OK' and sent_count = 1 "
+ " group by node_id, channel_id order by node_id "
+ " ) "
+ " union all "
+ " select * from ( "
+ " select node_id, channel_id, 'incoming' as direction, "
+ " sum(statement_count) / avg(datediff(ms, create_time, last_update_time)) as rows_per_milli "
+ " from sym_incoming_batch where status = 'OK' "
+ " group by node_id, channel_id order by node_id "
+ " )");
}

}
@@ -37,6 +37,7 @@
import org.jumpmind.symmetric.statistic.HostStats;
import org.jumpmind.symmetric.statistic.HostStatsByPeriodMap;
import org.jumpmind.symmetric.statistic.JobStats;
import org.jumpmind.symmetric.statistic.NodeStatsByPeriodMap;

/**
* @see IStatisticService
@@ -87,6 +88,13 @@ public TreeMap<Date, Map<String, ChannelStats>> getChannelStatsForPeriod(Date st
return new ChannelStatsByPeriodMap(start, end, list, periodSizeInMinutes);
}

public TreeMap<Date, Map<String, ChannelStats>> getNodeStatsForPeriod(Date start, Date end,
String nodeId, int periodSizeInMinutes) {
List<ChannelStats> list = sqlTemplateDirty.query(getSql("selectNodeStatsSql"),
new ChannelStatsMapper(), start, end, nodeId);
return new NodeStatsByPeriodMap(start, end, list, periodSizeInMinutes);
}

public void save(HostStats stats) {
sqlTemplate.update(
getSql("insertHostStatsSql"),
@@ -139,8 +147,12 @@ class ChannelStatsMapper implements ISqlRowMapper<ChannelStats> {
public ChannelStats mapRow(Row rs) {
ChannelStats stats = new ChannelStats();
stats.setNodeId(rs.getString("node_id"));
stats.setHostName(rs.getString("host_name"));
stats.setChannelId(rs.getString("channel_id"));
try {
    stats.setHostName(rs.getString("host_name"));
    stats.setChannelId(rs.getString("channel_id"));
} catch (Exception e) {
    // Ignored: the node-level query (selectNodeStatsSql) does not select
    // host_name or channel_id, so these columns are absent there.
}
stats.setStartTime(truncateToMinutes(rs.getDateTime("start_time")));
stats.setEndTime(truncateToMinutes(rs.getDateTime("end_time")));
stats.setDataRouted(rs.getLong("data_routed"));
@@ -46,6 +46,21 @@ public StatisticServiceSqlMap(IDatabasePlatform platform, Map<String, String> re
" data_loaded, data_bytes_loaded, data_loaded_errors " +
" from $(node_host_channel_stats) " +
" where start_time >= ? and end_time <= ? and node_id=? order by start_time asc " );

putSql("selectNodeStatsSql", "" +
"select node_id, start_time, end_time, " +
" sum(data_routed) as data_routed, sum(data_unrouted) as data_unrouted, " +
" sum(data_event_inserted) as data_event_inserted, sum(data_extracted) as data_extracted," +
" sum(data_bytes_extracted) as data_bytes_extracted, " +
" sum(data_extracted_errors) as data_extracted_errors, sum(data_sent) as data_sent, " +
" sum(data_bytes_sent) as data_bytes_sent, sum(data_sent_errors) as data_sent_errors, " +
" sum(data_loaded) as data_loaded, sum(data_bytes_loaded) as data_bytes_loaded, " +
" sum(data_loaded_errors) as data_loaded_errors " +
" from sym_node_host_channel_stats " +
" where start_time >= ? and end_time <= ? and node_id=? " +
" and channel_id not in ('heartbeat', 'config') " +
" group by node_id, start_time, end_time " +
" order by start_time asc ");

putSql("insertHostStatsSql" ,"" +
"insert into $(node_host_stats) " +
@@ -0,0 +1,37 @@
package org.jumpmind.symmetric.statistic;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NodeStatsByPeriodMap extends AbstractStatsByPeriodMap<Map<String,ChannelStats>,ChannelStats> {

private static final long serialVersionUID = 1L;

public NodeStatsByPeriodMap(Date start, Date end, List<ChannelStats> list,
int periodSizeInMinutes) {
super(start, end, list, periodSizeInMinutes);
}

@Override
protected void add(Date periodStart, ChannelStats stat) {
Map<String, ChannelStats> map = get(periodStart);
if (map == null) {
map = new HashMap<String, ChannelStats>();
put(periodStart, map);
}
ChannelStats existing = map.get(stat.getNodeId());
if (existing == null) {
map.put(stat.getNodeId(), stat);
} else {
existing.add(stat);
}
}

@Override
protected void addBlank(Date periodStart) {
put(periodStart, new HashMap<String, ChannelStats>());
}
}
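
For illustration, a construction sketch assuming the AbstractStatsByPeriodMap superclass (not shown in this diff) buckets each ChannelStats by its start time:

import java.util.Date;
import java.util.List;

import org.jumpmind.symmetric.statistic.ChannelStats;
import org.jumpmind.symmetric.statistic.NodeStatsByPeriodMap;

public class NodeStatsMapExample {
    static NodeStatsByPeriodMap lastHour(List<ChannelStats> rows) {
        Date end = new Date();
        Date start = new Date(end.getTime() - 60L * 60 * 1000);
        // Rows for the same node id landing in the same 5-minute bucket are
        // merged via ChannelStats.add(stat) in add() above.
        return new NodeStatsByPeriodMap(start, end, rows, 5);
    }
}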

0 comments on commit 32419e1
