From 99ef8d48dc0eb5dca298d179ab4b2481c5e7aca2 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Tue, 16 Aug 2016 08:29:24 -0400 Subject: [PATCH] Updates to monitor process information about loads that are running. --- .../jumpmind/symmetric/model/LoadSummary.java | 230 ++++++++++++++++++ .../jumpmind/symmetric/model/ProcessInfo.java | 16 +- .../symmetric/service/IDataService.java | 5 +- .../service/IOutgoingBatchService.java | 8 + .../service/impl/DataExtractorService.java | 37 ++- .../symmetric/service/impl/DataService.java | 27 +- .../service/impl/OutgoingBatchService.java | 97 +++++++- .../impl/OutgoingBatchServiceSqlMap.java | 37 ++- .../symmetric/service/impl/RouterService.java | 16 +- 9 files changed, 435 insertions(+), 38 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadSummary.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadSummary.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadSummary.java new file mode 100644 index 0000000000..ec869fcaf5 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadSummary.java @@ -0,0 +1,230 @@ +package org.jumpmind.symmetric.model; + +import java.io.Serializable; +import java.util.Date; + +public class LoadSummary implements Serializable { + + private static final long serialVersionUID = 1L; + + private long loadId; + private String nodeId; + private boolean inError; + private int finishedBatchCount; + private int pendingBatchCount; + private long currentBatchId; + private long currentDataEventCount; + private String createBy; + private Date createTime; + private Date lastUpdateTime; + private String channelQueue; + private int tableCount; + private boolean isFullLoad; + private boolean isCreateFirst; + private boolean isDeleteFirst; + private boolean isRequestProcessed; + private boolean isConditional; + private boolean isCustomSql; + private long batchCount; + private String currentTableName; + private long dataCount; + private String processStatus; + private String processName; + + public boolean isActive() { + return pendingBatchCount > 0; + } + + public void setInError(boolean inError) { + this.inError = inError; + } + + public boolean isInError() { + return inError; + } + + public long getLoadId() { + return loadId; + } + + public void setLoadId(long loadId) { + this.loadId = loadId; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public int getFinishedBatchCount() { + return finishedBatchCount; + } + + public void setFinishedBatchCount(int finishedBatchCount) { + this.finishedBatchCount = finishedBatchCount; + } + + public int getPendingBatchCount() { + return pendingBatchCount; + } + + public void setPendingBatchCount(int pendingBatchCount) { + this.pendingBatchCount = pendingBatchCount; + } + + public long getCurrentBatchId() { + return currentBatchId; + } + + public void setCurrentBatchId(long currentBatchId) { + this.currentBatchId = currentBatchId; + } + + public long getCurrentDataEventCount() { + return currentDataEventCount; + } + + public void setCurrentDataEventCount(long currentDataEventCount) { + this.currentDataEventCount = currentDataEventCount; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Date getLastUpdateTime() { + return lastUpdateTime; + } + + public void setLastUpdateTime(Date lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + } + + public void setCreateBy(String createBy) { + this.createBy = createBy; + } + + public String getCreateBy() { + return createBy; + } + + public String getLoadNodeId() { + return String.format("%010d-%s", loadId, nodeId); + } + + public String getChannelQueue() { + return channelQueue; + } + + public void setChannelQueue(String channelQueue) { + this.channelQueue = channelQueue; + } + + public int getTableCount() { + return tableCount; + } + + public void setTableCount(int tableCount) { + this.tableCount = tableCount; + } + + public boolean isFullLoad() { + return isFullLoad; + } + + public void setFullLoad(boolean isFullLoad) { + this.isFullLoad = isFullLoad; + } + + public boolean isCreateFirst() { + return isCreateFirst; + } + + public void setCreateFirst(boolean isCreateFirst) { + this.isCreateFirst = isCreateFirst; + } + + public boolean isDeleteFirst() { + return isDeleteFirst; + } + + public void setDeleteFirst(boolean isDeleteFirst) { + this.isDeleteFirst = isDeleteFirst; + } + + public boolean isRequestProcessed() { + return isRequestProcessed; + } + + public void setRequestProcessed(boolean isRequestProcessed) { + this.isRequestProcessed = isRequestProcessed; + } + + public boolean isConditional() { + return isConditional; + } + + public void setConditional(boolean isConditional) { + this.isConditional = isConditional; + } + + public boolean isCustomSql() { + return isCustomSql; + } + + public void setCustomSql(boolean isCustomSql) { + this.isCustomSql = isCustomSql; + } + + public long getBatchCount() { + return batchCount; + } + + public void setBatchCount(long batchCount) { + this.batchCount = batchCount; + } + + public String getCurrentTableName() { + return currentTableName; + } + + public void setCurrentTableName(String currentTableName) { + this.currentTableName = currentTableName; + } + + public long getDataCount() { + return dataCount; + } + + public void setDataCount(long dataCount) { + this.dataCount = dataCount; + } + + public String getProcessStatus() { + return processStatus; + } + + public void setProcessStatus(String processStatus) { + this.processStatus = processStatus; + } + + public String getProcessName() { + return processName; + } + + public void setProcessName(String processName) { + this.processName = processName; + } + + + + +} + diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java index 334ef60337..198de149a5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java @@ -74,6 +74,8 @@ public String toString() { private long currentBatchId; + private long currentBatchCount; + private String currentChannelId; private boolean threadPerChannel; @@ -156,8 +158,20 @@ public void incrementCurrentDataCount() { public void incrementBatchCount() { this.batchCount++; } + + public void incrementCurrentBatchCount() { + this.currentBatchCount++; + } + + public long getCurrentBatchCount() { + return currentBatchCount; + } + + public void setCurrentBatchCount(long currentBatchCount) { + this.currentBatchCount = currentBatchCount; + } - public long getCurrentBatchId() { + public long getCurrentBatchId() { return currentBatchId; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index f063dffa38..767d60ecae 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -33,6 +33,7 @@ import org.jumpmind.symmetric.model.DataGap; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.OutgoingBatch.Status; +import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.TableReloadRequestKey; import org.jumpmind.symmetric.model.TriggerHistory; @@ -67,9 +68,9 @@ public interface IDataService { */ public String sendSQL(String nodeId, String catalogName, String schemaName, String tableName, String sql); - public void insertReloadEvents(Node targetNode, boolean reverse); + public void insertReloadEvents(Node targetNode, boolean reverse, ProcessInfo processInfo); - public void insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests); + public void insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests, ProcessInfo processInfo); public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index ec64e29831..e0a426eba3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -24,8 +24,10 @@ import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Set; import org.jumpmind.db.sql.ISqlTransaction; +import org.jumpmind.symmetric.model.LoadSummary; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchSummary; import org.jumpmind.symmetric.model.OutgoingBatches; @@ -99,6 +101,12 @@ public List listOutgoingBatches(List nodeIds, List statuses, List loads, long startAtBatchId, int rowsExpected, boolean ascending); public List getLoadSummaries(boolean activeOnly); + + public Set getActiveLoads(String sourceNodeId); + + public LoadSummary getLoadSummary(long loadId); + + public Map> getLoadStatusSummarySql(long loadId); public void copyOutgoingBatches(String channelId, long startBatchId, String fromNodeId, String toNodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index f254b759ee..b4f20e8867 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -536,14 +536,14 @@ protected List extract(final ProcessInfo processInfo, final Node executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory()); List> futures = new ArrayList>(); + processInfo.setBatchCount(activeBatches.size()); for (int i = 0; i < activeBatches.size(); i++) { currentBatch = activeBatches.get(i); - channelsProcessed.add(currentBatch.getChannelId()); - processInfo.setDataCount(currentBatch.getDataEventCount()); - processInfo.setCurrentBatchId(currentBatch.getBatchId()); processInfo.setCurrentLoadId(currentBatch.getLoadId()); - + + channelsProcessed.add(currentBatch.getChannelId()); + currentBatch = requeryIfEnoughTimeHasPassed(batchesSelectedAtMs, currentBatch); processInfo.setStatus(ProcessInfo.Status.EXTRACTING); final OutgoingBatch extractBatch = currentBatch; @@ -617,13 +617,14 @@ public FutureOutgoingBatch call() throws Exception { if (!extractBatch.isExtractSkipped && (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT)) { processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); + processInfo.setCurrentLoadId(currentBatch.getLoadId()); currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, extractBatch.isRetry(), dataWriter, writer, mode); } processedBatches.add(currentBatch); isSent = true; - + if (currentBatch.getStatus() != Status.OK) { currentBatch.setLoadCount(currentBatch.getLoadCount() + 1); changeBatchStatus(Status.LD, currentBatch, mode); @@ -884,7 +885,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe } } - + processInfo.incrementCurrentBatchCount(); return currentBatch; } @@ -1208,8 +1209,9 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status request.getStartBatchId(), request.getEndBatchId()).getBatches(); ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity - .getNodeId(), nodeCommunication.getNodeId(), + .getNodeId(), nodeCommunication.getQueue(), nodeCommunication.getNodeId(), ProcessInfoKey.ProcessType.INITIAL_LOAD_EXTRACT_JOB)); + processInfo.setBatchCount(batches.size()); try { boolean areBatchesOk = true; @@ -1231,9 +1233,10 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status * "Trick" the extractor to extract one reload batch, but we * will split it across the N batches when writing it */ + processInfo.setCurrentLoadId(batches.get(0).getLoadId()); extractOutgoingBatch(processInfo, targetNode, new MultiBatchStagingWriter(identity.getNodeId(), stagingManager, - batches, channel.getMaxBatchSize()), batches.get(0), false, + batches, channel.getMaxBatchSize(), processInfo), batches.get(0), false, false, ExtractMode.FOR_SYM_CLIENT); } else { @@ -1350,14 +1353,17 @@ public class MultiBatchStagingWriter implements IDataWriter { Batch batch; boolean inError = false; - + + ProcessInfo processInfo; + public MultiBatchStagingWriter(String sourceNodeId, IStagingManager stagingManager, - List batches, long maxBatchSize) { + List batches, long maxBatchSize, ProcessInfo processInfo) { this.sourceNodeId = sourceNodeId; this.maxBatchSize = maxBatchSize; this.stagingManager = stagingManager; this.batches = new ArrayList(batches); this.finishedBatches = new ArrayList(batches.size()); + this.processInfo = processInfo; } public void open(DataContext context) { @@ -1383,11 +1389,20 @@ public Map getStatistics() { public void start(Batch batch) { this.batch = batch; + if (batch != null) { + processInfo.setCurrentBatchId(batch.getBatchId()); + processInfo.setCurrentChannelId(batch.getChannelId()); + processInfo.incrementBatchCount(); + processInfo.setCurrentDataCount(0); + } this.currentDataWriter.start(batch); } public boolean start(Table table) { this.table = table; + if (table != null) { + processInfo.setCurrentTableName(table.getFullyQualifiedTableName()); + } this.currentDataWriter.start(table); return true; } @@ -1483,6 +1498,8 @@ protected void startNewBatch() { sourceNodeId, outgoingBatch.getNodeId(), false); this.currentDataWriter.open(context); this.currentDataWriter.start(batch); + processInfo.incrementBatchCount(); + this.currentDataWriter.start(table); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index a13752eff6..6b7bf17a0b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -66,6 +66,7 @@ import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatch.Status; +import org.jumpmind.symmetric.model.ProcessInfo; import org.jumpmind.symmetric.model.Router; import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.TableReloadRequestKey; @@ -340,11 +341,11 @@ private String getReloadChannelIdForTrigger(Trigger trigger, Map reloadRequests) { + public void insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests, ProcessInfo processInfo) { if (engine.getClusterService().lock(ClusterConstants.SYNCTRIGGERS)) { try { @@ -389,6 +390,8 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, - ISqlTransaction transaction, Map reloadRequests) { + ISqlTransaction transaction, Map reloadRequests, ProcessInfo processInfo) { Map channels = engine.getConfigurationService().getChannels(false); DatabaseInfo dbInfo = platform.getDatabaseInfo(); String quote = dbInfo.getDelimiterToken(); String catalogSeparator = dbInfo.getCatalogSeparator(); String schemaSeparator = dbInfo.getSchemaSeparator(); + + processInfo.setBatchCount(triggerHistories.size()); + for (TriggerHistory triggerHistory : triggerHistories) { List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory .getTriggerHistoryId()); + for (TriggerRouter triggerRouter : triggerRouters) { if (triggerRouter.getInitialLoadOrder() >= 0 && engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) { @@ -854,6 +861,7 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), triggerHistory.getSourceTableName(), false); + processInfo.setCurrentTableName(table.getName()); String sql = String.format("select count(*) from %s where %s", table .getQualifiedTableName(quote, catalogSeparator, schemaSeparator), selectSql); @@ -877,6 +885,10 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre numberOfBatches = 1; } + processInfo.setCurrentTableName(table.getFullyQualifiedTableName()); + processInfo.setBatchCount(numberOfBatches); + processInfo.setDataCount(rowCount); + long startBatchId = -1; long endBatchId = -1; for (int i = 0; i < numberOfBatches; i++) { @@ -885,9 +897,10 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre triggerHistory, selectSql, true, loadId, createBy, Status.RQ); if (startBatchId == -1) { startBatchId = endBatchId; + processInfo.setCurrentBatchId(startBatchId); } - } + engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(), triggerRouter, startBatchId, endBatchId); } else { @@ -899,6 +912,8 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre transaction.commit(); } } + processInfo.incrementCurrentBatchCount(); + } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 6081130400..ca1d73ef5b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -26,6 +26,7 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -42,6 +43,7 @@ import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.ext.IOutgoingBatchFilter; import org.jumpmind.symmetric.model.Channel; +import org.jumpmind.symmetric.model.LoadSummary; import org.jumpmind.symmetric.model.NodeChannel; import org.jumpmind.symmetric.model.NodeGroupChannelWindow; import org.jumpmind.symmetric.model.NodeHost; @@ -209,14 +211,14 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(), FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(), outgoingBatch.getLastUpdatedHostName(), - outgoingBatch.getLastUpdatedTime(), outgoingBatch.getBatchId(), - outgoingBatch.getNodeId() }, new int[] { Types.CHAR, Types.BIGINT, + outgoingBatch.getLastUpdatedTime(), outgoingBatch.getSummary(), + outgoingBatch.getBatchId(), outgoingBatch.getNodeId() }, new int[] { Types.CHAR, Types.BIGINT, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.NUMERIC, - Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, symmetricDialect.getSqlTypeForIds(), - Types.VARCHAR }); + Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, + symmetricDialect.getSqlTypeForIds(), Types.VARCHAR }); } public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { @@ -252,7 +254,7 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1: 0, outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch .isCommonFlag() ? 1 : 0, outgoingBatch.getReloadEventCount(), outgoingBatch .getOtherEventCount(), outgoingBatch.getLastUpdatedHostName(), - outgoingBatch.getCreateBy()); + outgoingBatch.getCreateBy(), outgoingBatch.getSummary()); outgoingBatch.setBatchId(batchId); } @@ -288,10 +290,12 @@ public int countOutgoingBatchesInError(String channelId) { return sqlTemplate.queryForInt(getSql("countOutgoingBatchesErrorsOnChannelSql"), channelId); } + @Override public int countOutgoingBatchesUnsent() { return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentSql")); } + @Override public int countOutgoingBatchesUnsent(String channelId) { return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentOnChannelSql"), channelId); } @@ -308,7 +312,7 @@ public Map countOutgoingBatchesPendingByChannel(String nodeId) Set channelIds = configurationService.getChannels(false).keySet(); for (String channelId : channelIds) { - if (!results.containsKey(channelId)) { + if (!results.containsKey(channelId) && !Constants.CHANNEL_HEARTBEAT.equals(channelId)) { results.put(channelId, 0); } } @@ -586,6 +590,84 @@ public List findOutgoingBatchSummary(Status... statuses) { return sqlTemplate.query(sql, new OutgoingBatchSummaryMapper(), args); } + public Set getActiveLoads(String sourceNodeId) { + Set loads = new HashSet(); + + List inProcess = sqlTemplate.query(getSql("getActiveLoadsSql"), new ISqlRowMapper() { + @Override + public Long mapRow(Row rs) { + return rs.getLong("load_id"); + } + }, sourceNodeId); + List requests = sqlTemplate.query(getSql("getUnprocessedReloadRequestsSql"), new ISqlRowMapper() { + @Override + public Long mapRow(Row rs) { + return rs.getLong("load_id"); + } + }, sourceNodeId); + + loads.addAll(inProcess); + loads.addAll(requests); + return loads; + } + + public LoadSummary getLoadSummary(long loadId) { + return sqlTemplate.queryForObject(getSql("getLoadSummarySql"), + new ISqlRowMapper() { + + public LoadSummary mapRow(Row rs) { + LoadSummary summary = new LoadSummary(); + summary.setLoadId(rs.getLong("load_id")); + summary.setNodeId(rs.getString("target_node_id")); + summary.setCreateBy(rs.getString("last_update_by")); + summary.setTableCount(rs.getInt("table_count")); + String triggerId = rs.getString("trigger_id"); + if (triggerId != null && triggerId.equals(ParameterConstants.ALL)) { + summary.setFullLoad(true); + } else { + summary.setFullLoad(false); + } + summary.setCreateFirst(rs.getBoolean("create_table")); + summary.setDeleteFirst(rs.getBoolean("delete_first")); + summary.setRequestProcessed(rs.getBoolean("processed")); + summary.setConditional(rs.getBoolean("reload_select")); + summary.setCustomSql(rs.getBoolean("before_custom_sql")); + return summary; + } + }, loadId); + } + + public Map> getLoadStatusSummarySql(long loadId) { + LoadStatusByQueueMapper mapper = new LoadStatusByQueueMapper(); + + List obj = sqlTemplate.query(getSql("getLoadStatusSummarySql"), mapper, loadId); + return mapper.getResults(); + } + + private class LoadStatusByQueueMapper implements ISqlRowMapper { + Map> results = new HashMap>(); + + @Override + public Object mapRow(Row rs) { + String queue = rs.getString("queue"); + Integer count = rs.getInt("count"); + String status = rs.getString("status"); + + if (results.get(queue) == null) { + results.put(queue, new HashMap()); + } + + results.get(queue).put(status, count); + + return null; + } + + public Map> getResults() { + return results; + } + } + + public List getLoadSummaries(boolean activeOnly) { final Map loadSummaries = new TreeMap(); sqlTemplate.query(getSql("getLoadSummariesSql"), new ISqlRowMapper() { @@ -648,7 +730,7 @@ public OutgoingLoadSummary mapRow(Row rs) { return loads; } - + class OutgoingBatchSummaryMapper implements ISqlRowMapper { public OutgoingBatchSummary mapRow(Row rs) { OutgoingBatchSummary summary = new OutgoingBatchSummary(); @@ -716,6 +798,7 @@ public OutgoingBatch mapRow(Row rs) { batch.setExtractJobFlag(rs.getBoolean("extract_job_flag")); batch.setLoadId(rs.getLong("load_id")); batch.setCreateBy(rs.getString("create_by")); + batch.setSummary(rs.getString("summary")); } return batch; } else { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 01d1b3bfeb..b4c26952bc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -48,8 +48,8 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, putSql("insertOutgoingBatchSql", "insert into $(outgoing_batch) " - + " (batch_id, node_id, channel_id, status, load_id, extract_job_flag, load_flag, common_flag, reload_event_count, other_event_count, last_update_hostname, last_update_time, create_time, create_by) " - + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp, ?) "); + + " (batch_id, node_id, channel_id, status, load_id, extract_job_flag, load_flag, common_flag, reload_event_count, other_event_count, last_update_hostname, last_update_time, create_time, create_by, summary) " + + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp, ?, ?) "); putSql("updateOutgoingBatchSql", "update $(outgoing_batch) set status=?, load_id=?, extract_job_flag=?, load_flag=?, error_flag=?, " @@ -57,7 +57,7 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + " reload_event_count=?, insert_event_count=?, update_event_count=?, delete_event_count=?, other_event_count=?, " + " ignore_count=?, router_millis=?, network_millis=?, filter_millis=?, " + " load_millis=?, extract_millis=?, sql_state=?, sql_code=?, sql_message=?, " - + " failed_data_id=?, last_update_hostname=?, last_update_time=? where batch_id=? and node_id=? "); + + " failed_data_id=?, last_update_hostname=?, last_update_time=?, summary=? where batch_id=? and node_id=? "); putSql("findOutgoingBatchSql", "where batch_id=? and node_id=? "); @@ -84,7 +84,7 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + " b.reload_event_count, b.insert_event_count, b.update_event_count, b.delete_event_count, b.other_event_count, " + " b.ignore_count, b.router_millis, b.network_millis, b.filter_millis, b.load_millis, b.extract_millis, b.sql_state, b.sql_code, " + " b.sql_message, " - + " b.failed_data_id, b.last_update_hostname, b.last_update_time, b.create_time, b.batch_id, b.extract_job_flag, b.load_flag, b.error_flag, b.common_flag, b.load_id, b.create_by from " + + " b.failed_data_id, b.last_update_hostname, b.last_update_time, b.create_time, b.batch_id, b.extract_job_flag, b.load_flag, b.error_flag, b.common_flag, b.load_id, b.create_by, b.summary from " + " $(outgoing_batch) b "); putSql("selectOutgoingBatchErrorsSql", " where error_flag=1 order by batch_id "); @@ -127,6 +127,35 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + "group by b.load_id, b.node_id, b.status, b.channel_id, b.create_by " + "order by b.load_id desc "); + putSql("getActiveLoadsSql", + "select r.load_id " + + "from $(table_reload_request) r " + + "join $(outgoing_batch) ob on ob.load_id = r.load_id " + + "where ob.status != 'OK' and ob.status != 'IG' and r.source_node_id = ? " + + "group by r.load_id;"); + + putSql("getUnprocessedReloadRequestsSql", + "select r.load_id " + + "from $(table_reload_request) r " + + "where processed = 0 and source_node_id = ? "); + + putSql("getLoadSummarySql", + "select " + + " target_node_id, load_id, count(TRIGGER_ID) as table_count, max(TRIGGER_ID) as trigger_id, " + + " max(create_table) as create_table, max(delete_first) as delete_first, max(processed) as processed, " + + " max(reload_select) as reload_select, max(before_custom_sql) as before_custom_sql, " + + " max(last_update_by) as last_update_by, min(last_update_time) as last_update_time " + + "from $(table_reload_request) " + + " where load_id = ? " + + " group by load_id, target_node_id "); + + putSql("getLoadStatusSummarySql", + "select count(ob.batch_id) as count, ob.status, c.queue " + + " from $(outgoing_batch) ob " + + " join $(channel) c on c.channel_id = ob.channel_id " + + " where ob.load_id = ? " + + " group by c.queue, ob.status"); + putSql("getNextOutgoingBatchForEachNodeSql", "select min(b.batch_id) as batch_id, b.node_id, b.status, b.channel_id " + " from $(outgoing_batch) b where status != 'OK' and status != 'RT' " diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index ca8dbc7db8..6db0867e67 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -255,13 +255,13 @@ protected void insertInitialLoadEvents() { boolean registered = security.getRegistrationTime() != null; if (thisMySecurityRecord && reverseLoadQueued && (reverseLoadFirst || !initialLoadQueued)) { - sendReverseInitialLoad(); + sendReverseInitialLoad(processInfo); } else if (!thisMySecurityRecord && registered && initialLoadQueued && (!reverseLoadFirst || !reverseLoadQueued)) { long ts = System.currentTimeMillis(); engine.getDataService().insertReloadEvents( engine.getNodeService().findNode(security.getNodeId()), - false); + false, processInfo); isInitialLoadQueued = true; ts = System.currentTimeMillis() - ts; if (ts > Constants.LONG_OPERATION_THRESHOLD) { @@ -296,7 +296,7 @@ protected void insertInitialLoadEvents() { } } - processTableRequestLoads(identity); + processTableRequestLoads(identity, processInfo); } } @@ -308,7 +308,7 @@ protected void insertInitialLoadEvents() { } - public void processTableRequestLoads(Node source) { + public void processTableRequestLoads(Node source, ProcessInfo processInfo) { List loadsToProcess = engine.getDataService().getTableReloadRequestToProcess(source.getNodeId()); if (loadsToProcess.size() > 0) { log.info("Found " + loadsToProcess.size() + " table reload requests to process."); @@ -322,7 +322,7 @@ public void processTableRequestLoads(Node source) { engine.getDataService().insertReloadEvents( engine.getNodeService().findNode(load.getTargetNodeId()), - false, fullLoad); + false, fullLoad, processInfo); } else { NodeSecurity targetNodeSecurity = engine.getNodeService().findNodeSecurity(load.getTargetNodeId()); @@ -341,7 +341,7 @@ public void processTableRequestLoads(Node source) { for (Map.Entry> entry : requestsSplitByLoad.entrySet()) { engine.getDataService().insertReloadEvents( engine.getNodeService().findNode(entry.getKey().split("::")[0]), - false, entry.getValue()); + false, entry.getValue(), processInfo); } @@ -384,14 +384,14 @@ public List findNodesThatAreReadyForInitialLoad() { return toReturn; } - protected void sendReverseInitialLoad() { + protected void sendReverseInitialLoad(ProcessInfo processInfo) { INodeService nodeService = engine.getNodeService(); boolean queuedLoad = false; List nodes = new ArrayList(); nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.P)); nodes.addAll(nodeService.findTargetNodesFor(NodeGroupLinkAction.W)); for (Node node : nodes) { - engine.getDataService().insertReloadEvents(node, true); + engine.getDataService().insertReloadEvents(node, true, processInfo); queuedLoad = true; }