diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 1596c812fb..ab17e9a2e3 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -147,9 +147,10 @@ public void extractConfigurationStandalone(Node node, OutputStream out) { * load for some reason on the client the batch status will NOT reflect the * failure. */ - public void extractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude) { + public void extractConfigurationStandalone(Node targetNode, Writer writer, + String... tablesToExclude) { Node sourceNode = nodeService.findIdentity(); - + Batch batch = new Batch(BatchAck.VIRTUAL_BATCH_FOR_REGISTRATION, Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(), targetNode.getNodeId(), false); @@ -157,9 +158,9 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer, Strin targetNode.getNodeGroupId()); List triggerRouters = triggerRouterService - .buildTriggerRoutersForSymmetricTables(StringUtils.isBlank(targetNode - .getSymmetricVersion()) ? Version.version() : targetNode.getSymmetricVersion(), - nodeGroupLink, tablesToExclude); + .buildTriggerRoutersForSymmetricTables( + StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version.version() + : targetNode.getSymmetricVersion(), nodeGroupLink, tablesToExclude); List initialLoadEvents = new ArrayList( triggerRouters.size() * 2); @@ -202,8 +203,8 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer, Strin if (!triggerRouter.getTrigger().getSourceTableName() .endsWith(TableConstants.SYM_NODE_IDENTITY)) { - initialLoadEvents - .add(new SelectFromTableEvent(targetNode, triggerRouter, triggerHistory)); + initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter, + triggerHistory)); } else { Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT, triggerHistory.getSourceTableName(), null, triggerHistory, triggerRouter @@ -439,6 +440,12 @@ protected OutgoingBatch extractOutgoingBatch(Node targetNode, currentBatch.isCommonFlag()); batch.setIgnored(true); try { + IStagedResource resource = stagingManager.find( + Constants.STAGING_CATEGORY_OUTGOING, batch.getStagedLocation(), + batch.getBatchId()); + if (resource != null) { + resource.delete(); + } DataContext ctx = new DataContext(batch); ctx.put("targetNode", targetNode); ctx.put("sourceNode", sourceNode); @@ -532,7 +539,8 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) { } } - protected OutgoingBatch sendOutgoingBatch(Node targetNode, OutgoingBatch currentBatch, IDataWriter dataWriter) { + protected OutgoingBatch sendOutgoingBatch(Node targetNode, OutgoingBatch currentBatch, + IDataWriter dataWriter) { if (currentBatch.getStatus() != Status.OK) { currentBatch.setSentCount(currentBatch.getSentCount() + 1); @@ -669,7 +677,7 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, class SelectFromSymDataSource implements IExtractDataReaderSource { private Batch batch; - + private OutgoingBatch outgoingBatch; private Table currentTable; @@ -777,7 +785,7 @@ public void close() { } class SelectFromTableSource implements IExtractDataReaderSource { - + private OutgoingBatch outgoingBatch; private Batch batch; @@ -796,7 +804,8 @@ class SelectFromTableSource implements IExtractDataReaderSource { private TriggerRouter triggerRouter; - public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch, SelectFromTableEvent event) { + public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch, + SelectFromTableEvent event) { this.outgoingBatch = outgoingBatch; List initialLoadEvents = new ArrayList( 1); @@ -867,7 +876,7 @@ public CsvData next() { data = next(); } } - + if (data != null && outgoingBatch != null) { outgoingBatch.incrementDataEventCount(); outgoingBatch.incrementEventCount(data.getDataEventType()); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index c35d8820a6..a211560ffa 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -100,34 +100,29 @@ public void updateOutgoingBatches(List outgoingBatches) { public void updateOutgoingBatch(OutgoingBatch outgoingBatch) { outgoingBatch.setLastUpdatedTime(new Date()); outgoingBatch.setLastUpdatedHostName(AppUtils.getServerId()); - sqlTemplate - .update(getSql("updateOutgoingBatchSql"), - new Object[] { outgoingBatch.getStatus().name(), - outgoingBatch.isLoadFlag() ? 1 : 0, - outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(), - outgoingBatch.getExtractCount(), outgoingBatch.getSentCount(), - outgoingBatch.getLoadCount(), outgoingBatch.getDataEventCount(), - outgoingBatch.getReloadEventCount(), - outgoingBatch.getInsertEventCount(), - outgoingBatch.getUpdateEventCount(), - outgoingBatch.getDeleteEventCount(), - outgoingBatch.getOtherEventCount(), - outgoingBatch.getIgnoreCount(), - outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(), - outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(), - outgoingBatch.getExtractMillis(), outgoingBatch.getSqlState(), - outgoingBatch.getSqlCode(), - StringUtils.abbreviate(outgoingBatch.getSqlMessage(), 1000), - outgoingBatch.getFailedDataId(), - outgoingBatch.getLastUpdatedHostName(), - outgoingBatch.getLastUpdatedTime(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId() }, - new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.BIGINT, - Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.NUMERIC, - Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, - Types.NUMERIC, Types.VARCHAR }); + sqlTemplate.update( + getSql("updateOutgoingBatchSql"), + new Object[] { outgoingBatch.getStatus().name(), + outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isErrorFlag() ? 1 : 0, + outgoingBatch.getByteCount(), outgoingBatch.getExtractCount(), + outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(), + outgoingBatch.getDataEventCount(), outgoingBatch.getReloadEventCount(), + outgoingBatch.getInsertEventCount(), outgoingBatch.getUpdateEventCount(), + outgoingBatch.getDeleteEventCount(), outgoingBatch.getOtherEventCount(), + outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(), + outgoingBatch.getNetworkMillis(), outgoingBatch.getFilterMillis(), + outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(), + outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(), + StringUtils.abbreviate(outgoingBatch.getSqlMessage(), 1000), + outgoingBatch.getFailedDataId(), outgoingBatch.getLastUpdatedHostName(), + outgoingBatch.getLastUpdatedTime(), outgoingBatch.getBatchId(), + outgoingBatch.getNodeId() }, + new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.VARCHAR, + Types.NUMERIC, Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.TIMESTAMP, + Types.NUMERIC, Types.VARCHAR }); } public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { @@ -150,16 +145,17 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo } transaction.prepareAndExecute(getSql("insertOutgoingBatchSql"), batchId, outgoingBatch .getNodeId(), outgoingBatch.getChannelId(), outgoingBatch.getStatus().name(), - outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0, outgoingBatch.getReloadEventCount(), - outgoingBatch.getOtherEventCount(), outgoingBatch.getLastUpdatedHostName()); + outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0, + outgoingBatch.getReloadEventCount(), outgoingBatch.getOtherEventCount(), + outgoingBatch.getLastUpdatedHostName()); outgoingBatch.setBatchId(batchId); } public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) { List list = (List) sqlTemplate.query( getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"), - new OutgoingBatchMapper(true, false), new Object[] { batchId, nodeId }, - new int[] { Types.NUMERIC, Types.VARCHAR }); + new OutgoingBatchMapper(true, false), new Object[] { batchId, nodeId }, new int[] { + Types.NUMERIC, Types.VARCHAR }); if (list != null && list.size() > 0) { return list.get(0); } else { @@ -178,11 +174,18 @@ public int countOutgoingBatches(List nodeIds, List channels, params.put("NODES", nodeIds); params.put("CHANNELS", channels); params.put("STATUSES", toStringList(statuses)); - return sqlTemplate - .queryForInt( - getSql("selectCountBatchesPrefixSql", - containsOnlyErrorStatus(statuses) ? "selectOutgoingBatchByChannelWithErrorSql" - : "selectOutgoingBatchByChannelAndStatusSql"), params); + String sql = null; + if (containsOnlyStatus(Status.ER, statuses)) { + sql = getSql("selectCountBatchesPrefixSql", + "selectOutgoingBatchByChannelWithErrorSql"); + } else if (containsOnlyStatus(Status.IG, statuses)) { + sql = getSql("selectCountBatchesPrefixSql", + "selectOutgoingBatchByChannelWithIgnoreSql"); + } else { + sql = getSql("selectCountBatchesPrefixSql", + "selectOutgoingBatchByChannelAndStatusSql"); + } + return sqlTemplate.queryForInt(sql, params); } else { return 0; } @@ -206,10 +209,20 @@ public List listOutgoingBatches(List nodeIds, List toStringList(List statuses) { } - protected boolean containsOnlyErrorStatus(List statuses) { - return statuses.size() == 1 && statuses.get(0) == OutgoingBatch.Status.ER; + protected boolean containsOnlyStatus(OutgoingBatch.Status status, + List statuses) { + return statuses.size() == 1 && statuses.get(0) == status; } /** @@ -246,7 +260,8 @@ public OutgoingBatches getOutgoingBatches(Node node, boolean includeDisabledChan maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, true), new Object[] { node.getNodeId(), OutgoingBatch.Status.NE.name(), OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(), - OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(), OutgoingBatch.Status.IG.name() }, null); + OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name(), + OutgoingBatch.Status.IG.name() }, null); OutgoingBatches batches = new OutgoingBatches(list); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 9400486a13..56a9a4b5de 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -36,6 +36,9 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, Map 0 " ); putSql("findOutgoingBatchSql" ,"" + "where batch_id=? and node_id=? " ); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java index c5b96a7d4a..4c4be8d93f 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/AbstractTransportManager.java @@ -102,7 +102,11 @@ protected String getAcknowledgementData(String nodeId, List list) append(builder, WebConstants.ACK_NETWORK_MILLIS + batchId, batch.getNetworkMillis()); append(builder, WebConstants.ACK_FILTER_MILLIS + batchId, batch.getFilterMillis()); append(builder, WebConstants.ACK_DATABASE_MILLIS + batchId, batch.getDatabaseMillis()); - append(builder, WebConstants.ACK_BYTE_COUNT + batchId, batch.getByteCount()); + append(builder, WebConstants.ACK_BYTE_COUNT + batchId, batch.getByteCount()); + + if (batch.getIgnoreCount() > 0) { + append(builder, WebConstants.ACK_IGNORE_COUNT + batchId, batch.getIgnoreCount()); + } if (batch.getStatus() == Status.ER) { append(builder, WebConstants.ACK_SQL_STATE + batchId, batch.getSqlState()); @@ -151,7 +155,8 @@ private static BatchAck getBatchInfo(Map parameters, l batchInfo.setNetworkMillis(getParamAsNum(parameters, WebConstants.ACK_NETWORK_MILLIS + batchId)); batchInfo.setFilterMillis(getParamAsNum(parameters, WebConstants.ACK_FILTER_MILLIS + batchId)); batchInfo.setDatabaseMillis(getParamAsNum(parameters, WebConstants.ACK_DATABASE_MILLIS + batchId)); - batchInfo.setByteCount(getParamAsNum(parameters, WebConstants.ACK_BYTE_COUNT + batchId)); + batchInfo.setByteCount(getParamAsNum(parameters, WebConstants.ACK_BYTE_COUNT + batchId)); + batchInfo.setIgnored(getParamAsBoolean(parameters, WebConstants.ACK_IGNORE_COUNT + batchId)); String status = getParam(parameters, WebConstants.ACK_BATCH_NAME + batchId, "").trim(); batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK)); @@ -178,7 +183,11 @@ protected static Map getParametersFromQueryUrl(String parameterS private static long getParamAsNum(Map parameters, String parameterName) { return NumberUtils.toLong(getParam(parameters, parameterName)); - } + } + + private static boolean getParamAsBoolean(Map parameters, String parameterName) { + return getParamAsNum(parameters, parameterName) > 0; + } private static String getParam(Map parameters, String parameterName, String defaultValue) { String value = getParam(parameters, parameterName); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java index 798eeed863..5fb2e52efc 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/web/WebConstants.java @@ -58,7 +58,9 @@ public class WebConstants { public static final String ACK_DATABASE_MILLIS = "database-"; - public static final String ACK_BYTE_COUNT = "byteCount-"; + public static final String ACK_BYTE_COUNT = "byteCount-"; + + public static final String ACK_IGNORE_COUNT = "ignoreCount-"; public static final String ACK_SQL_STATE = "sqlState-"; diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java index 44c681bba0..f16ee52e2f 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java @@ -74,9 +74,6 @@ public void start(Batch batch) { println(CsvConstants.CHANNEL, batch.getChannelId()); } println(CsvConstants.BATCH, Long.toString(batch.getBatchId())); - if (batch.isIgnored()) { - println(CsvConstants.IGNORE); - } } public boolean start(Table table) { @@ -143,6 +140,11 @@ public void end(Table table) { } final public void end(Batch batch, boolean inError) { + + if (batch.isIgnored()) { + println(CsvConstants.IGNORE); + } + if (!inError) { println(CsvConstants.COMMIT, Long.toString(batch.getBatchId())); endBatch(batch); diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java index 47bd3c5ffc..779f14d568 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java @@ -738,6 +738,9 @@ public void end(Table table) { public void end(Batch batch, boolean inError) { this.lastData = null; this.currentDmlStatement = null; + if (batch.isIgnored()) { + getStatistics().get(batch).increment(DataWriterStatisticConstants.IGNORECOUNT); + } if (!inError) { notifyFiltersBatchComplete(); commit();