From 45ff69ecfe81e3bde421fcfaffa6f0cf664bd227 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Mon, 3 Dec 2018 10:21:21 -0500 Subject: [PATCH] Load enhancements based on table_reload_request and extract_request to reduce dependencies on outgoing_batch table. --- .../symmetric/model/ExtractRequest.java | 82 +++++ .../symmetric/model/TableReloadRequest.java | 123 +++++++ .../model/TableReloadRequestKey.java | 18 +- .../service/IDataExtractorService.java | 11 +- .../symmetric/service/IDataService.java | 10 + .../service/IOutgoingBatchService.java | 6 +- .../service/impl/AcknowledgeService.java | 17 +- .../service/impl/DataExtractorService.java | 94 +++++- .../impl/DataExtractorServiceSqlMap.java | 14 +- .../symmetric/service/impl/DataService.java | 304 ++++++++++++++++-- .../service/impl/DataServiceSqlMap.java | 44 ++- .../service/impl/OutgoingBatchService.java | 52 ++- .../impl/OutgoingBatchServiceSqlMap.java | 7 +- .../service/impl/StatisticService.java | 9 +- .../service/impl/StatisticServiceSqlMap.java | 11 +- .../symmetric/statistic/ChannelStats.java | 40 +++ .../statistic/IStatisticManager.java | 6 + .../symmetric/statistic/StatisticManager.java | 27 ++ .../src/main/resources/symmetric-schema.xml | 30 +- .../statistic/MockStatisticManager.java | 20 ++ 20 files changed, 852 insertions(+), 73 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java index 7fbd518115..4d47261c15 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ExtractRequest.java @@ -42,6 +42,15 @@ public enum ExtractStatus { private Date lastUpdateTime; private Date createTime; private String queue; + private long loadId; + private String tableName; + private long rows; + private long transferredRows; + private long loadedRows; + private long lastTransferredBatchId; + private long lastLoadedBatchId; + private long transferredMillis; + private long loadedMillis; public long getRequestId() { return requestId; @@ -131,4 +140,77 @@ public void setRouterId(String routerId) { this.routerId = routerId; } + public long getLoadId() { + return loadId; + } + + public void setLoadId(long loadId) { + this.loadId = loadId; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public long getRows() { + return rows; + } + + public void setRows(long rows) { + this.rows = rows; + } + + public long getTransferredRows() { + return transferredRows; + } + + public void setTransferredRows(long transferredRows) { + this.transferredRows = transferredRows; + } + + public long getLoadedRows() { + return loadedRows; + } + + public void setLoadedRows(long loadedRows) { + this.loadedRows = loadedRows; + } + + public long getLastTransferredBatchId() { + return lastTransferredBatchId; + } + + public void setLastTransferredBatchId(long lastTransferredBatchId) { + this.lastTransferredBatchId = lastTransferredBatchId; + } + + public long getLastLoadedBatchId() { + return lastLoadedBatchId; + } + + public void setLastLoadedBatchId(long lastLoadedBatchId) { + this.lastLoadedBatchId = lastLoadedBatchId; + } + + public long getTransferredMillis() { + return transferredMillis; + } + + public void setTransferredMillis(long transferredMillis) { + this.transferredMillis = transferredMillis; + } + + public long getLoadedMillis() { + return loadedMillis; + } + + public void setLoadedMillis(long loadedMillis) { + this.loadedMillis = loadedMillis; + } + + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java index 6d1f318eb0..76a8636ed6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java @@ -39,12 +39,26 @@ public class TableReloadRequest { protected Date createTime = new Date(); protected Date lastUpdateTime = new Date(); protected String lastUpdateBy; + protected int batchCount; + protected int loadedBatchCount; + protected Long rowCount; + protected Long loadedRowCount; + protected int tableCount; + protected boolean errorFlag; + protected String sqlState; + protected int sqlCode; + protected String sqlMessage; + protected int loadId; + protected boolean processed; + protected boolean completed; + protected boolean cancelled; public TableReloadRequest(TableReloadRequestKey key) { this.targetNodeId = key.getTargetNodeId(); this.sourceNodeId = key.getSourceNodeId(); this.triggerId = key.getTriggerId(); this.routerId = key.getRouterId(); + this.createTime = key.getCreateTime(); } public TableReloadRequest() { @@ -161,4 +175,113 @@ public boolean isFullLoadRequest() { public String getIdentifier() { return getTriggerId() + getRouterId(); } + + public int getBatchCount() { + return batchCount; + } + + public void setBatchCount(int batchCount) { + this.batchCount = batchCount; + } + + public Long getRowCount() { + return rowCount == null ? 0 : rowCount; + } + + public void setRowCount(Long rowCount) { + this.rowCount = rowCount; + } + + public int getTableCount() { + return tableCount; + } + + public void setTableCount(int tableCount) { + this.tableCount = tableCount; + } + + public boolean isErrorFlag() { + return errorFlag; + } + + public void setErrorFlag(boolean errorFlag) { + this.errorFlag = errorFlag; + } + + public String getSqlState() { + return sqlState; + } + + public void setSqlState(String sqlState) { + this.sqlState = sqlState; + } + + public int getSqlCode() { + return sqlCode; + } + + public void setSqlCode(int sqlCode) { + this.sqlCode = sqlCode; + } + + public String getSqlMessage() { + return sqlMessage; + } + + public void setSqlMessage(String sqlMessage) { + this.sqlMessage = sqlMessage; + } + + public int getLoadId() { + return loadId; + } + + public void setLoadId(int loadId) { + this.loadId = loadId; + } + + public boolean isProcessed() { + return processed; + } + + public void setProcessed(boolean processed) { + this.processed = processed; + } + + public boolean isCompleted() { + return completed; + } + + public void setCompleted(boolean completed) { + this.completed = completed; + } + + public int getLoadedBatchCount() { + return loadedBatchCount; + } + + public void setLoadedBatchCount(int loadedBatchCount) { + this.loadedBatchCount = loadedBatchCount; + } + + public Long getLoadedRowCount() { + return loadedRowCount == null ? 0 : loadedRowCount ; + } + + public void setLoadedRowCount(Long loadedRowCount) { + this.loadedRowCount = loadedRowCount; + } + + public boolean isCancelled() { + return cancelled; + } + + public void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + public TableReloadRequestKey getTableReloadRequestKey() { + return new TableReloadRequestKey(this.targetNodeId, this.sourceNodeId, this.triggerId, this.routerId, this.createTime); + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequestKey.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequestKey.java index 51977e2834..fc9e5af6bf 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequestKey.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequestKey.java @@ -20,6 +20,8 @@ */ package org.jumpmind.symmetric.model; +import java.util.Date; + public class TableReloadRequestKey { protected String targetNodeId; @@ -27,7 +29,8 @@ public class TableReloadRequestKey { protected String triggerId; protected String routerId; protected String receivedFromNodeId; - + protected Date createTime; + public TableReloadRequestKey(String targetNodeId, String sourceNodeId, String triggerId, String routerId, String receivedFromNodeId) { this.targetNodeId = targetNodeId; @@ -37,6 +40,15 @@ public TableReloadRequestKey(String targetNodeId, String sourceNodeId, String tr this.receivedFromNodeId = receivedFromNodeId; } + public TableReloadRequestKey(String targetNodeId, String sourceNodeId, String triggerId, + String routerId, Date createTime) { + this.targetNodeId = targetNodeId; + this.sourceNodeId = sourceNodeId; + this.triggerId = triggerId; + this.routerId = routerId; + this.createTime = createTime; + } + public String getRouterId() { return routerId; } @@ -60,5 +72,9 @@ public void setReceivedFromNodeId(String receivedFromNodeId) { public String getReceivedFromNodeId() { return receivedFromNodeId; } + + public Date getCreateTime() { + return createTime; + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index 2147336bb7..e4fecccba5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -27,6 +27,7 @@ import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.symmetric.io.data.writer.StructureDataWriter.PayloadType; +import org.jumpmind.symmetric.model.ExtractRequest; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchWithPayload; @@ -64,10 +65,18 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi public RemoteNodeStatuses queueWork(boolean force); - public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId); + public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String tableName, long rows); public void resetExtractRequest(OutgoingBatch batch); public void removeBatchFromStaging(OutgoingBatch batch); + public List getPendingTablesForExtractByLoadId(long loadId); + + public List getCompletedTablesForExtractByLoadId(long loadId); + + public void updateExtractRequestLoadTime(Date loadTime, OutgoingBatch batch); + + public void updateExtractRequestTransferred(OutgoingBatch batch, long transferMillis); + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index e8dba2268b..5028605765 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -51,8 +51,18 @@ public interface IDataService { public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key); + public TableReloadRequest getTableReloadRequest(int loadId); + public List getTableReloadRequestToProcess(final String sourceNodeId); + public List getTableReloadRequestsByLoadId(); + + public void updateTableReloadRequestsCounts(long loadId, int batchCount, long rowsCount); + + public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transcation, long loadId, int batchCount, long rowsCount); + + public void updateTableReloadRequestsCancelled(long loadId); + public String reloadNode(String nodeId, boolean reverseLoad, String createBy); public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 1e2cd9f319..c732ddb200 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -25,16 +25,16 @@ import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Set; import org.jumpmind.db.sql.ISqlTransaction; +import org.jumpmind.symmetric.model.AbstractBatch.Status; import org.jumpmind.symmetric.model.LoadSummary; import org.jumpmind.symmetric.model.NodeGroupLinkAction; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchSummary; import org.jumpmind.symmetric.model.OutgoingBatches; import org.jumpmind.symmetric.model.OutgoingLoadSummary; -import org.jumpmind.symmetric.model.AbstractBatch.Status; +import org.jumpmind.symmetric.service.impl.OutgoingBatchService.LoadCounts; import org.jumpmind.symmetric.service.impl.OutgoingBatchService.LoadStatusSummary; /** @@ -121,7 +121,7 @@ public List listOutgoingBatches(List nodeIds, List getLoadSummaries(boolean activeOnly); - public Set getActiveLoads(String sourceNodeId); + public Map getActiveLoadCounts(); public List getQueuedLoads(String sourceNodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index fd11d0cc04..d4f5f26e50 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -36,6 +36,7 @@ import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.service.IAcknowledgeService; +import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IRegistrationService; import org.jumpmind.symmetric.statistic.RouterStats; @@ -59,8 +60,7 @@ public BatchAckResult ack(final BatchAck batch) { IRegistrationService registrationService = engine.getRegistrationService(); IOutgoingBatchService outgoingBatchService = engine.getOutgoingBatchService(); - - BatchAckResult result = new BatchAckResult(batch); + BatchAckResult result = new BatchAckResult(batch); for (IAcknowledgeEventListener listener : engine.getExtensionService().getExtensionPointList(IAcknowledgeEventListener.class)) { listener.onAcknowledgeEvent(batch); @@ -74,6 +74,7 @@ public BatchAckResult ack(final BatchAck batch) { OutgoingBatch outgoingBatch = outgoingBatchService .findOutgoingBatch(batch.getBatchId(), batch.getNodeId()); Status status = batch.isResend() ? Status.RS : batch.isOk() ? Status.OK : Status.ER; + Status oldStatus = null; if (outgoingBatch != null) { // Allow an outside system/user to indicate that a batch // is OK. @@ -84,7 +85,7 @@ public BatchAckResult ack(final BatchAck batch) { } else { // clearing the error flag in case the user set the batch // status to OK - Status oldStatus = outgoingBatch.getStatus(); + oldStatus = outgoingBatch.getStatus(); outgoingBatch.setStatus(Status.OK); outgoingBatch.setErrorFlag(false); log.info("Batch {} for {} was set to {}. Updating the status to OK", @@ -137,6 +138,9 @@ public BatchAckResult ack(final BatchAck batch) { if (routerStats != null) { log.info("Router stats for batch " + outgoingBatch.getBatchId() + ": " + routerStats.toString()); } + if (isNewError) { + engine.getStatisticManager().incrementDataLoadedOutgoingErrors(outgoingBatch.getChannelId(), 1); + } if (isNewError && outgoingBatch.getSqlCode() == ErrorConstants.FK_VIOLATION_CODE && parameterService.is(ParameterConstants.AUTO_RESOLVE_FOREIGN_KEY_VIOLATION)) { Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId()); @@ -164,6 +168,13 @@ public BatchAckResult ack(final BatchAck batch) { outgoingBatchService.updateOutgoingBatch(outgoingBatch); if (status == Status.OK) { + if (!Status.OK.equals(oldStatus)) { + if (outgoingBatch.getLoadId() > 0) { + engine.getDataExtractorService().updateExtractRequestLoadTime(new Date(), outgoingBatch); + } + engine.getStatisticManager().incrementDataLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getLoadRowCount()); + engine.getStatisticManager().incrementDataBytesLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getByteCount()); + } Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId()); if (channel != null && channel.isFileSyncFlag()){ /* Acknowledge the file_sync in case the file needs deleted. */ diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index a66017aae6..63ecfe6e0f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -773,7 +773,12 @@ protected List extract(final ProcessInfo extractInfo, final Node currentBatch.setLoadCount(currentBatch.getLoadCount() + 1); changeBatchStatus(Status.LD, currentBatch, mode); } - + + if (currentBatch.getLoadId() > 0) { + long transferMillis = transferInfo.getEndTime() == null ? new Date().getTime() - transferInfo.getStartTime().getTime() + : transferInfo.getEndTime().getTime() - transferInfo.getStartTime().getTime(); + updateExtractRequestTransferred(currentBatch, transferMillis); + } transferInfo.setCurrentTableName(currentBatch.getSummary()); transferInfo.setStatus(ProcessStatus.OK); @@ -1316,6 +1321,13 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo long ts = System.currentTimeMillis(); IStagedResource extractedBatch = getStagedResource(currentBatch); if (extractedBatch != null) { + processInfo.setCurrentLoadId(currentBatch.getLoadId()); + processInfo.setTotalDataCount(currentBatch.getDataRowCount()); + + if (currentBatch.getLoadId() > 0) { + processInfo.setCurrentTableName(currentBatch.getSummary()); + } + if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) { if (!isRetry && parameterService.is(ParameterConstants.OUTGOING_BATCH_COPY_TO_INCOMING_STAGING) && !parameterService.is(ParameterConstants.NODE_OFFLINE, false)) { @@ -1333,6 +1345,7 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo currentBatch.getBatchId()); try { SymmetricUtils.copyFile(extractedBatch.getFile(), targetResource.getFile()); + processInfo.setCurrentDataCount(currentBatch.getDataRowCount()); if(log.isDebugEnabled()) { log.debug("Copied file to incoming staging of remote engine {}", targetResource.getFile().getAbsolutePath()); } @@ -1532,6 +1545,70 @@ protected int findStatsIndex(String bufferString, String prevBuffer) { return index; } + + @Override + public List getPendingTablesForExtractByLoadId(long loadId) { + return sqlTemplate.query(getSql("selectIncompleteTablesForExtractByLoadId"), new ExtractRequestMapper(), loadId); + } + + @Override + public List getCompletedTablesForExtractByLoadId(long loadId) { + return sqlTemplate.query(getSql("selectCompletedTablesForExtractByLoadId"), new ExtractRequestMapper(), loadId); + } + + @Override + public void updateExtractRequestLoadTime(Date loadTime, OutgoingBatch outgoingBatch) { + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + + dataService.updateTableReloadRequestsLoadedCounts(transaction, outgoingBatch.getLoadId(), 1, + outgoingBatch.getReloadRowCount() > 0 ? outgoingBatch.getDataRowCount() : 0); + + + transaction.prepareAndExecute(getSql("updateExtractRequestLoadTime"), outgoingBatch.getBatchId(), outgoingBatch.getDataRowCount(), + outgoingBatch.getLoadMillis(), outgoingBatch.getBatchId(), outgoingBatch.getBatchId(), outgoingBatch.getBatchId(), + outgoingBatch.getNodeId(), outgoingBatch.getLoadId()); + + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + + @Override + public void updateExtractRequestTransferred(OutgoingBatch batch, long transferMillis) { + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + transaction.prepareAndExecute(getSql("updateExtractRequestTransferred"), batch.getBatchId(), batch.getDataRowCount(), transferMillis, + batch.getBatchId(), batch.getBatchId(), batch.getNodeId(), batch.getLoadId(), batch.getBatchId()); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + protected boolean writeBatchStats(BufferedWriter writer, char[] buffer, int bufferSize, String prevBuffer, OutgoingBatch batch) throws IOException { String bufferString = new String(buffer); @@ -1819,13 +1896,13 @@ public void resetExtractRequest(OutgoingBatch batch) { } public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue, - TriggerRouter triggerRouter, long startBatchId, long endBatchId) { + TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows) { long requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ); transaction.prepareAndExecute(getSql("insertExtractRequestSql"), new Object[] { requestId, nodeId, queue, ExtractStatus.NE.name(), startBatchId, endBatchId, triggerRouter.getTrigger().getTriggerId(), - triggerRouter.getRouter().getRouterId() }, new int[] { Types.BIGINT, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR }); + triggerRouter.getRouter().getRouterId(), loadId, table, rows }, new int[] { Types.BIGINT, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.BIGINT }); } protected void updateExtractRequestStatus(ISqlTransaction transaction, long extractId, @@ -2059,6 +2136,15 @@ public ExtractRequest mapRow(Row row) { request.setTriggerRouter(triggerRouterService.findTriggerRouterById( row.getString("trigger_id"), row.getString("router_id"), false)); request.setQueue(row.getString("queue")); + request.setLoadId(row.getLong("load_id")); + request.setTableName(row.getString("table_name")); + request.setRows(row.getLong("total_rows")); + request.setTransferredRows(row.getLong("transferred_rows")); + request.setLastTransferredBatchId(row.getLong("last_transferred_batch_id")); + request.setLoadedRows(row.getLong("loaded_rows")); + request.setLastLoadedBatchId(row.getLong("last_loaded_batch_id")); + request.setTransferredMillis(row.getLong("transferred_millis")); + request.setLoadedMillis(row.getLong("loaded_millis")); return request; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java index 17ddb73d08..c94c25c8c5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java @@ -35,11 +35,23 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform, putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and queue=? and status=? order by request_id"); - putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, queue, status, start_batch_id, end_batch_id, trigger_id, router_id, last_update_time, create_time) values(?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)"); + putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, queue, status, start_batch_id, end_batch_id, trigger_id, router_id, load_id, table_name, total_rows, last_update_time, create_time) " + + " values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)"); putSql("updateExtractRequestStatus", "update $(extract_request) set status=?, last_update_time=current_timestamp where request_id=?"); + putSql("updateExtractRequestLoadTime", "update $(extract_request) set loaded_time = (case when end_batch_id = ? then current_timestamp else null end), " + + " loaded_rows = loaded_rows + ?, loaded_millis = loaded_millis + ?, last_loaded_batch_id = ?, " + + " last_update_time=current_timestamp where start_batch_id <= ? and end_batch_id >= ? and node_id=? and load_id=?"); + + putSql("updateExtractRequestTransferred", "update $(extract_request) set last_transferred_batch_id=?, transferred_rows = transferred_rows + ?, transferred_millis = ?" + + " where start_batch_id <= ? and end_batch_id >= ? and node_id=? and load_id=? and (last_transferred_batch_id is null or last_transferred_batch_id < ?)"); + putSql("resetExtractRequestStatus", "update $(extract_request) set status=?, last_update_time= current_timestamp where start_batch_id <= ? and end_batch_id >= ? and node_id=?"); + + putSql("selectIncompleteTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is null order by request_id desc"); + + putSql("selectCompletedTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is not null order by request_id"); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 781731b9bd..66dffddb2f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -264,10 +264,21 @@ public TableReloadRequest mapRow(Row rs) { request.setCreateTime(rs.getDateTime("create_time")); request.setLastUpdateBy(rs.getString("last_update_by")); request.setLastUpdateTime(rs.getDateTime("last_update_time")); + request.setLoadId(rs.getInt("load_id")); return request; } }, key.getSourceNodeId(), key.getTargetNodeId(), key.getTriggerId(), - key.getRouterId()); + key.getRouterId(), key.getCreateTime()); + } + + + @Override + public TableReloadRequest getTableReloadRequest(int loadId) { + List requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadId"), + new TableReloadRequestMapper(), loadId); + + List collapsedRequests = collapseTableReloadRequestsByLoadId(requests); + return collapsedRequests == null || collapsedRequests.size() == 0 ? null : collapsedRequests.get(0); } public List getTableReloadRequestToProcess(final String sourceNodeId) { @@ -292,7 +303,190 @@ public TableReloadRequest mapRow(Row rs) { } }, sourceNodeId); } + + public List getTableReloadRequests() { + return sqlTemplateDirty.query(getSql("selectTableReloadRequests"), + new TableReloadRequestMapper()); + } + + public List getTableReloadRequestsByLoadId() { + return collapseTableReloadRequestsByLoadId(getTableReloadRequests()); + } + + public List collapseTableReloadRequestsByLoadId(List requests) { + List collapsedRequests = new ArrayList(); + + int previousLoadId = -1; + + TableReloadRequest summary = null; + for (TableReloadRequest request : requests) { + if (request.getLoadId() != previousLoadId) { + if (summary != null) { + collapsedRequests.add(summary); + } + summary = new TableReloadRequest(); + summary.setTargetNodeId(request.getTargetNodeId()); + summary.setSourceNodeId(request.getSourceNodeId()); + summary.setCreateTable(request.isCreateTable()); + summary.setDeleteFirst(request.isDeleteFirst()); + summary.setBeforeCustomSql(request.getBeforeCustomSql()); + summary.setCreateTime(request.getCreateTime()); + summary.setLastUpdateTime(request.getLastUpdateTime()); + summary.setLastUpdateBy(request.getLastUpdateBy()); + summary.setProcessed(request.isProcessed()); + summary.setCompleted(request.isCompleted()); + summary.setLoadId(request.getLoadId()); + summary.setLoadedBatchCount(request.getLoadedBatchCount()); + summary.setLoadedRowCount(request.getLoadedRowCount()); + summary.setProcessed(request.isProcessed()); + summary.setCompleted(request.isCompleted()); + summary.setChannelId(request.getChannelId()); + summary.setCancelled(request.isCancelled()); + + if (request.isFullLoadRequest()) { + summary.setTriggerId(request.getTriggerId()); + summary.setRouterId(request.getRouterId()); + } + } + summary.setBatchCount(summary.getBatchCount() + request.getBatchCount()); + summary.setTableCount(summary.getTableCount() + request.getTableCount()); + summary.setRowCount((summary.getRowCount() == null ? 0 : summary.getRowCount()) + (request.getRowCount() == null ? 0 : request.getRowCount())); + + if (request.isErrorFlag()) { + summary.setSqlCode(request.getSqlCode()); + summary.setSqlState(request.getSqlState()); + summary.setSqlMessage(request.getSqlMessage()); + } + + previousLoadId = request.getLoadId(); + } + if (summary != null) { + collapsedRequests.add(summary); + } + return collapsedRequests; + } + + public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transaction, long loadId, int batchCount, long rowsCount) { + transaction.prepareAndExecute(getSql("updateTableReloadRequestLoadedCounts"), + new Object[] { batchCount, rowsCount, new Date(), loadId }, + new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.NUMERIC}); + + } + + public void updateTableReloadRequestsCounts(long loadId, int batchCount, long rowsCount) { + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + transaction.prepareAndExecute(getSql("updateTableReloadRequestCounts"), + new Object[] { batchCount, rowsCount, new Date(), loadId }, + new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.NUMERIC}); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + + public void updateTableReloadRequestsLoadAndTableCounts(long loadId, int tableCount, TableReloadRequest request) { + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + transaction.prepareAndExecute(getSql("updateTableReloadRequestLoadId"), + new Object[] { + loadId, tableCount, new Date(), request.getTargetNodeId(), request.getSourceNodeId(), + request.getTriggerId(), request.getRouterId(), request.getCreateTime() + }, + new int[] { Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP}); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + + public void updateTableReloadRequestsCancelled(long loadId) { + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + transaction.prepareAndExecute(getSql("updateTableReloadRequestCancelled"), + new Object[] { + new Date(), loadId + }, + new int[] { Types.TIMESTAMP,Types.NUMERIC}); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + + + protected class TableReloadRequestMapper implements ISqlRowMapper { + + @Override + public TableReloadRequest mapRow(Row rs) { + TableReloadRequest request = new TableReloadRequest(); + + request.setSourceNodeId(rs.getString("source_node_id")); + request.setTargetNodeId(rs.getString("target_node_id")); + request.setCreateTable(rs.getBoolean("create_table")); + request.setDeleteFirst(rs.getBoolean("delete_first")); + request.setReloadSelect(rs.getString("reload_select")); + request.setReloadTime(rs.getDateTime("reload_time")); + request.setBeforeCustomSql(rs.getString("before_custom_sql")); + request.setChannelId(rs.getString("channel_id")); + request.setTriggerId(rs.getString("trigger_id")); + request.setRouterId(rs.getString("router_id")); + request.setCreateTime(rs.getDateTime("create_time")); + request.setLastUpdateBy(rs.getString("last_update_by")); + request.setLastUpdateTime(rs.getDateTime("last_update_time")); + request.setBatchCount(rs.getInt("batch_count")); + request.setRowCount(rs.getLong("row_count")); + request.setTableCount(rs.getInt("table_count")); + request.setErrorFlag(rs.getBoolean("error_flag")); + request.setSqlCode(rs.getInt("sql_code")); + request.setSqlState(rs.getString("sql_state")); + request.setSqlMessage(rs.getString("sql_message")); + request.setLoadId(rs.getInt("load_id")); + request.setLoadedBatchCount(rs.getInt("batch_loaded_count")); + request.setLoadedRowCount(rs.getLong("row_loaded_count")); + request.setCompleted(rs.getBoolean("completed")); + request.setProcessed(rs.getBoolean("processed")); + request.setCancelled(rs.getBoolean("cancelled")); + + return request; + } + } + /** * @return If isLoad then return the inserted batch id otherwise return the * data id @@ -480,32 +674,40 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List 0) { + for (TableReloadRequest request : reloadRequests) { + updateTableReloadRequestsLoadAndTableCounts(loadId, totalTableCount, request); + } + } + + int batchCount = 0; + + batchCount += insertSqlEventsPriorToReload(targetNode, nodeIdRecord, loadId, createBy, transactional, transaction, reverse, triggerHistories, triggerRoutersByHistoryId, mapReloadRequests, isFullLoad, symNodeSecurityReloadChannel); - insertCreateBatchesForReload(targetNode, loadId, createBy, + batchCount += insertCreateBatchesForReload(targetNode, loadId, createBy, triggerHistories, triggerRoutersByHistoryId, transactional, transaction, mapReloadRequests); - insertDeleteBatchesForReload(targetNode, loadId, createBy, + batchCount += insertDeleteBatchesForReload(targetNode, loadId, createBy, triggerHistories, triggerRoutersByHistoryId, transactional, transaction, mapReloadRequests); - insertSQLBatchesForReload(targetNode, loadId, createBy, + batchCount += insertSQLBatchesForReload(targetNode, loadId, createBy, triggerHistories, triggerRoutersByHistoryId, transactional, transaction, mapReloadRequests); - insertLoadBatchesForReload(targetNode, loadId, createBy, triggerHistories, + batchCount += insertLoadBatchesForReload(targetNode, loadId, createBy, triggerHistories, triggerRoutersByHistoryId, transactional, transaction, mapReloadRequests, processInfo, null); - insertSqlEventsAfterReload(targetNode, nodeIdRecord, loadId, createBy, + batchCount += insertSqlEventsAfterReload(targetNode, nodeIdRecord, loadId, createBy, transactional, transaction, reverse, triggerHistories, triggerRoutersByHistoryId, mapReloadRequests, isFullLoad, symNodeSecurityReloadChannel); - insertFileSyncBatchForReload(targetNode, loadId, createBy, transactional, + batchCount += insertFileSyncBatchForReload(targetNode, loadId, createBy, transactional, transaction, mapReloadRequests, isFullLoad, processInfo); if (isFullLoad) { @@ -523,20 +725,17 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List 0) { for (TableReloadRequest request : reloadRequests) { - int rowsAffected = transaction.prepareAndExecute(getSql("updateProcessedTableReloadRequest"), loadId, new Date(), - request.getTargetNodeId(), request.getSourceNodeId(), request.getTriggerId(), - request.getRouterId(), request.getCreateTime()); + int rowsAffected = transaction.prepareAndExecute(getSql("updateProcessedTableReloadRequest"), new Date(), batchCount, loadId); if (rowsAffected == 0) { - throw new SymmetricException(String.format("Failed to update a table_reload_request for loadId '%s' " - + "targetNodeId '%s' sourceNodeId '%s' triggerId '%s' routerId '%s' createTime '%s'", - loadId, request.getTargetNodeId(), request.getSourceNodeId(), request.getTriggerId(), - request.getRouterId(), request.getCreateTime())); + throw new SymmetricException(String.format("Failed to update a table_reload_request as processed for loadId '%s' ", + loadId)); } } log.info("Table reload request(s) for load id " + loadId + " have been processed."); @@ -653,12 +852,13 @@ private String formatCommandForScript(String command) { return builder.substring(0, builder.length()-1); } - private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, long loadId, + private int insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, long loadId, String createBy, boolean transactional, ISqlTransaction transaction, boolean reverse, List triggerHistories, Map> triggerRoutersByHistoryId, Map reloadRequests, boolean isFullLoad, String channelId) { + int batchCount = 0; if (isFullLoad && !Constants.DEPLOYMENT_TYPE_REST.equals(targetNode.getDeploymentType())) { /* * Insert node security so the client doing the initial load knows @@ -666,7 +866,8 @@ private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, */ insertNodeSecurityUpdate(transaction, nodeIdRecord, targetNode.getNodeId(), true, loadId, createBy, channelId); - + batchCount++; + /* * Mark incoming batches as OK at the target node because we marked * outgoing batches as OK at the source @@ -678,6 +879,7 @@ private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, "update %s_incoming_batch set status='OK', error_flag=0 where node_id='%s' and status != 'OK'", tablePrefix, engine.getNodeService().findIdentityNodeId()), true, loadId, createBy); + batchCount++; } if (isFullLoad) { @@ -688,33 +890,39 @@ private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, transaction, targetNode, beforeSql, true, - loadId, createBy); + loadId, createBy); + batchCount++; } } + return batchCount; } - private void insertSqlEventsAfterReload(Node targetNode, String nodeIdRecord, long loadId, + private int insertSqlEventsAfterReload(Node targetNode, String nodeIdRecord, long loadId, String createBy, boolean transactional, ISqlTransaction transaction, boolean reverse, List triggerHistories, Map> triggerRoutersByHistoryId, Map reloadRequests, boolean isFullLoad, String channelId) { + int totalBatchCount = 0; if (isFullLoad) { String afterSql = parameterService.getString(reverse ? ParameterConstants.INITIAL_LOAD_REVERSE_AFTER_SQL : ParameterConstants.INITIAL_LOAD_AFTER_SQL); if (isNotBlank(afterSql)) { insertSqlEvent(transaction, targetNode, afterSql, true, loadId, createBy); + totalBatchCount++; } } + return totalBatchCount; } - private void insertCreateBatchesForReload(Node targetNode, long loadId, String createBy, + private int insertCreateBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, ISqlTransaction transaction, Map reloadRequests) { + int createEventsSent = 0; + if (reloadRequests != null && reloadRequests.size() > 0) { - int createEventsSent = 0; for (TriggerHistory triggerHistory : triggerHistories) { List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory .getTriggerHistoryId()); @@ -756,6 +964,7 @@ private void insertCreateBatchesForReload(Node targetNode, long loadId, String c targetNode)) { insertCreateEvent(transaction, targetNode, triggerHistory, triggerRouter.getRouter().getRouterId(), true, loadId, createBy); + createEventsSent++; if (!transactional) { transaction.commit(); } @@ -764,15 +973,17 @@ private void insertCreateBatchesForReload(Node targetNode, long loadId, String c } } } + return createEventsSent; } - private void insertDeleteBatchesForReload(Node targetNode, long loadId, String createBy, + private int insertDeleteBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, ISqlTransaction transaction, Map reloadRequests) { + int deleteEventsSent = 0; + if (reloadRequests != null && reloadRequests.size() > 0) { - int deleteEventsSent = 0; for (ListIterator triggerHistoryIterator = triggerHistories .listIterator(triggerHistories.size()); triggerHistoryIterator.hasPrevious();) { @@ -825,6 +1036,8 @@ private void insertDeleteBatchesForReload(Node targetNode, long loadId, String c .isEmpty(triggerRouter.getInitialLoadDeleteStmt()))) { insertPurgeEvent(transaction, targetNode, triggerRouter, triggerHistory, true, null, loadId, createBy); + deleteEventsSent++; + if (!transactional) { transaction.commit(); } @@ -833,15 +1046,17 @@ private void insertDeleteBatchesForReload(Node targetNode, long loadId, String c } } } + return deleteEventsSent; } - private void insertSQLBatchesForReload(Node targetNode, long loadId, String createBy, + private int insertSQLBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, ISqlTransaction transaction, Map reloadRequests) { + int sqlEventsSent = 0; + if (reloadRequests != null && reloadRequests.size() > 0) { - int sqlEventsSent = 0; List copyTriggerHistories = new ArrayList(triggerHistories); Collections.reverse(copyTriggerHistories); @@ -885,15 +1100,18 @@ private void insertSQLBatchesForReload(Node targetNode, long loadId, String crea loadId, targetNode, sqlEventsSent }); } } + return sqlEventsSent; } - private void insertLoadBatchesForReload(Node targetNode, long loadId, String createBy, + private int insertLoadBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, ISqlTransaction transaction, Map reloadRequests, ProcessInfo processInfo, String selectSqlOverride) { Map channels = engine.getConfigurationService().getChannels(false); + int totalBatchCount = 0; + for (TriggerHistory triggerHistory : triggerHistories) { List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory .getTriggerHistoryId()); @@ -931,8 +1149,12 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre processInfo.setCurrentTableName(table.getName()); long rowCount = getDataCountForReload(table, targetNode, selectSql); + try { + Thread.sleep(1000); + } + catch (Exception e) {} long transformMultiplier = getTransformMultiplier(table, triggerRouter); - + // calculate the number of batches needed for table. long numberOfBatches = 1; long lastBatchSize = channel.getMaxBatchSize(); @@ -953,9 +1175,12 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre startBatchId = endBatchId; } } - + + updateTableReloadRequestsCounts(loadId, new Long(numberOfBatches).intValue(), rowCount); + totalBatchCount += numberOfBatches; + engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(), - triggerRouter, startBatchId, endBatchId); + triggerRouter, startBatchId, endBatchId, loadId, table.getName(), rowCount); } else { log.warn("The table defined by trigger_hist row %d no longer exists. A load will not be queue'd up for the table", triggerHistory.getTriggerHistoryId()); @@ -972,9 +1197,14 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre } } + + return totalBatchCount; } - protected int getDataCountForReload(Table table, Node targetNode, String selectSql) { + + + + protected long getDataCountForReload(Table table, Node targetNode, String selectSql) { DatabaseInfo dbInfo = platform.getDatabaseInfo(); String quote = dbInfo.getDelimiterToken(); String catalogSeparator = dbInfo.getCatalogSeparator(); @@ -990,7 +1220,7 @@ protected int getDataCountForReload(Table table, Node targetNode, String selectS } try { - int rowCount = sqlTemplate.queryForInt(sql); + long rowCount = sqlTemplateDirty.queryForLong(sql); return rowCount; } catch (Exception ex) { throw new SymmetricException("Failed to execute row count SQL while starting reload. If this is a syntax error, check your input and check " @@ -1013,8 +1243,10 @@ protected int getTransformMultiplier(Table table, TriggerRouter triggerRouter) { return transformMultiplier; } - private void insertFileSyncBatchForReload(Node targetNode, long loadId, String createBy, + private int insertFileSyncBatchForReload(Node targetNode, long loadId, String createBy, boolean transactional, ISqlTransaction transaction, Map reloadRequests, boolean isFullLoad, ProcessInfo processInfo) { + + int totalBatchCount = 0; if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE) && !Constants.DEPLOYMENT_TYPE_REST.equals(targetNode.getDeploymentType())) { ITriggerRouterService triggerRouterService = engine.getTriggerRouterService(); @@ -1031,7 +1263,7 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c routerid, true); if(!isFullLoad && reloadRequests != null && reloadRequests.get(fileSyncSnapshotTriggerRouter.getTriggerId() + fileSyncSnapshotTriggerRouter.getRouterId()) == null){ - return; + return totalBatchCount; } List triggerHistories = Arrays.asList(fileSyncSnapshotHistory); @@ -1044,7 +1276,7 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c final String FILTER_ENABLED_FILE_SYNC_TRIGGER_ROUTERS = String.format("1=(select initial_load_enabled from %s tr where t.trigger_id = tr.trigger_id AND t.router_id = tr.router_id)", TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_TRIGGER_ROUTER)); - insertLoadBatchesForReload(targetNode, loadId, createBy, triggerHistories, + totalBatchCount += insertLoadBatchesForReload(targetNode, loadId, createBy, triggerHistories, triggerRoutersByHistoryId, transactional, transaction, null, processInfo, FILTER_ENABLED_FILE_SYNC_TRIGGER_ROUTERS); } else { List channels = engine.getConfigurationService().getFileSyncChannels(); @@ -1054,6 +1286,7 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c fileSyncSnapshotHistory, "reload_channel_id='" + channel.getChannelId() + "'", true, loadId, createBy, Status.NE, channel.getChannelId(), -1); + totalBatchCount++; if (!transactional) { transaction.commit(); } @@ -1062,6 +1295,7 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c } } } + return totalBatchCount; } private TriggerHistory lookupTriggerHistory(Trigger trigger) { @@ -2551,4 +2785,6 @@ public String mapRow(Row row) { return null; } } + + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index cadd6a7f4d..a14ec0e748 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -29,7 +29,7 @@ public class DataServiceSqlMap extends AbstractSqlMap { public DataServiceSqlMap(IDatabasePlatform platform, Map replacementTokens) { super(platform, replacementTokens); - putSql("selectTableReloadRequest", "select reload_select, before_custom_sql, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); + putSql("selectTableReloadRequest", "select load_id, reload_select, before_custom_sql, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=? and create_time=?"); putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, before_custom_sql, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id, create_table, delete_first, channel_id) values (?,?,?,?,?,?,?,?,?,?,?,?)"); @@ -41,11 +41,47 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " reload_time, channel_id, create_time, last_update_by, " + " last_update_time, trigger_id, router_id " + " from $(table_reload_request) " - + " where source_node_id=? and processed = 0 " + + " where source_node_id=? and processed = 0 and cancelled = 0 " + " order by create_time, target_node_id"); - putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set load_id = ?, last_update_time = ?, processed = 1 where target_node_id = ? and source_node_id = ? and trigger_id = ? and router_id = ? and create_time = ?"); - + putSql("selectTableReloadRequests", "select source_node_id, target_node_id, load_id, " + + " batch_count, batch_loaded_count, table_count, row_count, row_loaded_count, " + + " create_table, delete_first, reload_select, channel_id, " + + " before_custom_sql, processed, completed, cancelled, " + + " reload_time, channel_id, create_time, last_update_by, " + + " last_update_time, trigger_id, router_id, error_flag, sql_state, sql_code, sql_message" + + " from $(table_reload_request) " + + " order by load_id desc, processed, completed, last_update_time desc"); + + putSql("selectTableReloadRequestsByLoadId", "select source_node_id, target_node_id, load_id, " + + " batch_count, batch_loaded_count, table_count, row_count, row_loaded_count, " + + " create_table, delete_first, reload_select, channel_id, " + + " before_custom_sql, processed, completed, cancelled, " + + " reload_time, channel_id, create_time, last_update_by, " + + " last_update_time, trigger_id, router_id, error_flag, sql_state, sql_code, sql_message" + + " from $(table_reload_request) " + + " where load_id = ? " + + " order by processed, completed, last_update_time desc"); + + putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, batch_count = ?, processed = 1 where load_id = ?"); + + putSql("updateTableReloadRequestLoadId", "update $(table_reload_request) set load_id = ?, table_count = ?, last_update_time = ? where target_node_id = ? and source_node_id = ? and trigger_id = ? and router_id = ? and create_time = ?"); + + putSql("updateTableReloadRequestCounts", "update $(table_reload_request) set batch_count = ?, row_count = ? where load_id = ?"); + + putSql("updateTableReloadRequestLoadedCounts", "update $(table_reload_request) set " + + " completed = (case when batch_loaded_count + 1 >= batch_count then 1 else 0 end), " + + " batch_loaded_count = batch_loaded_count + ?, row_loaded_count = row_loaded_count + ?, last_update_time = ? " + + " where load_id = ?"); + + putSql("updateTableReloadRequestCounts", "update $(table_reload_request) set " + + " batch_count = batch_count + ?, row_count = row_count + ?, last_update_time = ? " + + " where load_id = ?"); + + putSql("updateTableReloadRequestCancelled", "update $(table_reload_request) set " + + " cancelled = 1, completed = 1, last_update_time = ? " + + " where load_id = ?"); + // Note that the order by data_id is done appended in code putSql("selectEventDataToExtractSql", "" diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 1c6e1cb0f1..4983f46532 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -23,6 +23,7 @@ import static org.jumpmind.symmetric.common.TableConstants.SYM_NODE_HOST; import static org.jumpmind.symmetric.common.TableConstants.getTableName; +import java.io.Serializable; import java.sql.Types; import java.util.ArrayList; import java.util.Collection; @@ -30,7 +31,6 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -675,18 +675,20 @@ public List findOutgoingBatchSummaryByChannel(Status... st return sqlTemplateDirty.query(sql, new OutgoingBatchSummaryMapper(true, true), args); } - public Set getActiveLoads(String sourceNodeId) { - Set loads = new HashSet(); + public Map getActiveLoadCounts() { + return sqlTemplateDirty.queryForMap(getSql("getActiveLoadCountsSql"), new ISqlRowMapper() { - List inProcess = sqlTemplateDirty.query(getSql("getActiveLoadsSql"), new ISqlRowMapper() { @Override - public Long mapRow(Row rs) { - return rs.getLong("load_id"); + public LoadCounts mapRow(Row rs) { + LoadCounts lc = new LoadCounts(); + lc.setLoadedBatchCount(rs.getInt("loaded_batch_count")); + lc.setLoadedByteCount(rs.getLong("loaded_byte_count")); + lc.setLoadedRowCount(rs.getLong("loaded_row_count")); + + return lc; } - }, sourceNodeId); - loads.addAll(inProcess); - - return loads; + + }, "load_id"); } public List getQueuedLoads(String sourceNodeId) { @@ -699,7 +701,7 @@ public LoadSummary getLoadSummary(long loadId) { private class LoadSummaryMapper implements ISqlRowMapper { public LoadSummary mapRow(Row rs) { - LoadSummary summary = new LoadSummary(); + LoadSummary summary = new LoadSummary(); // summary.setLoadId(rs.getLong("load_id")); summary.setNodeId(rs.getString("node_id")); summary.setCreateBy(rs.getString("last_update_by")); @@ -1148,4 +1150,32 @@ public OutgoingBatch mapRow(Row rs) { } } + + public class LoadCounts implements Serializable { + private static final long serialVersionUID = 1L; + private long loadedRowCount; + private int loadedBatchCount; + private long loadedByteCount; + + public long getLoadedRowCount() { + return loadedRowCount; + } + public void setLoadedRowCount(long loadedRowCount) { + this.loadedRowCount = loadedRowCount; + } + public int getLoadedBatchCount() { + return loadedBatchCount; + } + public void setLoadedBatchCount(int loadedBatchCount) { + this.loadedBatchCount = loadedBatchCount; + } + public long getLoadedByteCount() { + return loadedByteCount; + } + public void setLoadedByteCount(long loadedByteCount) { + this.loadedByteCount = loadedByteCount; + } + + + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 7e355a30da..21477464d9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -198,11 +198,12 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, + "group by b.load_id, b.node_id, b.status, b.channel_id, b.create_by " + "order by b.load_id desc "); - putSql("getActiveLoadsSql", - "select r.load_id " + putSql("getActiveLoadCountsSql", + "select r.load_id, count(ob.batch_id) as loaded_batch_count, sum(ob.data_row_count) as loaded_row_count, " + + "sum(ob.byte_count) as loaded_byte_count " + "from $(table_reload_request) r " + "join $(outgoing_batch) ob on ob.load_id = r.load_id " - + "where ob.status != 'OK' and ob.status != 'IG' and r.source_node_id = ? " + + "where r.completed = 0 and ob.status = 'OK' and ob.reload_row_count > 0 " + "group by r.load_id"); putSql("getLoadSummaryUnprocessedSql", diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java index 6bcd3d0cca..14f9f9464e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java @@ -59,11 +59,13 @@ public void save(ChannelStats stats) { stats.getDataExtracted(), stats.getDataBytesExtracted(), stats.getDataExtractedErrors(), stats.getDataSent(), stats.getDataBytesSent(), stats.getDataSentErrors(), stats.getDataLoaded(), - stats.getDataBytesLoaded(), stats.getDataLoadedErrors() }, new int[] { + stats.getDataBytesLoaded(), stats.getDataLoadedErrors(), + stats.getDataLoadedOutgoing(), stats.getDataBytesLoadedOutgoing(), + stats.getDataLoadedOutgoingErrors()}, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.BIGINT, Types.BIGINT }); + Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT }); } public void save(JobStats stats) { @@ -172,6 +174,9 @@ public ChannelStats mapRow(Row rs) { stats.setDataLoaded(rs.getLong("data_loaded")); stats.setDataBytesLoaded(rs.getLong("data_bytes_loaded")); stats.setDataLoadedErrors(rs.getLong("data_loaded_errors")); + stats.setDataLoadedOutgoing(rs.getLong("data_loaded_outgoing")); + stats.setDataLoadedOutgoingErrors(rs.getLong("data_loaded_outgoing_errors")); + stats.setDataBytesLoadedOutgoing(rs.getLong("data_bytes_loaded_outgoing")); return stats; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java index cc6a03c3d7..5841012b87 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticServiceSqlMap.java @@ -35,15 +35,17 @@ public StatisticServiceSqlMap(IDatabasePlatform platform, Map re " data_routed, data_unrouted, data_event_inserted, " + " data_extracted, data_bytes_extracted, data_extracted_errors, " + " data_sent, data_bytes_sent, data_sent_errors, " + -" data_loaded, data_bytes_loaded, data_loaded_errors) " + -" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " ); +" data_loaded, data_bytes_loaded, data_loaded_errors, " + +" data_loaded_outgoing, data_bytes_loaded_outgoing, data_loaded_outgoing_errors) " + +" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " ); putSql("selectChannelStatsSql" ,"" + "select node_id, host_name, channel_id, start_time, end_time, " + " data_routed, data_unrouted, data_event_inserted, " + " data_extracted, data_bytes_extracted, data_extracted_errors, " + " data_sent, data_bytes_sent, data_sent_errors, " + -" data_loaded, data_bytes_loaded, data_loaded_errors " + +" data_loaded, data_bytes_loaded, data_loaded_errors, " + +" data_loaded_outgoing, data_bytes_loaded_outgoing, data_loaded_outgoing_errors " + " from $(node_host_channel_stats) " + " where start_time >= ? and end_time <= ? and node_id=? order by start_time asc " ); @@ -56,6 +58,9 @@ public StatisticServiceSqlMap(IDatabasePlatform platform, Map re " sum(data_bytes_sent) as data_bytes_sent, sum(data_sent_errors) as data_sent_errors, " + " sum(data_loaded) as data_loaded, sum(data_bytes_loaded) as data_bytes_loaded, " + " sum(data_loaded_errors) as data_loaded_errors " + +" sum(data_loaded_outgoing) as data_loaded_outgoing, " + +" sum(data_bytes_loaded_outgoing) as data_bytes_loaded_outgoing, " + +" sum(data_loaded_outgoing_errors) as data_loaded_outgoing_errors " + " from $(node_host_channel_stats) " + " where start_time >= ? and end_time <= ? and node_id=? " + " and channel_id not in ('heartbeat', 'config') " + diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/ChannelStats.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/ChannelStats.java index 024e7e24d2..26b5b3f460 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/ChannelStats.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/ChannelStats.java @@ -37,6 +37,9 @@ public class ChannelStats extends AbstractNodeHostStats { private long dataLoaded; private long dataBytesLoaded; private long dataLoadedErrors; + private long dataLoadedOutgoing; + private long dataBytesLoadedOutgoing; + private long dataLoadedOutgoingErrors; public ChannelStats() {} @@ -59,6 +62,9 @@ public void add(ChannelStats stats) { dataLoaded += stats.getDataLoaded(); dataBytesLoaded += stats.getDataBytesLoaded(); dataLoadedErrors += stats.getDataLoadedErrors(); + dataLoadedOutgoing += stats.getDataLoadedOutgoing(); + dataBytesLoadedOutgoing += stats.getDataBytesLoadedOutgoing(); + dataLoadedOutgoingErrors += stats.getDataLoadedOutgoingErrors(); } public String getChannelId() { @@ -213,6 +219,40 @@ public void incrementDataSent(long count) { this.dataSent += count; } + public long getDataLoadedOutgoing() { + return dataLoadedOutgoing; + } + + public void setDataLoadedOutgoing(long dataLoadedOutgoing) { + this.dataLoadedOutgoing = dataLoadedOutgoing; + } + + public void incrementDataLoadedOutgoing(long dataLoadedOutgoing) { + this.dataLoadedOutgoing += dataLoadedOutgoing; + } + + public long getDataBytesLoadedOutgoing() { + return dataBytesLoadedOutgoing; + } + + public void setDataBytesLoadedOutgoing(long dataBytesLoadedOutgoing) { + this.dataBytesLoadedOutgoing = dataBytesLoadedOutgoing; + } + public void incrementDataBytesLoadedOutgoing(long dataBytesLoadedOutgoing) { + this.dataBytesLoadedOutgoing += dataBytesLoadedOutgoing; + } + + public long getDataLoadedOutgoingErrors() { + return dataLoadedOutgoingErrors; + } + + public void setDataLoadedOutgoingErrors(long dataLoadedOutgoingErrors) { + this.dataLoadedOutgoingErrors = dataLoadedOutgoingErrors; + } + + public void incrementDataLoadedOutgoingErrors(long dataLoadedOutgoingErrors) { + this.dataLoadedOutgoingErrors += dataLoadedOutgoingErrors; + } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java index a8b19a83f9..f6d8ab5ef2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java @@ -65,6 +65,12 @@ public void addRouterStats(long startDataId, long endDataId, long dataReadCount, public void incrementDataLoaded(String channelId, long count); + public void incrementDataLoadedOutgoingErrors(String channelId, long count); + + public void incrementDataBytesLoadedOutgoing(String channelId, long count); + + public void incrementDataLoadedOutgoing(String channelId, long count); + public void incrementDataBytesSent(String channelId, long count); public void incrementDataSent(String channelId, long count); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java index 1f8e59fdcc..bdcb1b490b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java @@ -314,6 +314,33 @@ public void incrementDataLoadedErrors(String channelId, long count) { channelStatsLock.release(); } } + + public void incrementDataLoadedOutgoing(String channelId, long count) { + channelStatsLock.acquireUninterruptibly(); + try { + getChannelStats(channelId).incrementDataLoadedOutgoing(count); + } finally { + channelStatsLock.release(); + } + } + + public void incrementDataBytesLoadedOutgoing(String channelId, long count) { + channelStatsLock.acquireUninterruptibly(); + try { + getChannelStats(channelId).incrementDataBytesLoadedOutgoing(count); + } finally { + channelStatsLock.release(); + } + } + + public void incrementDataLoadedOutgoingErrors(String channelId, long count) { + channelStatsLock.acquireUninterruptibly(); + try { + getChannelStats(channelId).incrementDataLoadedOutgoingErrors(count); + } finally { + channelStatsLock.release(); + } + } public void incrementRestart() { hostStatsLock.acquireUninterruptibly(); diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index 0ec71b1792..2f0f7d5cc5 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -139,6 +139,16 @@ + + + + + + + + + + @@ -481,6 +491,9 @@ + + + @@ -762,11 +775,22 @@ - + + + + + + + + + + + + - + @@ -917,4 +941,4 @@
- + \ No newline at end of file diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java index 3f86fd2073..ef6be61c0b 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java @@ -172,6 +172,8 @@ public RouterStats getRouterStatsByBatch(Long batchId) { public void removeRouterStatsByBatch(Long batchId) { } + + @Override public void addJobStats(String targetNodeId, int targetNodeCount, String jobName, long startTime, long endTime, long processedCount) { @@ -184,5 +186,23 @@ public TreeMap> getNodeStatsForPeriod(Date start // TODO Auto-generated method stub return null; } + + @Override + public void incrementDataLoadedOutgoingErrors(String channelId, long count) { + // TODO Auto-generated method stub + + } + + @Override + public void incrementDataBytesLoadedOutgoing(String channelId, long count) { + // TODO Auto-generated method stub + + } + + @Override + public void incrementDataLoadedOutgoing(String channelId, long count) { + // TODO Auto-generated method stub + + } } \ No newline at end of file