Skip to content

Commit

Permalink
0002067: Snapshot fails when run on nodes that are not registered
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Nov 13, 2014
1 parent c252a26 commit 66fc285
Showing 1 changed file with 95 additions and 85 deletions.
Expand Up @@ -193,100 +193,110 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer,
String... tablesToExclude) {
Node sourceNode = nodeService.findIdentity();

Batch batch = new Batch(BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION,
Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(),
sourceNode.getNodeId(), targetNode.getNodeId(), false);

NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(),
targetNode.getNodeGroupId());

List<TriggerRouter> triggerRouters = triggerRouterService
.buildTriggerRoutersForSymmetricTables(
StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version.version()
: targetNode.getSymmetricVersion(), nodeGroupLink, tablesToExclude);

List<SelectFromTableEvent> initialLoadEvents = new ArrayList<SelectFromTableEvent>(
triggerRouters.size() * 2);

for (int i = triggerRouters.size() - 1; i >= 0; i--) {
TriggerRouter triggerRouter = triggerRouters.get(i);
String channelId = triggerRouter.getTrigger().getChannelId();
if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) {
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger()
.getTriggerId(), null, null, triggerRouter.getTrigger()
.getSourceTableName());
if (triggerHistory == null) {
Trigger trigger = triggerRouter.getTrigger();
Table table = symmetricDialect.getPlatform().getTableFromCache(
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), false);
if (table == null) {
throw new IllegalStateException("Could not find a required table: "
+ triggerRouter.getTrigger().getSourceTableName());
if (targetNode != null && sourceNode != null) {

Batch batch = new Batch(BatchType.EXTRACT, Constants.VIRTUAL_BATCH_FOR_REGISTRATION,
Constants.CHANNEL_CONFIG, symmetricDialect.getBinaryEncoding(),
sourceNode.getNodeId(), targetNode.getNodeId(), false);

NodeGroupLink nodeGroupLink = new NodeGroupLink(parameterService.getNodeGroupId(),
targetNode.getNodeGroupId());

List<TriggerRouter> triggerRouters = triggerRouterService
.buildTriggerRoutersForSymmetricTables(
StringUtils.isBlank(targetNode.getSymmetricVersion()) ? Version
.version() : targetNode.getSymmetricVersion(), nodeGroupLink,
tablesToExclude);

List<SelectFromTableEvent> initialLoadEvents = new ArrayList<SelectFromTableEvent>(
triggerRouters.size() * 2);

for (int i = triggerRouters.size() - 1; i >= 0; i--) {
TriggerRouter triggerRouter = triggerRouters.get(i);
String channelId = triggerRouter.getTrigger().getChannelId();
if (Constants.CHANNEL_CONFIG.equals(channelId)
|| Constants.CHANNEL_HEARTBEAT.equals(channelId)) {
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger()
.getTriggerId(), null, null, triggerRouter.getTrigger()
.getSourceTableName());
if (triggerHistory == null) {
Trigger trigger = triggerRouter.getTrigger();
Table table = symmetricDialect.getPlatform().getTableFromCache(
trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), false);
if (table == null) {
throw new IllegalStateException("Could not find a required table: "
+ triggerRouter.getTrigger().getSourceTableName());
}
triggerHistory = new TriggerHistory(table, triggerRouter.getTrigger(),
symmetricDialect.getTriggerTemplate());
triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i);
}
triggerHistory = new TriggerHistory(table, triggerRouter.getTrigger(),
symmetricDialect.getTriggerTemplate());
triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i);
}

StringBuilder sql = new StringBuilder(symmetricDialect.createPurgeSqlFor(
targetNode, triggerRouter, triggerHistory));
addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger()
.getSourceTableName(), sql);
String sourceTable = triggerHistory.getSourceTableName();
Data data = new Data(1, null, sql.toString(), DataEventType.SQL, sourceTable, null,
triggerHistory, triggerRouter.getTrigger().getChannelId(), null, null);
data.putAttribute(Data.ATTRIBUTE_ROUTER_ID, triggerRouter.getRouter().getRouterId());
initialLoadEvents.add(new SelectFromTableEvent(data));
}
}

