diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadSummary.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadSummary.java deleted file mode 100644 index ddbbdff991..0000000000 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadSummary.java +++ /dev/null @@ -1,248 +0,0 @@ -package org.jumpmind.symmetric.model; - -import java.io.Serializable; -import java.util.Date; - -public class LoadSummary implements Serializable { - - private static final long serialVersionUID = 1L; - - private long loadId; - private String nodeId; - private boolean inError; - private int finishedBatchCount; - private int pendingBatchCount; - private long currentBatchId; - private long currentDataEventCount; - private String createBy; - private Date createTime; - private Date lastUpdateTime; - private String channelQueue; - private int tableCount; - private boolean isFullLoad; - private boolean isCreateFirst; - private boolean isDeleteFirst; - private boolean isRequestProcessed; - private boolean isConditional; - private boolean isCustomSql; - private long batchCount; - private String currentTableName; - private long dataCount; - private String processStatus; - private String processName; - private int targetNodeCount; - private int ignoreCount; - - public boolean isActive() { - return pendingBatchCount > 0; - } - - public void setInError(boolean inError) { - this.inError = inError; - } - - public boolean isInError() { - return inError; - } - - public long getLoadId() { - return loadId; - } - - public void setIgnoreCount(int count){ - this.ignoreCount = count; - } - - public int getIgnoreCount(){ - return ignoreCount; - } - - public void setLoadId(long loadId) { - this.loadId = loadId; - } - - public String getNodeId() { - return nodeId; - } - - public void setNodeId(String nodeId) { - this.nodeId = nodeId; - } - - public int getFinishedBatchCount() { - return finishedBatchCount; - } - - public void setFinishedBatchCount(int finishedBatchCount) { - this.finishedBatchCount = finishedBatchCount; - } - - public int getPendingBatchCount() { - return pendingBatchCount; - } - - public void setPendingBatchCount(int pendingBatchCount) { - this.pendingBatchCount = pendingBatchCount; - } - - public long getCurrentBatchId() { - return currentBatchId; - } - - public void setCurrentBatchId(long currentBatchId) { - this.currentBatchId = currentBatchId; - } - - public long getCurrentDataEventCount() { - return currentDataEventCount; - } - - public void setCurrentDataEventCount(long currentDataEventCount) { - this.currentDataEventCount = currentDataEventCount; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public Date getLastUpdateTime() { - return lastUpdateTime; - } - - public void setLastUpdateTime(Date lastUpdateTime) { - this.lastUpdateTime = lastUpdateTime; - } - - public void setCreateBy(String createBy) { - this.createBy = createBy; - } - - public String getCreateBy() { - return createBy; - } - - public String getLoadNodeId() { - return String.format("%010d-%s", loadId, nodeId); - } - - public String getChannelQueue() { - return channelQueue; - } - - public void setChannelQueue(String channelQueue) { - this.channelQueue = channelQueue; - } - - public int getTableCount() { - return tableCount; - } - - public void setTableCount(int tableCount) { - this.tableCount = tableCount; - } - - public boolean isFullLoad() { - return isFullLoad; - } - - public void setFullLoad(boolean isFullLoad) { - this.isFullLoad = isFullLoad; - } - - public boolean isCreateFirst() { - return isCreateFirst; - } - - public void setCreateFirst(boolean isCreateFirst) { - this.isCreateFirst = isCreateFirst; - } - - public boolean isDeleteFirst() { - return isDeleteFirst; - } - - public void setDeleteFirst(boolean isDeleteFirst) { - this.isDeleteFirst = isDeleteFirst; - } - - public boolean isRequestProcessed() { - return isRequestProcessed; - } - - public void setRequestProcessed(boolean isRequestProcessed) { - this.isRequestProcessed = isRequestProcessed; - } - - public boolean isConditional() { - return isConditional; - } - - public void setConditional(boolean isConditional) { - this.isConditional = isConditional; - } - - public boolean isCustomSql() { - return isCustomSql; - } - - public void setCustomSql(boolean isCustomSql) { - this.isCustomSql = isCustomSql; - } - - public long getBatchCount() { - return batchCount; - } - - public void setBatchCount(long batchCount) { - this.batchCount = batchCount; - } - - public String getCurrentTableName() { - return currentTableName; - } - - public void setCurrentTableName(String currentTableName) { - this.currentTableName = currentTableName; - } - - public long getDataCount() { - return dataCount; - } - - public void setDataCount(long dataCount) { - this.dataCount = dataCount; - } - - public String getProcessStatus() { - return processStatus; - } - - public void setProcessStatus(String processStatus) { - this.processStatus = processStatus; - } - - public String getProcessName() { - return processName; - } - - public void setProcessName(String processName) { - this.processName = processName; - } - - public int getTargetNodeCount() { - return targetNodeCount; - } - - public void setTargetNodeCount(int targetNodeCount) { - this.targetNodeCount = targetNodeCount; - } - - - - -} - diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingLoadSummary.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingLoadSummary.java deleted file mode 100644 index 6c5a62fec7..0000000000 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingLoadSummary.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to JumpMind Inc under one or more contributor - * license agreements. See the NOTICE file distributed - * with this work for additional information regarding - * copyright ownership. JumpMind Inc licenses this file - * to you under the GNU General Public License, version 3.0 (GPLv3) - * (the "License"); you may not use this file except in compliance - * with the License. - * - * You should have received a copy of the GNU General Public License, - * version 3.0 (GPLv3) along with this library; if not, see - * . - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.jumpmind.symmetric.model; - -import java.io.Serializable; -import java.util.Date; - -public class OutgoingLoadSummary implements Serializable { - - private static final long serialVersionUID = 1L; - - private long loadId; - private String nodeId; - private boolean inError; - private int finishedBatchCount; - private int pendingBatchCount; - private long currentBatchId; - private long currentDataEventCount; - private String createBy; - private Date createTime; - private Date lastUpdateTime; - private int reloadBatchCount; - private String channelId; - - public boolean isActive() { - return pendingBatchCount > 0; - } - - public void setInError(boolean inError) { - this.inError = inError; - } - - public boolean isInError() { - return inError; - } - - public long getLoadId() { - return loadId; - } - - public void setLoadId(long loadId) { - this.loadId = loadId; - } - - public String getNodeId() { - return nodeId; - } - - public void setNodeId(String nodeId) { - this.nodeId = nodeId; - } - - public int getFinishedBatchCount() { - return finishedBatchCount; - } - - public void setFinishedBatchCount(int finishedBatchCount) { - this.finishedBatchCount = finishedBatchCount; - } - - public int getPendingBatchCount() { - return pendingBatchCount; - } - - public void setPendingBatchCount(int pendingBatchCount) { - this.pendingBatchCount = pendingBatchCount; - } - - public int getTotalBatchCount() { - return getPendingBatchCount() + getFinishedBatchCount(); - } - - public long getCurrentBatchId() { - return currentBatchId; - } - - public void setCurrentBatchId(long currentBatchId) { - this.currentBatchId = currentBatchId; - } - - public long getCurrentDataEventCount() { - return currentDataEventCount; - } - - public void setCurrentDataEventCount(long currentDataEventCount) { - this.currentDataEventCount = currentDataEventCount; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public Date getLastUpdateTime() { - return lastUpdateTime; - } - - public void setLastUpdateTime(Date lastUpdateTime) { - this.lastUpdateTime = lastUpdateTime; - } - - public int getReloadBatchCount() { - return reloadBatchCount; - } - - public void setReloadBatchCount(int reloadBatchCount) { - this.reloadBatchCount = reloadBatchCount; - } - - public void setCreateBy(String createBy) { - this.createBy = createBy; - } - - public String getCreateBy() { - return createBy; - } - - public String getLoadNodeId() { - return String.format("%010d-%s", loadId, nodeId); - } - - public String getChannelId() { - return channelId; - } - - public void setChannelId(String channelId) { - this.channelId = channelId; - } - - - -} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index e18d9cab3e..94615297d8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -21,21 +21,16 @@ package org.jumpmind.symmetric.service; -import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Map; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.symmetric.model.AbstractBatch.Status; -import org.jumpmind.symmetric.model.LoadSummary; import org.jumpmind.symmetric.model.NodeGroupLinkAction; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchSummary; import org.jumpmind.symmetric.model.OutgoingBatches; -import org.jumpmind.symmetric.model.OutgoingLoadSummary; -import org.jumpmind.symmetric.service.impl.OutgoingBatchService.LoadCounts; -import org.jumpmind.symmetric.service.impl.OutgoingBatchService.LoadStatusSummary; /** * This service provides an API to access to the outgoing batch table. @@ -115,29 +110,12 @@ public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long sta public List findOutgoingBatchSummaryByNode(String nodeId, Date sinceCreateTime, Status... statuses); - public List findOutgoingBatchSummaryByNodeAndChannel(String nodeId, String channelId, - Date sinceCreateTime, Status... statuses); - public int countOutgoingBatches(List nodeIds, List channels, List statuses, List loads); public List listOutgoingBatches(List nodeIds, List channels, List statuses, List loads, long startAtBatchId, int rowsExpected, boolean ascending); - public List getLoadSummaries(boolean activeOnly); - - public Map getActiveLoadCounts(); - - public List getQueuedLoads(String sourceNodeId); - - public LoadSummary getLoadSummary(long loadId); - - public Map getLoadOverview(long loadId); - - public Collection getLoadHistory(String sourceNodeId, final String symTablePrefix, int rowsReturned); - - public Map> getLoadStatusSummaries(int loadId); - public void copyOutgoingBatches(String channelId, long startBatchId, String fromNodeId, String toNodeId); public List getAllBatches(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 22accdced7..a44cfca5b8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -20,19 +20,15 @@ */ package org.jumpmind.symmetric.service.impl; -import java.io.Serializable; import java.sql.Types; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import org.apache.commons.lang.StringUtils; import org.jumpmind.db.sql.ISqlRowMapper; @@ -46,7 +42,6 @@ import org.jumpmind.symmetric.ext.IOutgoingBatchFilter; import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.Channel; -import org.jumpmind.symmetric.model.LoadSummary; import org.jumpmind.symmetric.model.NodeChannel; import org.jumpmind.symmetric.model.NodeGroupChannelWindow; import org.jumpmind.symmetric.model.NodeGroupLinkAction; @@ -55,7 +50,6 @@ import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchSummary; import org.jumpmind.symmetric.model.OutgoingBatches; -import org.jumpmind.symmetric.model.OutgoingLoadSummary; import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IExtensionService; @@ -636,20 +630,6 @@ public List findOutgoingBatchSummaryByNode(String nodeId, "whereStatusAndNodeGroupByStatusSql").replace(":STATUS_LIST", inList.substring(0, inList.length() - 1)); return sqlTemplateDirty.query(sql, new OutgoingBatchSummaryMapper(false, false), args); } - - public List findOutgoingBatchSummaryByNodeAndChannel(String nodeId, - String channelId, Date sinceCreateTime, Status... statuses) { - - Object[] args = new Object[statuses.length + 2]; - args[args.length - 1] = nodeId; - args[args.length - 2] = channelId; - StringBuilder inList = buildStatusList(args, statuses); - - String sql = getSql("selectOutgoingBatchSummaryPrefixSql", - "selectOutgoingBatchSummaryStatsPrefixSql", - "whereStatusAndNodeAndChannelGroupByStatusSql").replace(":STATUS_LIST", inList.substring(0, inList.length() - 1)); - return sqlTemplateDirty.query(sql, new OutgoingBatchSummaryMapper(false, false), args); - } public List findOutgoingBatchSummary(Status... statuses) { Object[] args = new Object[statuses.length]; @@ -675,350 +655,6 @@ public List findOutgoingBatchSummaryByChannel(Status... st return sqlTemplateDirty.query(sql, new OutgoingBatchSummaryMapper(true, true), args); } - public Map getActiveLoadCounts() { - return sqlTemplateDirty.queryForMap(getSql("getActiveLoadCountsSql"), new ISqlRowMapper() { - - @Override - public LoadCounts mapRow(Row rs) { - LoadCounts lc = new LoadCounts(); - lc.setLoadedBatchCount(rs.getInt("loaded_batch_count")); - lc.setLoadedByteCount(rs.getLong("loaded_byte_count")); - lc.setLoadedRowCount(rs.getLong("loaded_row_count")); - - return lc; - } - - }, "load_id"); - } - - public List getQueuedLoads(String sourceNodeId) { - return sqlTemplateDirty.query(getSql("getLoadSummaryUnprocessedSql"), new LoadSummaryMapper(), sourceNodeId); - } - - public LoadSummary getLoadSummary(long loadId) { - return sqlTemplateDirty.queryForObject(getSql("getLoadSummarySql"), new LoadSummaryMapper(), loadId); - } - - private static class LoadSummaryMapper implements ISqlRowMapper { - public LoadSummary mapRow(Row rs) { - LoadSummary summary = new LoadSummary(); - // summary.setLoadId(rs.getLong("load_id")); - summary.setNodeId(rs.getString("node_id")); - summary.setCreateBy(rs.getString("last_update_by")); - summary.setTableCount(rs.getInt("table_count")); - String triggerId = rs.getString("trigger_id"); - if (triggerId == null || triggerId.equals(ParameterConstants.ALL)) { - summary.setFullLoad(true); - } else { - summary.setFullLoad(false); - } - summary.setCreateFirst(rs.getBoolean("create_table")); - summary.setDeleteFirst(rs.getBoolean("delete_first")); - summary.setRequestProcessed(rs.getBoolean("processed")); - summary.setIgnoreCount(rs.getInt("ignore_count")); - // summary.setConditional(rs.getBoolean("reload_select")); - // summary.setCustomSql(rs.getBoolean("before_custom_sql")); - return summary; - } - } - - public Map> getLoadStatusSummaries(int loadId) { - LoadStatusByQueueMapper mapper = new LoadStatusByQueueMapper(this.symmetricDialect.getTablePrefix()); - sqlTemplateDirty.query(getSql("getLoadStatusSummarySql"), mapper, loadId); - return mapper.getResults(); - } - - private static class LoadStatusByQueueMapper implements ISqlRowMapper { - Map> results = new TreeMap>(Collections.reverseOrder()); - String tablePrefix; - - public LoadStatusByQueueMapper(String tablePrefix) { - this.tablePrefix = tablePrefix; - } - - @Override - public Object mapRow(Row rs) { - String queue = rs.getString("queue"); - String status = rs.getString("status"); - - Map statusMap = results.get(queue); - if (statusMap == null) { - statusMap = new HashMap(); - } - - LoadStatusSummary statusSummary = new LoadStatusSummary(); - statusSummary.setCreateTime(rs.getDateTime("create_time")); - statusSummary.setLastUpdateTime(rs.getDateTime("last_update_time")); - statusSummary.setByteCount(rs.getLong("byte_count")); - statusSummary.setDataEventCount(rs.getLong("data_events")); - statusSummary.setCount(rs.getInt("count_ids")); - statusSummary.setExtractStartTime(rs.getDateTime("min_extract_start_time")); - statusSummary.setTransferStartTime(rs.getDateTime("min_transfer_start_time")); - statusSummary.setLoadStartTime(rs.getDateTime("min_load_start_time")); - if (statusSummary.getExtractStartTime() != null) { - statusSummary.setExtractEndTime(new Date(statusSummary.getExtractStartTime().getTime() + rs.getLong("full_extract_millis"))); - } - if (statusSummary.getTransferStartTime() != null) { - statusSummary - .setTransferEndTime(new Date(statusSummary.getTransferStartTime().getTime() + rs.getLong("full_transfer_millis"))); - } - if (statusSummary.getLoadStartTime() != null) { - statusSummary.setLoadEndTime(new Date(statusSummary.getLoadStartTime().getTime() + rs.getLong("full_load_millis"))); - } - String minSummary = rs.getString("min_summary"); - String maxSummary = rs.getString("max_summary"); - if (minSummary != null && minSummary.startsWith(this.tablePrefix)) { - minSummary = maxSummary; - } - statusSummary.setTables(minSummary); - - statusMap.put(status, statusSummary); - - results.put(queue, statusMap); - - return null; - } - - public Map> getResults() { - return results; - } - } - - public static class LoadStatusSummary { - private long dataEventCount; - private long byteCount; - private String status; - private int count; - private Date createTime; - private Date lastUpdateTime; - private Date extractStartTime; - private Date transferStartTime; - private Date loadStartTime; - private Date extractEndTime; - private Date transferEndTime; - private Date loadEndTime; - private String tables; - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; - } - - public int getCount() { - return count; - } - - public void setCount(int count) { - this.count = count; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public Date getLastUpdateTime() { - return lastUpdateTime; - } - - public void setLastUpdateTime(Date lastUpdateTime) { - this.lastUpdateTime = lastUpdateTime; - } - - public long getDataEventCount() { - return dataEventCount; - } - - public void setDataEventCount(long dataEventCount) { - this.dataEventCount = dataEventCount; - } - - public long getByteCount() { - return byteCount; - } - - public void setByteCount(long byteCount) { - this.byteCount = byteCount; - } - - public Date getExtractStartTime() { - return extractStartTime; - } - - public void setExtractStartTime(Date extractStartTime) { - this.extractStartTime = extractStartTime; - } - - public Date getTransferStartTime() { - return transferStartTime; - } - - public void setTransferStartTime(Date transferStartTime) { - this.transferStartTime = transferStartTime; - } - - public Date getLoadStartTime() { - return loadStartTime; - } - - public void setLoadStartTime(Date loadStartTime) { - this.loadStartTime = loadStartTime; - } - - public Date getExtractEndTime() { - return extractEndTime; - } - - public void setExtractEndTime(Date extractEndTime) { - this.extractEndTime = extractEndTime; - } - - public Date getTransferEndTime() { - return transferEndTime; - } - - public void setTransferEndTime(Date transferEndTime) { - this.transferEndTime = transferEndTime; - } - - public Date getLoadEndTime() { - return loadEndTime; - } - - public void setLoadEndTime(Date loadEndTime) { - this.loadEndTime = loadEndTime; - } - - public String getTables() { - return tables; - } - - public void setTables(String tables) { - this.tables = tables; - } - - } - - public Map getLoadOverview(long loadId) { - return sqlTemplateDirty.queryForMap(getSql("getLoadOverviewSql"), "status", "count", loadId); - } - - public List getLoadSummaries(boolean activeOnly) { - final Map loadSummaries = new TreeMap(); - sqlTemplate.query(getSql("getLoadSummariesSql"), new ISqlRowMapper() { - public OutgoingLoadSummary mapRow(Row rs) { - long loadId = rs.getLong("load_id"); - String nodeId = rs.getString("node_id"); - String loadNodeId = String.format("%010d-%s", loadId, nodeId); - OutgoingLoadSummary summary = loadSummaries.get(loadNodeId); - if (summary == null) { - summary = new OutgoingLoadSummary(); - summary.setLoadId(loadId); - summary.setNodeId(nodeId); - summary.setChannelId(rs.getString("channel_id")); - summary.setCreateBy(rs.getString("create_by")); - loadSummaries.put(loadNodeId, summary); - } - - Status status = Status.valueOf(rs.getString("status")); - int count = rs.getInt("cnt"); - - Date lastUpdateTime = rs.getDateTime("last_update_time"); - if (summary.getLastUpdateTime() == null || summary.getLastUpdateTime().before(lastUpdateTime)) { - summary.setLastUpdateTime(lastUpdateTime); - } - - Date createTime = rs.getDateTime("create_time"); - if (summary.getCreateTime() == null || summary.getCreateTime().after(createTime)) { - summary.setCreateTime(createTime); - } - - summary.setReloadBatchCount(summary.getReloadBatchCount() + count); - - if (status == Status.OK || status == Status.IG) { - summary.setFinishedBatchCount(summary.getFinishedBatchCount() + count); - } else { - summary.setPendingBatchCount(summary.getPendingBatchCount() + count); - - boolean inError = rs.getBoolean("error_flag"); - summary.setInError(inError || summary.isInError()); - - if (status != Status.NE && count == 1) { - summary.setCurrentBatchId(rs.getLong("current_batch_id")); - summary.setCurrentDataEventCount(rs.getLong("current_data_event_count")); - } - - } - return null; - } - }); - - List loads = new ArrayList(loadSummaries.values()); - Iterator it = loads.iterator(); - while (it.hasNext()) { - OutgoingLoadSummary loadSummary = it.next(); - if (activeOnly && !loadSummary.isActive()) { - it.remove(); - } - } - - return loads; - } - - public Collection getLoadHistory(String sourceNodeId, final String symTablePrefix, final int rowsReturned) { - final Map loads = new TreeMap(Collections.reverseOrder()); - - sqlTemplateDirty.query(getSql("getLoadHistorySql"), new ISqlRowMapper() { - - @Override - public LoadSummary mapRow(Row rs) { - Date createTime = rs.getDateTime("create_time"); - Date lastUpdateTime = rs.getDateTime("last_update_time"); - - LoadSummary loadSummary; - - if (!loads.containsKey(createTime) && loads.size() < rowsReturned) { - loadSummary = new LoadSummary(); - loadSummary.setCreateTime(createTime); - loadSummary.setLastUpdateTime(lastUpdateTime); - loads.put(createTime, loadSummary); - } - - loadSummary = loads.get(createTime); - if (loadSummary != null) { - if (lastUpdateTime.after(loadSummary.getLastUpdateTime())) { - loadSummary.setLastUpdateTime(lastUpdateTime); - } - - loadSummary.setTableCount(rs.getInt("table_count")); - loadSummary.setTargetNodeCount(loadSummary.getTargetNodeCount() + 1); - - if (ParameterConstants.ALL.equals(rs.getString("trigger_id"))) { - loadSummary.setCurrentTableName(ParameterConstants.ALL); - loadSummary.setFullLoad(true); - } else if (loadSummary.getTableCount() == 1) { - if (rs.getString("min_table").toUpperCase().startsWith(symTablePrefix.toUpperCase())) { - loadSummary.setCurrentTableName(rs.getString("max_table")); - } else { - loadSummary.setCurrentTableName(rs.getString("min_table")); - } - - } else { - loadSummary.setCurrentTableName(null); - } - } - return null; - } - }, sourceNodeId); - return loads.values(); - } - @Override public List getAllBatches() { return sqlTemplateDirty.query(getSql("getAllBatchesSql"), new LongMapper()); @@ -1150,32 +786,4 @@ public OutgoingBatch mapRow(Row rs) { } } - - public class LoadCounts implements Serializable { - private static final long serialVersionUID = 1L; - private long loadedRowCount; - private int loadedBatchCount; - private long loadedByteCount; - - public long getLoadedRowCount() { - return loadedRowCount; - } - public void setLoadedRowCount(long loadedRowCount) { - this.loadedRowCount = loadedRowCount; - } - public int getLoadedBatchCount() { - return loadedBatchCount; - } - public void setLoadedBatchCount(int loadedBatchCount) { - this.loadedBatchCount = loadedBatchCount; - } - public long getLoadedByteCount() { - return loadedByteCount; - } - public void setLoadedByteCount(long loadedByteCount) { - this.loadedByteCount = loadedByteCount; - } - - - } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 0188738d10..4672d02a69 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -194,79 +194,6 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, putSql("updateOutgoingBatchesStatusSql", "update $(outgoing_batch) set status=? where status = ? "); - putSql("getLoadSummariesSql", - "select b.load_id, b.node_id, b.status, b.create_by, max(error_flag) as error_flag, count(*) as cnt, min(b.create_time) as create_time, " - + " max(b.last_update_time) as last_update_time, min(b.batch_id) as current_batch_id, " - + " min(b.data_row_count) as current_data_event_count, b.channel_id " - + "from $(outgoing_batch) b " - + " join $(channel) c on c.channel_id = b.channel_id " - + "where c.reload_flag = 1 " - + " and b.load_id > 0 " - + "group by b.load_id, b.node_id, b.status, b.channel_id, b.create_by " - + "order by b.load_id desc "); - - putSql("getActiveLoadCountsSql", - "select r.load_id, count(ob.batch_id) as loaded_batch_count, sum(ob.data_row_count) as loaded_row_count, " - + "sum(ob.byte_count) as loaded_byte_count " - + "from $(table_reload_request) r " - + "join $(outgoing_batch) ob on ob.load_id = r.load_id " - + "where r.completed = 0 and ob.status = 'OK' and ob.reload_row_count > 0 " - + "group by r.load_id"); - - putSql("getLoadSummaryUnprocessedSql", - "select r.source_node_id, r.target_node_id, " - + " count(TRIGGER_ID) as table_count, max(TRIGGER_ID) as trigger_id, " - + " max(create_table) as create_table, max(delete_first) as delete_first, max(processed) as processed, " - + " max(reload_select) as reload_select, max(before_custom_sql) as before_custom_sql, " - + " max(last_update_by) as last_update_by, min(last_update_time) as last_update_time " - + "from $(table_reload_request) r " - + "where processed = 0 and source_node_id = ? " - + "group by r.source_node_id, r.target_node_id"); - - putSql("getLoadSummarySql", - "select " - + "b.node_id, b.load_id, count(*) as table_count, max(trigger_id) as trigger_id, max(create_table) as create_table, " - + "max(delete_first) as delete_first, max(processed) as processed, max(ignore_count) as ignore_count, max(last_update_by) as last_update_by, max(b.last_update_time) as last_update_time " - + "from $(outgoing_batch) b join $(data_event) e on b.batch_id = e.batch_id join $(data) d on " - + "d.data_id = e.data_id left join $(table_reload_request) r on b.load_id = r.load_id where b.load_id = ? and d.event_type = 'R' " - + "group by b.load_id, b.node_id order by max(b.last_update_time) desc"); - - putSql("getLoadOverviewSql", - "select status, count(batch_id) as count " - + " from $(outgoing_batch) " - + " where load_id = ?" - + " group by status"); - - putSql("getLoadHistorySql", - "select r.load_id, max(trigger_id) as trigger_id, count(trigger_id) as table_count, max(target_node_id) as target_node_id, " - + "create_time, o.last_update_time, min_table, max_table " - + "from $(table_reload_request) r " - + "join ( " - + " select load_id, max(last_update_time) as last_update_time, min(summary) as min_table, max(summary) as max_table " - + " from $(outgoing_batch) " - + " group by load_id " - + ") o on o.load_id = r.load_id " - + "where source_node_id = ? " - + "group by r.load_id, create_time " - + "order by create_time desc " - ); - - - putSql("getLoadStatusSummarySql", - "select ob.load_id, count(ob.batch_id) as count_ids, ob.status, c.queue, max(ob.last_update_time) as last_update_time, " - + " min(ob.create_time) as create_time, sum(ob.data_row_count) as data_events, sum(ob.byte_count) as byte_count, " - + " min(extract_start_time) as min_extract_start_time, min(transfer_start_time) as min_transfer_start_time, " - + " min(load_start_time) as min_load_start_time, " - + " min(summary) as min_summary, max(summary) as max_summary, " - + " sum(extract_millis + transform_extract_millis) as full_extract_millis, " - + " sum(network_millis) as full_transfer_millis, " - + " sum(load_millis + transform_load_millis + filter_millis) as full_load_millis " - + " from $(outgoing_batch) ob " - + " join $(channel) c on c.channel_id = ob.channel_id " - + " where ob.load_id = ? " - + " group by ob.load_id, c.queue, ob.status" - + " order by ob.load_id asc"); - putSql("deleteOutgoingBatchesForNodeSql", "delete from $(outgoing_batch) where node_id=? and channel_id=? and batch_id < " + "(select max(batch_id) from $(outgoing_batch) where node_id=? and channel_id=?) "); @@ -280,8 +207,6 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, putSql("getAllBatchesSql", "select batch_id from $(outgoing_batch)"); - - }