Skip to content

Commit

Permalink
0003096: Variables in router target catalog and schema
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 5, 2017
1 parent cf9b979 commit 7cc659c
Showing 1 changed file with 26 additions and 6 deletions.
Expand Up @@ -146,6 +146,7 @@
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.util.SymmetricUtils;
import org.jumpmind.util.CustomizableThreadFactory;
import org.jumpmind.util.FormatUtils;
import org.jumpmind.util.Statistics;

/**
Expand Down Expand Up @@ -1268,7 +1269,8 @@ protected TransformWriter createTransformDataWriter(Node identity, Node targetNo
}

protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
TriggerHistory triggerHistory, boolean setTargetTableName, boolean useDatabaseDefinition) {
TriggerHistory triggerHistory, Node sourceNode, Node targetNode,
boolean setTargetTableName, boolean useDatabaseDefinition) {
String catalogName = triggerHistory.getSourceCatalogName();
String schemaName = triggerHistory.getSourceSchemaName();
String tableName = triggerHistory.getSourceTableName();
Expand Down Expand Up @@ -1309,13 +1311,13 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
if (StringUtils.equals(Constants.NONE_TOKEN, router.getTargetCatalogName())) {
table.setCatalog(null);
} else if (StringUtils.isNotBlank(router.getTargetCatalogName())) {
table.setCatalog(router.getTargetCatalogName());
table.setCatalog(replaceVariables(sourceNode, targetNode, router.getTargetCatalogName()));
}

if (StringUtils.equals(Constants.NONE_TOKEN, router.getTargetSchemaName())) {
table.setSchema(null);
} else if (StringUtils.isNotBlank(router.getTargetSchemaName())) {
table.setSchema(router.getTargetSchemaName());
table.setSchema(replaceVariables(sourceNode, targetNode, router.getTargetSchemaName()));
}

if (StringUtils.isNotBlank(router.getTargetTableName())) {
Expand All @@ -1325,6 +1327,16 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
return table;
}

protected String replaceVariables(Node sourceNode, Node targetNode, String str) {
str = FormatUtils.replace("sourceNodeId", sourceNode.getNodeGroupId(), str);
str = FormatUtils.replace("sourceExternalId", sourceNode.getExternalId(), str);
str = FormatUtils.replace("sourceNodeGroupId", sourceNode.getNodeGroupId(), str);
str = FormatUtils.replace("targetNodeId", targetNode.getNodeGroupId(), str);
str = FormatUtils.replace("targetExternalId", targetNode.getExternalId(), str);
str = FormatUtils.replace("targetNodeGroupId", targetNode.getNodeGroupId(), str);
return str;
}

public RemoteNodeStatuses queueWork(boolean force) {
final RemoteNodeStatuses statuses = new RemoteNodeStatuses(configurationService.getChannels(false));
Node identity = nodeService.findIdentity();
Expand Down Expand Up @@ -1592,6 +1604,13 @@ public ExtractRequest mapRow(Row row) {

class ColumnsAccordingToTriggerHistory {
Map<Integer, Table> cache = new HashMap<Integer, Table>();
Node sourceNode;
Node targetNode;

public ColumnsAccordingToTriggerHistory(Node sourceNode, Node targetNode) {
this.sourceNode = sourceNode;
this.targetNode = targetNode;
}

public Table lookup(String routerId, TriggerHistory triggerHistory, boolean setTargetTableName, boolean useDatabaseDefinition) {
final int prime = 31;
Expand All @@ -1601,7 +1620,8 @@ public Table lookup(String routerId, TriggerHistory triggerHistory, boolean setT
key = prime * key + (useDatabaseDefinition ? 1231 : 1237);
Table table = cache.get(key);
if (table == null) {
table = lookupAndOrderColumnsAccordingToTriggerHistory(routerId, triggerHistory, setTargetTableName, useDatabaseDefinition);
table = lookupAndOrderColumnsAccordingToTriggerHistory(routerId, triggerHistory, sourceNode,
targetNode, setTargetTableName, useDatabaseDefinition);
cache.put(key, table);
}
return table;
Expand Down Expand Up @@ -1642,7 +1662,7 @@ public SelectFromSymDataSource(OutgoingBatch outgoingBatch,
outgoingBatch.getChannelId(), symmetricDialect.getBinaryEncoding(),
sourceNode.getNodeId(), outgoingBatch.getNodeId(), outgoingBatch.isCommonFlag());
this.targetNode = targetNode;
this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory();
this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory(sourceNode, targetNode);
}

public Batch getBatch() {
Expand Down Expand Up @@ -1866,7 +1886,7 @@ protected void init(Batch batch, List<SelectFromTableEvent> initialLoadEvents) {
throw new SymmetricException("Could not find a node represented by %s",
this.batch.getTargetNodeId());
}
this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory();
this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory(nodeService.findIdentity(), node);
}

public Table getSourceTable() {
Expand Down

0 comments on commit 7cc659c

Please sign in to comment.