Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Nov 16, 2016
1 parent 48a6cfc commit 9a5ed15
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 6 deletions.
Expand Up @@ -35,7 +35,10 @@ public class IncomingBatchSummary implements Serializable {
private int dataCount;
private IncomingBatch.Status status;
private Date oldestBatchCreateTime;
private Date lastBatchUpdateTime;
private String channel;
private String sqlMessage;
private long errorBatchId;

public String getNodeId() {
return nodeId;
Expand Down Expand Up @@ -84,6 +87,30 @@ public String getChannel() {
public void setChannel(String channel) {
this.channel = channel;
}

public Date getLastBatchUpdateTime() {
return lastBatchUpdateTime;
}

public void setLastBatchUpdateTime(Date lastBatchUpdateTime) {
this.lastBatchUpdateTime = lastBatchUpdateTime;
}

public String getSqlMessage() {
return sqlMessage;
}

public void setSqlMessage(String sqlMessage) {
this.sqlMessage = sqlMessage;
}

public long getErrorBatchId() {
return errorBatchId;
}

public void setErrorBatchId(long errorBatchId) {
this.errorBatchId = errorBatchId;
}



Expand Down
Expand Up @@ -35,7 +35,12 @@ public class OutgoingBatchSummary implements Serializable {
private int dataCount;
private OutgoingBatch.Status status;
private Date oldestBatchCreateTime;
private Date lastBatchUpdateTime;
private String channel;
private String sqlMessage;
private long errorBatchId;
private long totalBytes;
private long totalMillis;

public String getNodeId() {
return nodeId;
Expand Down Expand Up @@ -85,5 +90,43 @@ public void setChannel(String channel) {
this.channel = channel;
}


public Date getLastBatchUpdateTime() {
return lastBatchUpdateTime;
}

public void setLastBatchUpdateTime(Date lastBatchUpdateTime) {
this.lastBatchUpdateTime = lastBatchUpdateTime;
}

public String getSqlMessage() {
return sqlMessage;
}

public void setSqlMessage(String sqlMessage) {
this.sqlMessage = sqlMessage;
}

public long getErrorBatchId() {
return errorBatchId;
}

public void setErrorBatchId(long errorBatchId) {
this.errorBatchId = errorBatchId;
}

public long getTotalBytes() {
return totalBytes;
}

public void setTotalBytes(long totalBytes) {
this.totalBytes = totalBytes;
}

public long getTotalMillis() {
return totalMillis;
}

public void setTotalMillis(long totalMillis) {
this.totalMillis = totalMillis;
}
}
Expand Up @@ -78,6 +78,8 @@ public List<IncomingBatch> listIncomingBatches(List<String> nodeIds, List<String

public List<IncomingBatchSummary> findIncomingBatchSummaryByChannel(Status... statuses);

public List<IncomingBatchSummary> findIncomingBatchSummary(Status... statuses);

public Map<String, Date> findLastUpdatedByChannel();

}
Expand Up @@ -376,6 +376,20 @@ public List<IncomingBatchSummary> findIncomingBatchSummaryByChannel(Status... st
String sql = getSql("selectIncomingBatchSummaryByStatusAndChannelSql").replace(":STATUS_LIST",
inList.substring(0, inList.length() - 1));

return sqlTemplate.query(sql, new IncomingBatchSummaryChannelMapper(), args);
}

public List<IncomingBatchSummary> findIncomingBatchSummary(Status... statuses) {
Object[] args = new Object[statuses.length];
StringBuilder inList = new StringBuilder();
for (int i = 0; i < statuses.length; i++) {
args[i] = statuses[i].name();
inList.append("?,");
}

String sql = getSql("selectIncomingBatchSummaryByStatusSql").replace(":STATUS_LIST",
inList.substring(0, inList.length() - 1));

return sqlTemplate.query(sql, new IncomingBatchSummaryMapper(), args);
}

Expand All @@ -394,12 +408,24 @@ public IncomingBatchSummary mapRow(Row rs) {
summary.setStatus(Status.valueOf(rs.getString("status")));
summary.setNodeId(rs.getString("node_id"));
summary.setOldestBatchCreateTime(rs.getDateTime("oldest_batch_time"));
summary.setChannel(rs.getString("channel_id"));
summary.setLastBatchUpdateTime(rs.getDateTime("last_update_time"));
summary.setDataCount(rs.getInt("data"));
return summary;
}
}

class IncomingBatchSummaryChannelMapper extends IncomingBatchSummaryMapper {
public IncomingBatchSummary mapRow(Row rs) {
IncomingBatchSummary summary = super.mapRow(rs);
summary.setChannel(rs.getString("channel_id"));
summary.setSqlMessage(rs.getString("sql_message"));
if (summary.getSqlMessage() != null) {
summary.setErrorBatchId(rs.getLong("batch_id"));
}
return summary;
}
}

class BatchIdMapper implements ISqlRowMapper<BatchId> {
Map<String, BatchId> ids;

Expand Down
Expand Up @@ -77,9 +77,15 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String
putSql("maxBatchIdsSql", "select max(batch_id) as batch_id, node_id, channel_id from $(incoming_batch) where status = ? group by node_id, channel_id");

putSql("selectIncomingBatchSummaryByStatusAndChannelSql",
"select count(*) as batches, status, sum(statement_count) as data, node_id, min(create_time) as oldest_batch_time, channel_id "
"select count(*) as batches, status, sum(statement_count) as data, node_id, min(create_time) as oldest_batch_time, channel_id, "
+ " max(last_update_time) as last_update_time, max(sql_message) as sql_message, min(batch_id) as batch_id "
+ " from $(incoming_batch) where status in (:STATUS_LIST) group by status, node_id, channel_id order by oldest_batch_time asc ");

putSql("selectIncomingBatchSummaryByStatusSql",
"select count(*) as batches, status, sum(statement_count) as data, node_id, min(create_time) as oldest_batch_time, "
+ " max(last_update_time) as last_update_time"
+ " from $(incoming_batch) where status in (:STATUS_LIST) group by status, node_id order by oldest_batch_time asc ");

putSql("lastUpdateByChannelSql", "select max(last_update_time) as last_update_time, channel_id from $(incoming_batch) group by channel_id");

}
Expand Down
Expand Up @@ -938,6 +938,9 @@ public OutgoingBatchSummary mapRow(Row rs) {
summary.setStatus(Status.valueOf(rs.getString("status")));
summary.setNodeId(rs.getString("node_id"));
summary.setOldestBatchCreateTime(rs.getDateTime("oldest_batch_time"));
summary.setLastBatchUpdateTime(rs.getDateTime("last_update_time"));
summary.setTotalBytes(rs.getLong("total_bytes"));
summary.setTotalMillis(rs.getLong("total_millis"));
return summary;
}
}
Expand All @@ -946,6 +949,10 @@ class OutgoingBatchSummaryChannelMapper extends OutgoingBatchSummaryMapper imple
public OutgoingBatchSummary mapRow(Row rs) {
OutgoingBatchSummary summary = super.mapRow(rs);
summary.setChannel(rs.getString("channel_id"));
summary.setSqlMessage(rs.getString("sql_message"));
if (summary.getSqlMessage() != null) {
summary.setErrorBatchId(rs.getLong("batch_id"));
}
return summary;
}
}
Expand Down
Expand Up @@ -111,11 +111,15 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
"select count(*) from $(outgoing_batch) where status != 'OK' and channel_id=?");

