diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 38e6876382..9d890569b3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -193,100 +193,110 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer, String... tablesToExclude) { Node sourceNode = nodeService.findIdentity(); - Batch batch = new Batch(BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION, - Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(), - sourceNode.getNodeId(), targetNode.getNodeId(), false); - - NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(), - targetNode.getNodeGroupId()); - - List triggerRouters = triggerRouterService - .buildTriggerRoutersForSymmetricTables( - StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version.version() - : targetNode.getSymmetricVersion(), nodeGroupLink, tablesToExclude); - - List initialLoadEvents = new ArrayList( - triggerRouters.size() * 2); - - for (int i = triggerRouters.size() - 1; i >= 0; i--) { - TriggerRouter triggerRouter = triggerRouters.get(i); - String channelId = triggerRouter.getTrigger().getChannelId(); - if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { - TriggerHistory triggerHistory = triggerRouterService - .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() - .getTriggerId(), null, null, triggerRouter.getTrigger() - .getSourceTableName()); - if (triggerHistory == null) { - Trigger trigger = triggerRouter.getTrigger(); - Table table = symmetricDialect.getPlatform().getTableFromCache( - trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), false); - if (table == null) { - throw new IllegalStateException("Could not find a required table: " - + triggerRouter.getTrigger().getSourceTableName()); + if (targetNode != null && sourceNode != null) { + + Batch batch = new Batch(BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION, + Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(), + sourceNode.getNodeId(), targetNode.getNodeId(), false); + + NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(), + targetNode.getNodeGroupId()); + + List triggerRouters = triggerRouterService + .buildTriggerRoutersForSymmetricTables( + StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version + .version() : targetNode.getSymmetricVersion(), nodeGroupLink, + tablesToExclude); + + List initialLoadEvents = new ArrayList( + triggerRouters.size() * 2); + + for (int i = triggerRouters.size() - 1; i >= 0; i--) { + TriggerRouter triggerRouter = triggerRouters.get(i); + String channelId = triggerRouter.getTrigger().getChannelId(); + if (Constants.CHANNEL_CONFIG.equals(channelId) + || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { + TriggerHistory triggerHistory = triggerRouterService + .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() + .getTriggerId(), null, null, triggerRouter.getTrigger() + .getSourceTableName()); + if (triggerHistory == null) { + Trigger trigger = triggerRouter.getTrigger(); + Table table = symmetricDialect.getPlatform().getTableFromCache( + trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), + trigger.getSourceTableName(), false); + if (table == null) { + throw new IllegalStateException("Could not find a required table: " + + triggerRouter.getTrigger().getSourceTableName()); + } + triggerHistory = new TriggerHistory(table, triggerRouter.getTrigger(), + symmetricDialect.getTriggerTemplate()); + triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); } - triggerHistory = new TriggerHistory(table, triggerRouter.getTrigger(), - symmetricDialect.getTriggerTemplate()); - triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); - } - StringBuilder sql = new StringBuilder(symmetricDialect.createPurgeSqlFor( - targetNode, triggerRouter, triggerHistory)); - addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger() - .getSourceTableName(), sql); - String sourceTable = triggerHistory.getSourceTableName(); - Data data = new Data(1, null, sql.toString(), DataEventType.SQL, sourceTable, null, - triggerHistory, triggerRouter.getTrigger().getChannelId(), null, null); - data.putAttribute(Data.ATTRIBUTE_ROUTER_ID, triggerRouter.getRouter().getRouterId()); - initialLoadEvents.add(new SelectFromTableEvent(data)); - } - } - - for (int i = 0; i < triggerRouters.size(); i++) { - TriggerRouter triggerRouter = triggerRouters.get(i); - String channelId = triggerRouter.getTrigger().getChannelId(); - if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { - TriggerHistory triggerHistory = triggerRouterService - .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() - .getTriggerId(), null, null, null); - if (triggerHistory == null) { - Trigger trigger = triggerRouter.getTrigger(); - triggerHistory = new TriggerHistory(symmetricDialect.getPlatform() - .getTableFromCache(trigger.getSourceCatalogName(), - trigger.getSourceSchemaName(), trigger.getSourceTableName(), - false), trigger, symmetricDialect.getTriggerTemplate()); - triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); - } - - if (!triggerRouter.getTrigger().getSourceTableName() - .endsWith(TableConstants.SYM_NODE_IDENTITY)) { - initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter, - triggerHistory, null)); - } else { - Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT, - triggerHistory.getSourceTableName(), null, triggerHistory, - triggerRouter.getTrigger().getChannelId(), null, null); + StringBuilder sql = new StringBuilder(symmetricDialect.createPurgeSqlFor( + targetNode, triggerRouter, triggerHistory)); + addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger() + .getSourceTableName(), sql); + String sourceTable = triggerHistory.getSourceTableName(); + Data data = new Data(1, null, sql.toString(), DataEventType.SQL, sourceTable, + null, triggerHistory, triggerRouter.getTrigger().getChannelId(), null, + null); + data.putAttribute(Data.ATTRIBUTE_ROUTER_ID, triggerRouter.getRouter() + .getRouterId()); initialLoadEvents.add(new SelectFromTableEvent(data)); } } - } - SelectFromTableSource source = new SelectFromTableSource(batch, initialLoadEvents); - ExtractDataReader dataReader = new ExtractDataReader(this.symmetricDialect.getPlatform(), - source); + for (int i = 0; i < triggerRouters.size(); i++) { + TriggerRouter triggerRouter = triggerRouters.get(i); + String channelId = triggerRouter.getTrigger().getChannelId(); + if (Constants.CHANNEL_CONFIG.equals(channelId) + || Constants.CHANNEL_HEARTBEAT.equals(channelId)) { + TriggerHistory triggerHistory = triggerRouterService + .getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger() + .getTriggerId(), null, null, null); + if (triggerHistory == null) { + Trigger trigger = triggerRouter.getTrigger(); + triggerHistory = new TriggerHistory(symmetricDialect.getPlatform() + .getTableFromCache(trigger.getSourceCatalogName(), + trigger.getSourceSchemaName(), + trigger.getSourceTableName(), false), trigger, + symmetricDialect.getTriggerTemplate()); + triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i); + } - ProtocolDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(), - writer, targetNode.requires13Compatiblity()); - DataProcessor processor = new DataProcessor(dataReader, dataWriter, "configuration extract"); - DataContext ctx = new DataContext(); - ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); - ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode); - processor.process(ctx); + if (!triggerRouter.getTrigger().getSourceTableName() + .endsWith(TableConstants.SYM_NODE_IDENTITY)) { + initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter, + triggerHistory, null)); + } else { + Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT, + triggerHistory.getSourceTableName(), null, triggerHistory, + triggerRouter.getTrigger().getChannelId(), null, null); + initialLoadEvents.add(new SelectFromTableEvent(data)); + } + } + } - if (triggerRouters.size() == 0) { - log.error("{} attempted registration, but was sent an empty configuration", targetNode); + SelectFromTableSource source = new SelectFromTableSource(batch, initialLoadEvents); + ExtractDataReader dataReader = new ExtractDataReader( + this.symmetricDialect.getPlatform(), source); + + ProtocolDataWriter dataWriter = new ProtocolDataWriter( + nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity()); + DataProcessor processor = new DataProcessor(dataReader, dataWriter, + "configuration extract"); + DataContext ctx = new DataContext(); + ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode); + ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode); + processor.process(ctx); + + if (triggerRouters.size() == 0) { + log.error("{} attempted registration, but was sent an empty configuration", + targetNode); + } } - } private void addPurgeCriteriaToConfigurationTables(String sourceTableName, StringBuilder sql) {