for (int i = 0; i < triggerRouters.size(); i++) {
TriggerRouter triggerRouter = triggerRouters.get(i);
String channelId = triggerRouter.getTrigger().getChannelId();
if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) {
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger()
.getTriggerId(), null, null, null);
if (triggerHistory == null) {
Trigger trigger = triggerRouter.getTrigger();
triggerHistory = new TriggerHistory(symmetricDialect.getPlatform()
.getTableFromCache(trigger.getSourceCatalogName(),
trigger.getSourceSchemaName(), trigger.getSourceTableName(),
false), trigger, symmetricDialect.getTriggerTemplate());
triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i);
}

if (!triggerRouter.getTrigger().getSourceTableName()
.endsWith(TableConstants.SYM_NODE_IDENTITY)) {
initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter,
triggerHistory, null));
} else {
Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT,
triggerHistory.getSourceTableName(), null, triggerHistory,
triggerRouter.getTrigger().getChannelId(), null, null);
StringBuilder sql = new StringBuilder(symmetricDialect.createPurgeSqlFor(
targetNode, triggerRouter, triggerHistory));
addPurgeCriteriaToConfigurationTables(triggerRouter.getTrigger()
.getSourceTableName(), sql);
String sourceTable = triggerHistory.getSourceTableName();
Data data = new Data(1, null, sql.toString(), DataEventType.SQL, sourceTable,
null, triggerHistory, triggerRouter.getTrigger().getChannelId(), null,
null);
data.putAttribute(Data.ATTRIBUTE_ROUTER_ID, triggerRouter.getRouter()
.getRouterId());
initialLoadEvents.add(new SelectFromTableEvent(data));
}
}
}

SelectFromTableSource source = new SelectFromTableSource(batch, initialLoadEvents);
ExtractDataReader dataReader = new ExtractDataReader(this.symmetricDialect.getPlatform(),
source);
for (int i = 0; i < triggerRouters.size(); i++) {
TriggerRouter triggerRouter = triggerRouters.get(i);
String channelId = triggerRouter.getTrigger().getChannelId();
if (Constants.CHANNEL_CONFIG.equals(channelId)
|| Constants.CHANNEL_HEARTBEAT.equals(channelId)) {
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger()
.getTriggerId(), null, null, null);
if (triggerHistory == null) {
Trigger trigger = triggerRouter.getTrigger();
triggerHistory = new TriggerHistory(symmetricDialect.getPlatform()
.getTableFromCache(trigger.getSourceCatalogName(),
trigger.getSourceSchemaName(),
trigger.getSourceTableName(), false), trigger,
symmetricDialect.getTriggerTemplate());
triggerHistory.setTriggerHistoryId(Integer.MAX_VALUE - i);
}

ProtocolDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
writer, targetNode.requires13Compatiblity());
DataProcessor processor = new DataProcessor(dataReader, dataWriter, "configuration extract");
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode);
processor.process(ctx);
if (!triggerRouter.getTrigger().getSourceTableName()
.endsWith(TableConstants.SYM_NODE_IDENTITY)) {
initialLoadEvents.add(new SelectFromTableEvent(targetNode, triggerRouter,
triggerHistory, null));
} else {
Data data = new Data(1, null, targetNode.getNodeId(), DataEventType.INSERT,
triggerHistory.getSourceTableName(), null, triggerHistory,
triggerRouter.getTrigger().getChannelId(), null, null);
initialLoadEvents.add(new SelectFromTableEvent(data));
}
}
}

if (triggerRouters.size() == 0) {
log.error("{} attempted registration, but was sent an empty configuration", targetNode);
SelectFromTableSource source = new SelectFromTableSource(batch, initialLoadEvents);
ExtractDataReader dataReader = new ExtractDataReader(
this.symmetricDialect.getPlatform(), source);

ProtocolDataWriter dataWriter = new ProtocolDataWriter(
nodeService.findIdentityNodeId(), writer, targetNode.requires13Compatiblity());
DataProcessor processor = new DataProcessor(dataReader, dataWriter,
"configuration extract");
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, sourceNode);
processor.process(ctx);

if (triggerRouters.size() == 0) {
log.error("{} attempted registration, but was sent an empty configuration",
targetNode);
}
}

}

private void addPurgeCriteriaToConfigurationTables(String sourceTableName, StringBuilder sql) {
Expand Down

0 comments on commit 66fc285

Please sign in to comment.