diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 38302fbd6d..4b1733d574 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -373,10 +373,12 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri } public RemoteNodeStatuses push() { + MDC.put("engineName", getEngineName()); return pushService.pushData(); } public void syncTriggers() { + MDC.put("engineName", getEngineName()); triggerRouterService.syncTriggers(); } @@ -385,14 +387,17 @@ public NodeStatus getNodeStatus() { } public RemoteNodeStatuses pull() { + MDC.put("engineName", getEngineName()); return pullService.pullData(); } public void route() { + MDC.put("engineName", getEngineName()); routerService.routeData(); } public void purge() { + MDC.put("engineName", getEngineName()); purgeService.purgeOutgoing(); purgeService.purgeIncoming(); purgeService.purgeDataGaps(); @@ -470,14 +475,17 @@ public boolean isConfigured() { } public void heartbeat(boolean force) { + MDC.put("engineName", getEngineName()); dataService.heartbeat(force); } public void openRegistration(String nodeGroupId, String externalId) { + MDC.put("engineName", getEngineName()); registrationService.openRegistration(nodeGroupId, externalId); } public void reOpenRegistration(String nodeId) { + MDC.put("engineName", getEngineName()); registrationService.reOpenRegistration(nodeId); } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java index 6a6014ce48..8a218d8970 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java @@ -202,6 +202,7 @@ public boolean invoke(boolean force) { * This method is called from the job */ public void run() { + MDC.put("engineName", engine != null ? engine.getEngineName() : "unknown"); invoke(false); } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java index 010e53f6f1..34e52e0de0 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedFilter.java @@ -131,18 +131,18 @@ private boolean matchesTable(Table table, String tableSuffix) { public void batchCommitted( DataContext context) { if (context.get(CTX_KEY_FLUSH_CHANNELS_NEEDED) != null) { - log.info("ChannelFlushed"); + log.info("Channels flushed because new channels came through the data loader"); configurationService.reloadChannels(); } if (context.get(CTX_KEY_RESYNC_NEEDED) != null && parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION) && parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { - log.info("."); + log.info("About to syncTriggers because new configuration came through the data loader"); triggerRouterService.syncTriggers(); } if (context.get(CTX_KEY_FLUSH_TRANSFORMS_NEEDED) != null && parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION)) { - log.info("."); + log.info("About to refresh the cache of transformation because new configuration come through the data loader"); transformService.resetCache(); } } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index dd0ccaa6c3..0c26e336e7 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -93,7 +93,8 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData // if this is sym_node or sym_node_security determine which nodes it // goes to. if (tableMatches(dataMetaData, TableConstants.SYM_NODE) - || tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)) { + || tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY) + || tableMatches(dataMetaData, TableConstants.SYM_NODE_HOST)) { if (didNodeSecurityChangeForNodeInitialization(dataMetaData)) { return null; @@ -251,17 +252,17 @@ private boolean isLinked(String nodeIdInQuestion, Node nodeThatCouldBeRoutedTo, @Override public void contextCommitted(SimpleRouterContext routingContext) { if (routingContext.getContextCache().get(CTX_KEY_FLUSH_CHANNELS_NEEDED) != null) { - log.info("Channels flushed because new channels came through the dataloader"); + log.info("Channels flushed because new channels came through the data router"); configurationService.reloadChannels(); } if (routingContext.getContextCache().get(CTX_KEY_RESYNC_NEEDED) != null && parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { - log.info("About to syncTriggers because new configuration came through the dataloader"); + log.info("About to syncTriggers because new configuration came through the data router"); triggerRouterService.syncTriggers(); } if (routingContext.getContextCache().get(CTX_KEY_FLUSH_TRANSFORMS_NEEDED) != null && parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION)) { - log.info("About to refresh the cache of transformation because new configuration come through the dataloader"); + log.info("About to refresh the cache of transformation because new configuration come through the data router"); transformService.resetCache(); } } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index eeff8589c6..c289abfbaf 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -319,7 +319,10 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, int batchesSentCount = 0; OutgoingBatch currentBatch = null; + try { + IDataWriter dataWriter = null; + for (int i = 0; i < activeBatches.size(); i++) { currentBatch = activeBatches.get(i); @@ -382,7 +385,9 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport, currentBatch.getBatchId()); if (extractedBatch != null) { IDataReader dataReader = new ProtocolDataReader(extractedBatch); - IDataWriter dataWriter = new ProtocolDataWriter(targetTransport.open()); + if (dataWriter == null) { + dataWriter = new ProtocolDataWriter(targetTransport.open()); + } new DataProcessor(dataReader, dataWriter).process(); } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index 524acc1ed4..6be7e7fd50 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -230,7 +230,7 @@ private int purgeByMinMax(long[] minMax, MinMaxDeleteSql identifier, Date retent break; case STRANDED_DATA: deleteSql = getSql("deleteStrandedData"); - args = new Object[] { minId, maxId, cutoffTime, minId, maxId, minId, maxId }; + args = new Object[] { minId, maxId, cutoffTime, minId, maxId }; break; } @@ -274,7 +274,7 @@ private long purgeIncomingBatch(final Calendar time) { getSql("selectIncomingBatchRangeSql"), new ISqlRowMapper() { public NodeBatchRange mapRow(Row rs) { return new NodeBatchRange(rs.getString("node_id"), rs - .getLong("min_batch_id"), rs.getLong("max_batch_id")); + .getLong("min_id"), rs.getLong("max_id")); } }, time.getTime()); int incomingBatchesPurgedCount = purgeByNodeBatchRangeList( diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java index 701926616d..2cfa1b903a 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java @@ -8,20 +8,22 @@ public class PurgeServiceSqlMap extends AbstractSqlMap { public PurgeServiceSqlMap(IDatabasePlatform platform, Map replacementTokens) { super(platform, replacementTokens); + + // @formatter:off putSql("selectOutgoingBatchRangeSql" ,"" + "select min(batch_id) as min_id, max(batch_id) as max_id from $(outgoing_batch) where " + " create_time < ? and status in ('OK','IG') and batch_id < (select max(batch_id) from $(outgoing_batch)) " ); putSql("deleteOutgoingBatchSql" ,"" + -"delete from $(outgoing_batch) where status in ('OK','IG') and batch_id between :MIN " + -" and :MAX and batch_id not in (select batch_id from $(data_event) where batch_id between :MIN " + -" and :MAX) " ); +"delete from $(outgoing_batch) where status in ('OK','IG') and batch_id between ? \n" + +" and ? and batch_id not in (select batch_id from $(data_event) where batch_id between ? \n" + +" and ?) \n" ); putSql("deleteDataEventSql" ,"" + -"delete from $(data_event) where batch_id not in (select batch_id from " + -" $(outgoing_batch) where batch_id between :MIN and :MAX and status not in ('OK','IG')) " + -" and batch_id between :MIN and :MAX " ); +"delete from $(data_event) where batch_id not in (select batch_id from \n" + +" $(outgoing_batch) where batch_id between ? and ? and status not in ('OK','IG')) \n" + +" and batch_id between ? and ? \n" ); putSql("deleteUnroutedDataEventSql" ,"" + "delete from $(data_event) where " + @@ -35,27 +37,27 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac " in (select node_id from $(node) where sync_enabled=1) and status != 'OK' " ); putSql("deleteStrandedData" ,"" + -"delete from $(data) where " + -" data_id between :MIN and :MAX and " + -" data_id < (select max(ref_data_id) from $(data)_ref) and " + -" create_time < :CUTOFF_TIME and " + -" data_id not in (select e.data_id from $(data_event) e where " + -" e.data_id between :MIN and :MAX) " ); +"delete from $(data) where \n" + +" data_id between ? and ? and \n" + +" data_id < (select min(start_id) from sym_data_gap) and \n" + +" create_time < ? and \n" + +" data_id not in (select e.data_id from $(data_event) e where \n" + +" e.data_id between ? and ?) \n" ); putSql("deleteDataSql" ,"" + -"delete from $(data) where " + -" data_id between :MIN and :MAX and " + -" create_time < :CUTOFF_TIME and " + -" data_id in (select e.data_id from $(data_event) e where " + -" e.data_id between :MIN and :MAX) " + -" and " + -" data_id not in " + -" (select e.data_id from $(data_event) e where " + -" e.data_id between :MIN and :MAX and " + -" (e.data_id is null or " + -" e.batch_id in " + -" (select batch_id from $(outgoing_batch) where " + -" status not in ('OK','IG')))) " ); +"delete from $(data) where \n" + +" data_id between ? and ? and \n" + +" create_time < ? and \n" + +" data_id in (select e.data_id from $(data_event) e where \n" + +" e.data_id between ? and ?) \n" + +" and \n" + +" data_id not in \n" + +" (select e.data_id from $(data_event) e where \n" + +" e.data_id between ? and ? and \n" + +" (e.data_id is null or \n" + +" e.batch_id in \n" + +" (select batch_id from $(outgoing_batch) where \n" + +" status not in ('OK','IG')))) \n" ); putSql("selectIncomingBatchRangeSql" ,"" + "select node_id, min(batch_id) as min_id, max(batch_id) as max_id from $(incoming_batch) where " + diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index ba3cff18c9..43bd3eae84 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -120,6 +120,7 @@ public TriggerRouterService(IParameterService parameterService, configTables.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_GROUP_LINK)); configTables.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE)); + configTables.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST)); configTables .add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_SECURITY)); configTables.add(TableConstants.getTableName(tablePrefix, TableConstants.SYM_PARAMETER)); @@ -414,16 +415,12 @@ protected boolean isMatch(String catalogName, String schemaName, String tableNam */ protected List getConfigurationTablesTriggerRoutersForCurrentNode( String sourceNodeGroupId) { - List triggers = new ArrayList(); + List triggerRouters = new ArrayList(); List links = configurationService.getNodeGroupLinksFor(sourceNodeGroupId); for (NodeGroupLink nodeGroupLink : links) { - triggers.addAll(buildTriggerRoutersForSymmetricTables(Version.version(), nodeGroupLink)); - if (NodeGroupLinkAction.P == nodeGroupLink.getDataEventAction()) { - triggers.add(buildTriggerRoutersForSymmetricTables(Version.version(), - buildTriggerForSymmetricTable(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST)), nodeGroupLink)); - } + triggerRouters.addAll(buildTriggerRoutersForSymmetricTables(Version.version(), nodeGroupLink)); } - return triggers; + return triggerRouters; } protected void mergeInConfigurationTablesTriggerRoutersForCurrentNode(String sourceNodeGroupId, diff --git a/symmetric/symmetric-core/src/test/resources/log4j.xml b/symmetric/symmetric-core/src/test/resources/log4j.xml index 72867a6814..2a8ad5fa03 100644 --- a/symmetric/symmetric-core/src/test/resources/log4j.xml +++ b/symmetric/symmetric-core/src/test/resources/log4j.xml @@ -12,6 +12,10 @@ + + + +