diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java index ba05f69d62..9348ad2b21 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java @@ -184,13 +184,10 @@ public List getBatchesForChannelWindows(Node targetNode, NodeChan if (windows != null) { if (batches != null && batches.size() > 0) { - if (channel.isEnabled() && inTimeWindow(windows, targetNode.getTimezoneOffset())) { - int max = channel.getMaxBatchToSend(); - int count = 0; + if (inTimeWindow(windows, targetNode.getTimezoneOffset())) { for (OutgoingBatch outgoingBatch : batches) { - if (channel.getChannelId().equals(outgoingBatch.getChannelId()) && count < max) { + if (channel.getChannelId().equals(outgoingBatch.getChannelId())) { keeping.add(outgoingBatch); - count++; } } } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java index 540dadfe03..e76ada0129 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java @@ -78,7 +78,9 @@ public interface IConfigurationService { public NodeChannel getNodeChannel(String channelId, boolean refreshExtractMillis); - public Channel getChannel (String channelId); + public Channel getChannel (String channelId); + + public Map getChannels(boolean refreshCache); public NodeChannel getNodeChannel(String channelId, String nodeId, boolean refreshExtractMillis); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 0cb9ea8e56..adee45b2de 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -40,7 +40,7 @@ public interface IOutgoingBatchService { public OutgoingBatch findOutgoingBatch(long batchId); - public OutgoingBatches getOutgoingBatches(Node node); + public OutgoingBatches getOutgoingBatches(Node node, boolean includeDisabledChannels); public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index b526e46f2d..e77b0f4d36 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -63,6 +63,10 @@ public class ConfigurationService extends AbstractService implements IConfigurat private INodeService nodeService; private Map> nodeChannelCache; + + private Map channelsCache; + + private long channelCacheTime; private long nodeChannelCacheTime; @@ -244,7 +248,7 @@ public List getNodeChannels(final String nodeId, boolean refreshExt nodeChannelCache = new HashMap>(); nodeChannelCacheTime = System.currentTimeMillis(); } - nodeChannels = sqlTemplate.query(getSql("selectChannelsSql"), + nodeChannels = sqlTemplate.query(getSql("selectNodeChannelsSql"), new ISqlRowMapper() { public NodeChannel mapRow(Row row) { NodeChannel nodeChannel = new NodeChannel(); @@ -309,6 +313,7 @@ public Object mapRow(Row row) { public void reloadChannels() { synchronized (this) { nodeChannelCache = null; + channelsCache = null; } } @@ -478,6 +483,52 @@ public ChannelMap getSuspendIgnoreChannelLists(final String nodeId) { } return map; } + + public Map getChannels(boolean refreshCache) { + long channelCacheTimeoutInMs = parameterService.getLong( + ParameterConstants.CACHE_TIMEOUT_CHANNEL_IN_MS, 60000); + Map channels = channelsCache; + if (System.currentTimeMillis() - channelCacheTime >= channelCacheTimeoutInMs + || channels == null || refreshCache) { + synchronized (this) { + channels = channelsCache; + if (System.currentTimeMillis() - channelCacheTime >= channelCacheTimeoutInMs + || channels == null || refreshCache) { + channels = new HashMap(); + List list = sqlTemplate.query(getSql("selectChannelsSql"), new ISqlRowMapper () { + public Channel mapRow(Row row) { + Channel channel = new Channel(); + channel.setChannelId(row.getString("channel_id")); + channel.setProcessingOrder(row.getInt("processing_order")); + channel.setMaxBatchSize(row.getInt("max_batch_size")); + channel.setEnabled(row.getBoolean("enabled")); + channel.setMaxBatchToSend(row.getInt("max_batch_to_send")); + channel.setMaxDataToRoute(row.getInt("max_data_to_route")); + channel.setUseOldDataToRoute(row + .getBoolean("use_old_data_to_route")); + channel.setUseRowDataToRoute(row + .getBoolean("use_row_data_to_route")); + channel.setUsePkDataToRoute(row + .getBoolean("use_pk_data_to_route")); + channel.setContainsBigLob(row + .getBoolean("contains_big_lob")); + channel.setBatchAlgorithm(row.getString("batch_algorithm")); + channel.setExtractPeriodMillis(row + .getLong("extract_period_millis")); + return channel; + } + }); + for (Channel channel : list) { + channels.put(channel.getChannelId(), channel); + } + channelsCache = channels; + channelCacheTime = System.currentTimeMillis(); + } + } + } + + return channels; + } public Channel getChannel(String channelId) { NodeChannel nodeChannel = getNodeChannel(channelId, false); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java index 7ded2aeca0..f815b5b717 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java @@ -42,8 +42,15 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform, + " $(node_group_link) where source_node_group_id = ? "); putSql("isChannelInUseSql", "select count(*) from $(trigger) where channel_id = ? "); - + putSql("selectChannelsSql", + "select c.channel_id, c.processing_order, c.max_batch_size, c.enabled, " + + " c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, " + + " c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, " + + " c.batch_algorithm, c.extract_period_millis " + + " from $(channel) c order by c.processing_order asc, c.channel_id "); + + putSql("selectNodeChannelsSql", "" + "select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order, " + " c.max_batch_size, c.enabled, c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, c.batch_algorithm, nc.last_extract_time, c.extract_period_millis " diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index ed8d3b46ef..1819c28086 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -270,7 +270,7 @@ public List extract(Node targetNode, IOutgoingTransport targetTra routerService.routeData(); } - OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(targetNode); + OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(targetNode, false); if (batches.containsBatches()) { @@ -334,8 +334,8 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, currentBatch.getNodeId(), currentBatch.getBatchId()); if (previouslyExtracted != null && previouslyExtracted.exists()) { log.info( - "We have already extracted batch {}. Using the existing extraction. To force re-extraction, please restart this instance of SymmetricDS.", - currentBatch.getBatchId()); + "We have already extracted batch {}. Using the existing extraction: {}", + currentBatch.getBatchId(), previouslyExtracted); } else { currentBatch.setStatus(OutgoingBatch.Status.QY); currentBatch.setExtractCount(currentBatch.getExtractCount() + 1); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 3b75de9b17..8f5816abb4 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -24,7 +24,9 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.jumpmind.db.sql.ISqlRowMapper; @@ -37,6 +39,7 @@ import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.db.SequenceIdentifier; +import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeChannel; import org.jumpmind.symmetric.model.NodeSecurity; @@ -74,7 +77,7 @@ public OutgoingBatchService(IParameterService parameterService, public void markAllAsSentForNode(Node node) { OutgoingBatches batches = null; do { - batches = getOutgoingBatches(node); + batches = getOutgoingBatches(node, true); for (OutgoingBatch outgoingBatch : batches.getBatches()) { outgoingBatch.setStatus(Status.OK); outgoingBatch.setErrorFlag(false); @@ -152,7 +155,7 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo public OutgoingBatch findOutgoingBatch(long batchId) { List list = (List) sqlTemplate.query( getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"), - new OutgoingBatchMapper(), new Object[] { batchId }, new int[] { Types.NUMERIC }); + new OutgoingBatchMapper(true, false), new Object[] { batchId }, new int[] { Types.NUMERIC }); if (list != null && list.size() > 0) { return list.get(0); } else { @@ -197,7 +200,7 @@ public List listOutgoingBatches(List nodeIds, List(0); @@ -223,13 +226,13 @@ protected SqlList toStringList(String replacementToken, List list = (List) sqlTemplate.query( getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchSql"), - maxNumberOfBatchesToSelect, new OutgoingBatchMapper(), + maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, true), new Object[] { node.getNodeId(), OutgoingBatch.Status.NE.name(), OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(), OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name() }, null); @@ -265,7 +268,7 @@ public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatc OutgoingBatches batches = new OutgoingBatches(); batches.setBatches(sqlTemplate.query( getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchRangeSql"), - new OutgoingBatchMapper(), Long.parseLong(startBatchId), Long.parseLong(endBatchId))); + new OutgoingBatchMapper(true, false), Long.parseLong(startBatchId), Long.parseLong(endBatchId))); return batches; } @@ -273,7 +276,7 @@ public OutgoingBatches getOutgoingBatchErrors(int maxRows) { OutgoingBatches batches = new OutgoingBatches(); batches.setBatches(sqlTemplate.query( getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchErrorsSql"), maxRows, - new OutgoingBatchMapper(), null, null)); + new OutgoingBatchMapper(true, false), null, null)); return batches; } @@ -341,46 +344,64 @@ public OutgoingBatchSummary mapRow(Row rs) { } class OutgoingBatchMapper implements ISqlRowMapper { - public OutgoingBatch mapRow(Row rs) { - OutgoingBatch batch = new OutgoingBatch(); - batch.setNodeId(rs.getString("node_id")); - batch.setChannelId(rs.getString("channel_id")); - batch.setStatus(rs.getString("status")); - batch.setByteCount(rs.getLong("byte_count")); - batch.setExtractCount(rs.getLong("extract_count")); - batch.setSentCount(rs.getLong("sent_count")); - batch.setLoadCount(rs.getLong("load_count")); - batch.setDataEventCount(rs.getLong("data_event_count")); - batch.setReloadEventCount(rs.getLong("reload_event_count")); - batch.setInsertEventCount(rs.getLong("insert_event_count")); - batch.setUpdateEventCount(rs.getLong("update_event_count")); - batch.setDeleteEventCount(rs.getLong("delete_event_count")); - batch.setOtherEventCount(rs.getLong("other_event_count")); - batch.setRouterMillis(rs.getLong("router_millis")); - batch.setNetworkMillis(rs.getLong("network_millis")); - batch.setFilterMillis(rs.getLong("filter_millis")); - batch.setLoadMillis(rs.getLong("load_millis")); - batch.setExtractMillis(rs.getLong("extract_millis")); - batch.setSqlState(rs.getString("sql_state")); - batch.setSqlCode(rs.getInt("sql_code")); - batch.setSqlMessage(rs.getString("sql_message")); - batch.setFailedDataId(rs.getLong("failed_data_id")); - batch.setLastUpdatedHostName(rs.getString("last_update_hostname")); - batch.setLastUpdatedTime(rs.getDateTime("last_update_time")); - batch.setCreateTime(rs.getDateTime("create_time")); - batch.setBatchId(rs.getLong("batch_id")); - batch.setLoadFlag(rs.getBoolean("load_flag")); - batch.setErrorFlag(rs.getBoolean("error_flag")); - return batch; - } - } - public void setNodeService(INodeService nodeService) { - this.nodeService = nodeService; - } + private boolean includeDisabledChannels = false; + private boolean limitBasedOnMaxBatchToSend = false; + private Map channels; + private Map countByChannel; + + public OutgoingBatchMapper(boolean includeDisabledChannels, + boolean limitBasedOnMaxBatchToSend) { + this.includeDisabledChannels = includeDisabledChannels; + this.limitBasedOnMaxBatchToSend = limitBasedOnMaxBatchToSend; + this.channels = configurationService.getChannels(false); + this.countByChannel = new HashMap(); + } - public void setConfigurationService(IConfigurationService configurationService) { - this.configurationService = configurationService; + public OutgoingBatch mapRow(Row rs) { + String channelId = rs.getString("channel_id"); + Channel channel = channels.get(channelId); + Integer count = countByChannel.get(channelId); + if (count == null) { + count = 0; + } + if (channel != null && (includeDisabledChannels || channel.isEnabled()) + && (!limitBasedOnMaxBatchToSend || count <= channel.getMaxBatchToSend())) { + count++; + OutgoingBatch batch = new OutgoingBatch(); + batch.setChannelId(channelId); + batch.setNodeId(rs.getString("node_id")); + batch.setStatus(rs.getString("status")); + batch.setByteCount(rs.getLong("byte_count")); + batch.setExtractCount(rs.getLong("extract_count")); + batch.setSentCount(rs.getLong("sent_count")); + batch.setLoadCount(rs.getLong("load_count")); + batch.setDataEventCount(rs.getLong("data_event_count")); + batch.setReloadEventCount(rs.getLong("reload_event_count")); + batch.setInsertEventCount(rs.getLong("insert_event_count")); + batch.setUpdateEventCount(rs.getLong("update_event_count")); + batch.setDeleteEventCount(rs.getLong("delete_event_count")); + batch.setOtherEventCount(rs.getLong("other_event_count")); + batch.setRouterMillis(rs.getLong("router_millis")); + batch.setNetworkMillis(rs.getLong("network_millis")); + batch.setFilterMillis(rs.getLong("filter_millis")); + batch.setLoadMillis(rs.getLong("load_millis")); + batch.setExtractMillis(rs.getLong("extract_millis")); + batch.setSqlState(rs.getString("sql_state")); + batch.setSqlCode(rs.getInt("sql_code")); + batch.setSqlMessage(rs.getString("sql_message")); + batch.setFailedDataId(rs.getLong("failed_data_id")); + batch.setLastUpdatedHostName(rs.getString("last_update_hostname")); + batch.setLastUpdatedTime(rs.getDateTime("last_update_time")); + batch.setCreateTime(rs.getDateTime("create_time")); + batch.setBatchId(rs.getLong("batch_id")); + batch.setLoadFlag(rs.getBoolean("load_flag")); + batch.setErrorFlag(rs.getBoolean("error_flag")); + return batch; + } else { + return null; + } + } } } diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties index 9853d4cd61..d46033f848 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties @@ -809,7 +809,7 @@ data.id.increment.by=1 # into memory for the next data extraction. # DatabaseOverridable: true # Tags: extract -outgoing.batches.max.to.select=1000 +outgoing.batches.max.to.select=50000 # The class name for the Security Service to use for encrypting and # decrypting database passwords diff --git a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java index a693453e94..d2ed3a8b85 100644 --- a/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java +++ b/symmetric/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java @@ -65,19 +65,19 @@ public void testMultiChannelRoutingToEveryone() { final int EXPECTED_BATCHES = getDbDialect().supportsTransactionId() ? 16 : 17; - OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel, otherChannel); Assert.assertEquals(EXPECTED_BATCHES, batches.getBatches().size()); Assert.assertEquals(getDbDialect().supportsTransactionId() ? 1 : 2, countBatchesForChannel(batches, testChannel)); Assert.assertEquals(15, countBatchesForChannel(batches, otherChannel)); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false); filterForChannels(batches, testChannel, otherChannel); // Node 2 has sync disabled Assert.assertEquals(0, batches.getBatches().size()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false); filterForChannels(batches, testChannel, otherChannel); Assert.assertEquals(EXPECTED_BATCHES, batches.getBatches().size()); @@ -92,7 +92,7 @@ public void testMultiChannelRoutingToEveryone() { insert(TEST_TABLE_1, 50, false); getRouterService().routeData(); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel, otherChannel); Assert.assertEquals(getDbDialect().supportsTransactionId() ? 3 : 17, batches.getBatches() .size()); @@ -242,17 +242,17 @@ public void testColumnMatchTransactionalOnlyRoutingToNode1() { final int EXPECTED_BATCHES = getDbDialect().supportsTransactionId() ? 51 : 100; - OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel); Assert.assertEquals(EXPECTED_BATCHES, batches.getBatches().size()); Assert.assertEquals(EXPECTED_BATCHES, countBatchesForChannel(batches, testChannel)); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false); filterForChannels(batches, testChannel); // Node 2 has sync disabled Assert.assertEquals(0, batches.getBatches().size()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false); filterForChannels(batches, testChannel); // Batch was targeted only at node 1 Assert.assertEquals(0, batches.getBatches().size()); @@ -262,19 +262,19 @@ public void testColumnMatchTransactionalOnlyRoutingToNode1() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); execute("delete from " + TEST_TABLE_1, null); Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); getRouterService().routeData(); Assert.assertEquals( getDbDialect().supportsTransactionId() ? 1 : 100, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); resetBatches(); @@ -304,17 +304,17 @@ public void testSubSelectNonTransactionalRoutingToNode1() { final int EXPECTED_BATCHES = 100; - OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel); Assert.assertEquals(EXPECTED_BATCHES, batches.getBatches().size()); Assert.assertEquals(EXPECTED_BATCHES, countBatchesForChannel(batches, testChannel)); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false); filterForChannels(batches, testChannel); // Node 2 has sync disabled Assert.assertEquals(0, batches.getBatches().size()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false); filterForChannels(batches, testChannel); // Batch was targeted only at node 1 Assert.assertEquals(0, batches.getBatches().size()); @@ -345,13 +345,13 @@ public void testSyncIncomingBatch() throws Exception { getRouterService().routeData(); - OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel); Assert.assertEquals( "Should have been 0. We did the insert as if the data had come from node 1.", 0, batches.getBatches().size()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false); filterForChannels(batches, testChannel); Assert.assertEquals(1, batches.getBatches().size()); @@ -413,7 +413,7 @@ protected void testUseOfDefaultChannel(boolean deleteChannel) throws Exception { getRouterService().routeData(); OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches( - NODE_GROUP_NODE_1); + NODE_GROUP_NODE_1, false); Assert.assertEquals(3, batches.getBatches().size()); Assert.assertEquals(Constants.CHANNEL_DEFAULT, batches.getBatches().get(2) .getChannelId()); @@ -495,19 +495,19 @@ public void testBshTransactionalRoutingOnUpdate() { logger.info("Just routed " + count + " rows in " + TEST_TABLE_1 + " in " + (System.currentTimeMillis() - ts) + "ms"); - OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel); Assert.assertEquals(getDbDialect().supportsTransactionId() ? 1 : 530, batches.getBatches() .size()); Assert.assertEquals(getDbDialect().supportsTransactionId() ? count : 1, (int) batches .getBatches().get(0).getDataEventCount()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false); filterForChannels(batches, testChannel); // Node 2 has sync disabled Assert.assertEquals(0, batches.getBatches().size()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false); filterForChannels(batches, testChannel); Assert.assertEquals(getDbDialect().supportsTransactionId() ? 1 : 530, batches.getBatches() .size()); @@ -538,16 +538,16 @@ public void testBshRoutingDeletesToNode3() { int count = getJdbcTemplate().update(String.format("delete from %s", TEST_TABLE_1)); getRouterService().routeData(); - OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false); filterForChannels(batches, testChannel); Assert.assertEquals(count / MAX_BATCH_SIZE + (count % MAX_BATCH_SIZE > 0 ? 1 : 0), batches .getBatches().size()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false); // Node 2 has sync disabled Assert.assertEquals(0, batches.getBatches().size()); - batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel); // Batch was targeted only at node 3 Assert.assertEquals(0, batches.getBatches().size()); @@ -593,7 +593,7 @@ public void testColumnMatchSubtableRoutingToNode1() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); getRouterService().routeData(); @@ -601,17 +601,17 @@ public void testColumnMatchSubtableRoutingToNode1() { Assert.assertEquals( 1, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false), testChannel)); Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false), testChannel)); resetBatches(); @@ -619,7 +619,7 @@ public void testColumnMatchSubtableRoutingToNode1() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); execute("delete from " + TEST_SUBTABLE, null); @@ -627,34 +627,34 @@ public void testColumnMatchSubtableRoutingToNode1() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false), testChannel)); Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false), testChannel)); getRouterService().routeData(); Assert.assertEquals( 1, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_2, false), testChannel)); Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_3, false), testChannel)); resetBatches(); @@ -680,7 +680,7 @@ public void testColumnMatchOnNull() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); getRouterService().routeData(); @@ -688,7 +688,7 @@ public void testColumnMatchOnNull() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); resetBatches(); @@ -698,7 +698,7 @@ public void testColumnMatchOnNull() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); getRouterService().routeData(); @@ -706,7 +706,7 @@ public void testColumnMatchOnNull() { Assert.assertEquals( 1, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); } @@ -730,7 +730,7 @@ public void testColumnMatchOnNotNull() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); getRouterService().routeData(); @@ -738,7 +738,7 @@ public void testColumnMatchOnNotNull() { Assert.assertEquals( 1, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); resetBatches(); @@ -748,7 +748,7 @@ public void testColumnMatchOnNotNull() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); getRouterService().routeData(); @@ -756,7 +756,7 @@ public void testColumnMatchOnNotNull() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); } @@ -782,7 +782,7 @@ public void testSyncOnColumnChange() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); insert(TEST_TABLE_1, 1, true); @@ -791,7 +791,7 @@ public void testSyncOnColumnChange() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); resetBatches(); @@ -806,7 +806,7 @@ public void testSyncOnColumnChange() { Assert.assertEquals( 1, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); resetBatches(); @@ -818,7 +818,7 @@ public void testSyncOnColumnChange() { Assert.assertEquals( 0, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); resetBatches(); @@ -830,7 +830,7 @@ public void testSyncOnColumnChange() { Assert.assertEquals( 1, countBatchesForChannel( - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1), + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false), testChannel)); } @@ -860,7 +860,7 @@ public void testSyncIncomingBatchWhenUnrouted() throws Exception { getRouterService().routeData(); - OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1); + OutgoingBatches batches = getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false); filterForChannels(batches, testChannel); Assert.assertEquals( "Should have been 0. We did the insert as if the data had come from node 1.", 0, @@ -883,14 +883,14 @@ public void testDefaultRouteToTargetNodeGroupOnly() throws Exception { getRouterService().routeData(); - Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1) + Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) .getBatches().size()); Node node2 = getNodeService().findNode("00030"); Assert.assertNotNull(node2); - Assert.assertEquals(0, getOutgoingBatchService().getOutgoingBatches(node2).getBatches() + Assert.assertEquals(0, getOutgoingBatchService().getOutgoingBatches(node2, false).getBatches() .size()); resetBatches(); @@ -925,7 +925,7 @@ public void testGapRouting() throws Exception { getRouterService().routeData(); Assert.assertEquals(1, - getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1) + getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) .getBatches().size()); List gaps = getDataService().findDataGaps(); @@ -970,7 +970,7 @@ public void testGapWithGapAtBegining() { routeAndCreateGaps(); - Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1) + Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) .getBatches().size()); gaps = getDataService().findDataGaps(); @@ -1014,7 +1014,7 @@ public void testGapWithGapAtEnd() { routeAndCreateGaps(); - Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1) + Assert.assertEquals(1, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) .getBatches().size()); List gaps = getDataService().findDataGaps(); @@ -1065,7 +1065,7 @@ public void testLotsOfGaps() { routeAndCreateGaps(); - Assert.assertEquals(10, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1) + Assert.assertEquals(10, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) .getBatches().size()); List gaps = getDataService().findDataGaps(); @@ -1088,14 +1088,14 @@ public void testLotsOfGaps() { public void testNoResend() { resetBatches(); - Assert.assertEquals(0, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1) + Assert.assertEquals(0, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) .getBatches().size()); getJdbcTemplate().update("delete from sym_data_gap"); routeAndCreateGaps(); - Assert.assertEquals(0, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1) + Assert.assertEquals(0, getOutgoingBatchService().getOutgoingBatches(NODE_GROUP_NODE_1, false) .getBatches().size()); } diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java index 3025e3cfe7..c6e758e264 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/stage/StagedResource.java @@ -172,5 +172,10 @@ public File getFile() { public String getPath() { return path; } + + @Override + public String toString() { + return file.exists() ? file.getAbsolutePath() : String.format("%d bytes in memory",memoryBuffer.length()); + } }