Skip to content

Commit

Permalink
0002752: Optimize DataExtractorService
Browse files Browse the repository at this point in the history
lookupAndOrderColumnsAccordingToTriggerHistory
  • Loading branch information
erilong committed Aug 25, 2016
1 parent f2ea114 commit 418702d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
Expand Up @@ -1516,6 +1516,24 @@ protected void startNewBatch() {

}

class ColumnsAccordingToTriggerHistory {
Map<Integer, Table> cache;

public Table lookup(String routerId, TriggerHistory triggerHistory, boolean setTargetTableName, boolean useDatabaseDefinition) {
final int prime = 31;
int key = prime + ((routerId == null) ? 0 : routerId.hashCode());
key = prime * key + (setTargetTableName ? 1231 : 1237);
key = prime * key + ((triggerHistory == null) ? 0 : triggerHistory.getTriggerHistoryId());
key = prime * key + (useDatabaseDefinition ? 1231 : 1237);
Table table = cache.get(key);
if (table == null) {
table = lookupAndOrderColumnsAccordingToTriggerHistory(routerId, triggerHistory, setTargetTableName, useDatabaseDefinition);
cache.put(key, table);
}
return table;
}
}

class SelectFromSymDataSource implements IExtractDataReaderSource {

private Batch batch;
Expand All @@ -1539,6 +1557,8 @@ class SelectFromSymDataSource implements IExtractDataReaderSource {
private Node targetNode;

private ProcessInfo processInfo;

private ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory;

public SelectFromSymDataSource(OutgoingBatch outgoingBatch,
Node sourceNode, Node targetNode, ProcessInfo processInfo) {
Expand All @@ -1548,6 +1568,7 @@ public SelectFromSymDataSource(OutgoingBatch outgoingBatch,
outgoingBatch.getChannelId(), symmetricDialect.getBinaryEncoding(),
sourceNode.getNodeId(), outgoingBatch.getNodeId(), outgoingBatch.isCommonFlag());
this.targetNode = targetNode;
this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory();
}

public Batch getBatch() {
Expand Down Expand Up @@ -1618,9 +1639,9 @@ public CsvData next() {
if (lastTriggerHistory == null || lastTriggerHistory
.getTriggerHistoryId() != triggerHistory.getTriggerHistoryId() ||
lastRouterId == null || !lastRouterId.equals(routerId)) {
this.sourceTable = lookupAndOrderColumnsAccordingToTriggerHistory(
this.sourceTable = columnsAccordingToTriggerHistory.lookup(
routerId, triggerHistory, false, true);
this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(
this.targetTable = columnsAccordingToTriggerHistory.lookup(
routerId, triggerHistory, true, false);
this.requiresLobSelectedFromSource = trigger == null ? false : trigger.isUseStreamLobs();
}
Expand Down Expand Up @@ -1654,7 +1675,7 @@ public CsvData next() {
this.sourceTable = platform.getTableFromCache(sourceTable.getCatalog(),
sourceTable.getSchema(), sourceTable.getName(), true);

this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(
this.targetTable = columnsAccordingToTriggerHistory.lookup(
routerId, triggerHistory, true, true);
Table copyTargetTable = this.targetTable.copy();

Expand Down Expand Up @@ -1743,10 +1764,13 @@ class SelectFromTableSource implements IExtractDataReaderSource {
private Node node;

private TriggerRouter triggerRouter;

private ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory;

public SelectFromTableSource(OutgoingBatch outgoingBatch, Batch batch,
SelectFromTableEvent event) {
this.outgoingBatch = outgoingBatch;
this.columnsAccordingToTriggerHistory = new ColumnsAccordingToTriggerHistory();
List<SelectFromTableEvent> initialLoadEvents = new ArrayList<DataExtractorService.SelectFromTableEvent>(
1);
initialLoadEvents.add(event);
Expand Down Expand Up @@ -1807,9 +1831,9 @@ protected CsvData selectNext() {
if (this.currentInitialLoadEvent.containsData()) {
data = this.currentInitialLoadEvent.getData();
this.currentInitialLoadEvent = null;
this.sourceTable = lookupAndOrderColumnsAccordingToTriggerHistory(
this.sourceTable = columnsAccordingToTriggerHistory.lookup(
(String) data.getAttribute(CsvData.ATTRIBUTE_ROUTER_ID), history, false, true);
this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(
this.targetTable = columnsAccordingToTriggerHistory.lookup(
(String) data.getAttribute(CsvData.ATTRIBUTE_ROUTER_ID), history, true, false);
} else {
this.triggerRouter = this.currentInitialLoadEvent.getTriggerRouter();
Expand All @@ -1820,9 +1844,9 @@ protected CsvData selectNext() {
this.routingContext = new SimpleRouterContext(batch.getTargetNodeId(),
channel);
}
this.sourceTable = lookupAndOrderColumnsAccordingToTriggerHistory(triggerRouter
this.sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter
.getRouter().getRouterId(), history, false, true);
this.targetTable = lookupAndOrderColumnsAccordingToTriggerHistory(triggerRouter
this.targetTable = columnsAccordingToTriggerHistory.lookup(triggerRouter
.getRouter().getRouterId(), history, true, false);
this.startNewCursor(history, triggerRouter,
this.currentInitialLoadEvent.getInitialLoadSelect());
Expand Down
Expand Up @@ -66,6 +66,8 @@ public String toDbValue() {
protected Date lastUpdateTime;
protected String lastUpdateBy;

protected String sourceColumnNameLowerCase;
protected String targetColumnNameLowerCase;

public TransformColumn(String transformId) {
this.transformId = transformId;
Expand All @@ -75,8 +77,8 @@ public TransformColumn() {
}

public TransformColumn(String sourceColumnName, String targetColumnName, boolean pk) {
this.sourceColumnName = sourceColumnName;
this.targetColumnName = targetColumnName;
setSourceColumnName(sourceColumnName);
setTargetColumnName(targetColumnName);
this.pk = pk;
}

Expand All @@ -91,16 +93,26 @@ public String getSourceColumnName() {
return sourceColumnName;
}

public String getSourceColumnNameLowerCase() {
return sourceColumnNameLowerCase;
}

public void setSourceColumnName(String sourceColumnName) {
this.sourceColumnName = sourceColumnName;
this.sourceColumnNameLowerCase = sourceColumnName == null ? null : sourceColumnName.toLowerCase();
}

public String getTargetColumnName() {
return targetColumnName;
}

public String getTargetColumnNameLowerCase() {
return targetColumnNameLowerCase;
}

public void setTargetColumnName(String targetColumnName) {
this.targetColumnName = targetColumnName;
this.targetColumnNameLowerCase = targetColumnName == null ? null : targetColumnName.toLowerCase();
}

public boolean isPk() {
Expand Down
Expand Up @@ -365,11 +365,12 @@ public TransformTable enhanceWithImpliedColumns(String[] keyNames, String[] colu
boolean hasInsert = false;
boolean hasUpdate = false;
boolean hasDelete = false;
String columnLowerCase = column.toLowerCase();

if (primaryKeyColumns != null) {
for (TransformColumn xCol : transformColumns) {
if ((StringUtils.isNotBlank(xCol.getSourceColumnName()) && StringUtils.equalsIgnoreCase(xCol.getSourceColumnName(),column)) ||
StringUtils.isNotBlank(xCol.getTargetColumnName()) && StringUtils.equalsIgnoreCase(xCol.getTargetColumnName(),column)) {
if ((StringUtils.isNotBlank(xCol.getSourceColumnName()) && columnLowerCase.equals(xCol.getSourceColumnNameLowerCase())) ||
StringUtils.isNotBlank(xCol.getTargetColumnName()) && columnLowerCase.equals(xCol.getTargetColumnNameLowerCase())) {
if (xCol.includeOn == IncludeOnType.ALL) {
hasInsert = hasUpdate = hasDelete = true;
break;
Expand Down Expand Up @@ -412,10 +413,11 @@ public TransformTable enhanceWithImpliedColumns(String[] keyNames, String[] colu
boolean hasInsert = false;
boolean hasUpdate = false;
boolean hasDelete = false;
String columnLowerCase = column.toLowerCase();

for (TransformColumn xCol : copiedVersion.transformColumns) {
if ((StringUtils.isNotBlank(xCol.getSourceColumnName()) && StringUtils.equalsIgnoreCase(xCol.getSourceColumnName(),column)) ||
(StringUtils.isNotBlank(xCol.getTargetColumnName()) && StringUtils.equalsIgnoreCase(xCol.getTargetColumnName(),column))) {
if ((StringUtils.isNotBlank(xCol.getSourceColumnName()) && columnLowerCase.equals(xCol.getSourceColumnNameLowerCase())) ||
(StringUtils.isNotBlank(xCol.getTargetColumnName()) && columnLowerCase.equals(xCol.getTargetColumnNameLowerCase()))) {
if (xCol.includeOn == IncludeOnType.ALL) {
hasInsert = hasUpdate = hasDelete = true;
break;
Expand Down

0 comments on commit 418702d

Please sign in to comment.