putSql("selectOutgoingBatchSummaryByStatusSql",
"select count(*) as batches, sum(data_event_count) as data, status, node_id, min(create_time) as oldest_batch_time "
+ " from $(outgoing_batch) where status in (:STATUS_LIST) group by status, node_id order by oldest_batch_time asc ");
"select count(*) as batches, sum(data_event_count) as data, status, node_id, min(create_time) as oldest_batch_time, "
+ " max(last_update_time) as last_update_time, sum(byte_count) as total_bytes, "
+ " sum(router_millis + extract_millis + network_millis + filter_millis + load_millis) as total_millis "
+ " from $(outgoing_batch) where status in (:STATUS_LIST) group by status, node_id order by oldest_batch_time asc ");

putSql("selectOutgoingBatchSummaryByStatusAndChannelSql",
"select count(*) as batches, sum(data_event_count) as data, status, node_id, min(create_time) as oldest_batch_time, channel_id "
"select count(*) as batches, sum(data_event_count) as data, status, node_id, min(create_time) as oldest_batch_time, channel_id, "
+ " max(last_update_time) as last_update_time, max(sql_message) as sql_message, min(batch_id) as batch_id, "
+ " sum(byte_count) as total_bytes, sum(router_millis + extract_millis + network_millis + filter_millis + load_millis) as total_millis "
+ " from $(outgoing_batch) where status in (:STATUS_LIST) group by status, node_id, channel_id order by node_id, oldest_batch_time asc ");

putSql("updateOutgoingBatchesStatusSql",
Expand Down

0 comments on commit 9a5ed15

Please sign in to comment.