diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingLoadSummary.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingLoadSummary.java
new file mode 100644
index 0000000000..e6b5ec6f4a
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingLoadSummary.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU Lesser General Public License (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.model;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class OutgoingLoadSummary implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private long loadId;
+ private String nodeId;
+ private boolean inError;
+ private int finishedBatchCount;
+ private int pendingBatchCount;
+ private long currentBatchId;
+ private String currentTable;
+ private long currentDataEventCount;
+ private String createBy;
+ private Date createTime;
+ private Date lastUpdateTime;
+ private int reloadBatchCount;
+
+ public boolean isActive() {
+ return pendingBatchCount > 0;
+ }
+
+ public void setInError(boolean inError) {
+ this.inError = inError;
+ }
+
+ public boolean isInError() {
+ return inError;
+ }
+
+ public long getLoadId() {
+ return loadId;
+ }
+
+ public void setLoadId(long loadId) {
+ this.loadId = loadId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public int getFinishedBatchCount() {
+ return finishedBatchCount;
+ }
+
+ public void setFinishedBatchCount(int finishedBatchCount) {
+ this.finishedBatchCount = finishedBatchCount;
+ }
+
+ public int getPendingBatchCount() {
+ return pendingBatchCount;
+ }
+
+ public void setPendingBatchCount(int pendingBatchCount) {
+ this.pendingBatchCount = pendingBatchCount;
+ }
+
+ public long getCurrentBatchId() {
+ return currentBatchId;
+ }
+
+ public void setCurrentBatchId(long currentBatchId) {
+ this.currentBatchId = currentBatchId;
+ }
+
+ public String getCurrentTable() {
+ return currentTable;
+ }
+
+ public void setCurrentTable(String currentTable) {
+ this.currentTable = currentTable;
+ }
+
+ public long getCurrentDataEventCount() {
+ return currentDataEventCount;
+ }
+
+ public void setCurrentDataEventCount(long currentDataEventCount) {
+ this.currentDataEventCount = currentDataEventCount;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public Date getLastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ public void setLastUpdateTime(Date lastUpdateTime) {
+ this.lastUpdateTime = lastUpdateTime;
+ }
+
+ public int getReloadBatchCount() {
+ return reloadBatchCount;
+ }
+
+ public void setReloadBatchCount(int reloadBatchCount) {
+ this.reloadBatchCount = reloadBatchCount;
+ }
+
+ public void setCreateBy(String createBy) {
+ this.createBy = createBy;
+ }
+
+ public String getCreateBy() {
+ return createBy;
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java
index e9e6e8f6d1..f33656b242 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java
@@ -24,6 +24,7 @@
import java.util.List;
import org.jumpmind.db.sql.ISqlTransaction;
+import org.jumpmind.symmetric.model.OutgoingLoadSummary;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;
@@ -73,6 +74,8 @@ public int countOutgoingBatches(List nodeIds, List channels,
List statuses);
public List listOutgoingBatches(List nodeIds, List channels,
- List statuses, long startAtBatchId, int rowsExpected, boolean ascending);
+ List statuses, long startAtBatchId, int rowsExpected, boolean ascending);
+
+ public List getLoadSummaries(boolean activeOnly);
}
\ No newline at end of file
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java
index a55889725a..579d70abb5 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java
@@ -27,8 +27,10 @@
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.sql.ISqlRowMapper;
@@ -38,7 +40,9 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
+import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.Channel;
+import org.jumpmind.symmetric.model.OutgoingLoadSummary;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeHost;
@@ -66,12 +70,13 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa
private IConfigurationService configurationService;
private ISequenceService sequenceService;
-
+
private IClusterService clusterService;
public OutgoingBatchService(IParameterService parameterService,
ISymmetricDialect symmetricDialect, INodeService nodeService,
- IConfigurationService configurationService, ISequenceService sequenceService, IClusterService clusterService) {
+ IConfigurationService configurationService, ISequenceService sequenceService,
+ IClusterService clusterService) {
super(parameterService, symmetricDialect);
this.nodeService = nodeService;
this.configurationService = configurationService;
@@ -86,10 +91,10 @@ public void markAllAsSentForNode(String nodeId) {
do {
batches = getOutgoingBatches(nodeId, true);
List list = batches.getBatches();
- /* Sort in reverse order so we don't get fk errors for
- * batches that are currently processing. We don't
- * make the update transactional to prevent contention
- * in highly loaded systems
+ /*
+ * Sort in reverse order so we don't get fk errors for batches that
+ * are currently processing. We don't make the update transactional
+ * to prevent contention in highly loaded systems
*/
Collections.sort(list, new Comparator() {
public int compare(OutgoingBatch o1, OutgoingBatch o2) {
@@ -120,7 +125,7 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
outgoingBatch.setLastUpdatedHostName(clusterService.getServerId());
sqlTemplate.update(
getSql("updateOutgoingBatchSql"),
- new Object[] { outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(),
+ new Object[] { outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(),
outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isErrorFlag() ? 1 : 0,
outgoingBatch.getByteCount(), outgoingBatch.getExtractCount(),
outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(),
@@ -134,13 +139,13 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()),
outgoingBatch.getFailedDataId(), outgoingBatch.getLastUpdatedHostName(),
outgoingBatch.getLastUpdatedTime(), outgoingBatch.getBatchId(),
- outgoingBatch.getNodeId() },
- new int[] { Types.CHAR, Types.BIGINT, Types.NUMERIC, Types.NUMERIC, Types.BIGINT, Types.BIGINT,
+ outgoingBatch.getNodeId() }, new int[] { Types.CHAR, Types.BIGINT,
+ Types.NUMERIC, Types.NUMERIC, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT,
- Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.VARCHAR,
- Types.NUMERIC, Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP,
- Types.NUMERIC, Types.VARCHAR });
+ Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.NUMERIC,
+ Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, Types.NUMERIC,
+ Types.VARCHAR });
}
public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
@@ -159,13 +164,14 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
long batchId = outgoingBatch.getBatchId();
if (batchId <= 0) {
- batchId = sequenceService.nextVal(transaction,Constants.SEQUENCE_OUTGOING_BATCH);
+ batchId = sequenceService.nextVal(transaction, Constants.SEQUENCE_OUTGOING_BATCH);
}
transaction.prepareAndExecute(getSql("insertOutgoingBatchSql"), batchId, outgoingBatch
.getNodeId(), outgoingBatch.getChannelId(), outgoingBatch.getStatus().name(),
- outgoingBatch.getLoadId(), outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0,
- outgoingBatch.getReloadEventCount(), outgoingBatch.getOtherEventCount(),
- outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getCreateBy());
+ outgoingBatch.getLoadId(), outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch
+ .isCommonFlag() ? 1 : 0, outgoingBatch.getReloadEventCount(), outgoingBatch
+ .getOtherEventCount(), outgoingBatch.getLastUpdatedHostName(),
+ outgoingBatch.getCreateBy());
outgoingBatch.setBatchId(batchId);
}
@@ -196,15 +202,15 @@ public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
public int countOutgoingBatchesInError() {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesErrorsSql"));
}
-
+
public int countOutgoingBatchesInError(String channelId) {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesErrorsOnChannelSql"), channelId);
}
-
- public int countOutgoingBatchesUnsent() {
+
+ public int countOutgoingBatchesUnsent() {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentSql"));
}
-
+
public int countOutgoingBatchesUnsent(String channelId) {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentOnChannelSql"), channelId);
}
@@ -216,9 +222,10 @@ public int countOutgoingBatches(List nodeIds, List channels,
params.put("CHANNELS", channels);
params.put("STATUSES", toStringList(statuses));
- return sqlTemplate.queryForInt(
- getSql("selectCountBatchesPrefixSql", buildBatchWhere(nodeIds, channels, statuses)),
- params);
+ return sqlTemplate
+ .queryForInt(
+ getSql("selectCountBatchesPrefixSql",
+ buildBatchWhere(nodeIds, channels, statuses)), params);
}
public List listOutgoingBatches(List nodeIds, List channels,
@@ -263,7 +270,7 @@ protected boolean containsOnlyStatus(OutgoingBatch.Status status,
List statuses) {
return statuses.size() == 1 && statuses.get(0) == status;
}
-
+
/**
* Select batches to process. Batches that are NOT in error will be returned
* first. They will be ordered by batch id as the batches will have already
@@ -276,7 +283,8 @@ public OutgoingBatches getOutgoingBatches(String nodeId, boolean includeDisabled
ParameterConstants.OUTGOING_BATCH_MAX_BATCHES_TO_SELECT, 1000);
List list = (List) sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchSql"),
- maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, true),
+ maxNumberOfBatchesToSelect,
+ new OutgoingBatchMapper(includeDisabledChannels, true),
new Object[] { nodeId, OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(),
@@ -292,7 +300,8 @@ maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, tru
for (NodeChannel channel : channels) {
if (parameterService.is(ParameterConstants.DATA_EXTRACTOR_ENABLED)
|| channel.getChannelId().equals(Constants.CHANNEL_CONFIG)) {
- keepers.addAll(getBatchesForChannelWindows(batches,
+ keepers.addAll(getBatchesForChannelWindows(
+ batches,
nodeId,
channel,
configurationService.getNodeGroupChannelWindows(
@@ -308,16 +317,17 @@ maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, tru
return batches;
}
-
- public List getBatchesForChannelWindows(OutgoingBatches batches, String targetNodeId, NodeChannel channel,
- List windows) {
+
+ public List getBatchesForChannelWindows(OutgoingBatches batches,
+ String targetNodeId, NodeChannel channel, List windows) {
List keeping = new ArrayList();
List current = batches.getBatches();
if (current != null && current.size() > 0) {
if (inTimeWindow(windows, targetNodeId)) {
int maxBatchesToSend = channel.getMaxBatchToSend();
for (OutgoingBatch outgoingBatch : current) {
- if (channel.getChannelId().equals(outgoingBatch.getChannelId()) && maxBatchesToSend > 0) {
+ if (channel.getChannelId().equals(outgoingBatch.getChannelId())
+ && maxBatchesToSend > 0) {
keeping.add(outgoingBatch);
maxBatchesToSend--;
}
@@ -421,6 +431,70 @@ public List findOutgoingBatchSummary(Status... statuses) {
return sqlTemplate.query(sql, new OutgoingBatchSummaryMapper(), args);
}
+ public List getLoadSummaries(boolean activeOnly) {
+ final Map loadSummaries = new TreeMap();
+ sqlTemplate.query(getSql("getLoadSummariesSql"), new ISqlRowMapper() {
+ public OutgoingLoadSummary mapRow(Row rs) {
+ long loadId = rs.getLong("load_id");
+ OutgoingLoadSummary summary = loadSummaries.get(loadId);
+ if (summary == null) {
+ summary = new OutgoingLoadSummary();
+ summary.setLoadId(loadId);
+ summary.setNodeId(rs.getString("node_id"));
+ summary.setCreateBy(rs.getString("create_by"));
+ loadSummaries.put(loadId, summary);
+ }
+
+ Status status = Status.valueOf(rs.getString("status"));
+ DataEventType eventType = DataEventType.getEventType(rs.getString("event_type"));
+ int count = rs.getInt("count");
+
+ Date lastUpdateTime = rs.getDateTime("last_update_time");
+ if (summary.getLastUpdateTime() == null
+ || summary.getLastUpdateTime().before(lastUpdateTime)) {
+ summary.setLastUpdateTime(lastUpdateTime);
+ }
+
+ Date createTime = rs.getDateTime("create_time");
+ if (summary.getCreateTime() == null || summary.getCreateTime().after(createTime)) {
+ summary.setCreateTime(createTime);
+ }
+
+ if (eventType == DataEventType.RELOAD) {
+ summary.setReloadBatchCount(summary.getReloadBatchCount() + count);
+ }
+
+ if (status == Status.OK || status == Status.IG) {
+ summary.setFinishedBatchCount(summary.getFinishedBatchCount() + count);
+ } else {
+ summary.setPendingBatchCount(summary.getPendingBatchCount() + count);
+
+ boolean inError = rs.getBoolean("error_flag");
+ summary.setInError(inError || summary.isInError());
+
+ if (status != Status.NE && count == 1) {
+ summary.setCurrentBatchId(rs.getLong("current_batch_id"));
+ summary.setCurrentTable(rs.getString("current_table_name"));
+ summary.setCurrentDataEventCount(rs.getLong("current_data_event_count"));
+ }
+
+ }
+ return null;
+ }
+ });
+
+ List loads = new ArrayList(loadSummaries.values());
+ Iterator it = loads.iterator();
+ while (it.hasNext()) {
+ OutgoingLoadSummary loadSummary = it.next();
+ if (activeOnly && !loadSummary.isActive()) {
+ it.remove();
+ }
+ }
+
+ return loads;
+ }
+
class OutgoingBatchSummaryMapper implements ISqlRowMapper {
public OutgoingBatchSummary mapRow(Row rs) {
OutgoingBatchSummary summary = new OutgoingBatchSummary();
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
index 6a1a1b7697..cc123f8bf1 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
@@ -72,6 +72,17 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, Map