Skip to content

Commit

Permalink
0006064: Slow extract and load when using $(targetExternalId) variable
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 30, 2023
1 parent 464d695 commit b11bcc7
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 109 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.model.Column;
Expand All @@ -49,15 +50,19 @@
import org.jumpmind.symmetric.util.SymmetricUtils;

public class ColumnsAccordingToTriggerHistory {
private static Map<String, Map<String, Table>> cacheByEngine = new ConcurrentHashMap<String, Map<String, Table>>();
private Map<CacheKey, Table> cache = new HashMap<CacheKey, Table>();
private ISymmetricEngine engine;
private Node sourceNode;
private Node targetNode;
private ITriggerRouterService triggerRouterService;
private ITransformService transformService;
private ISymmetricDialect symmetricDialect;
private String tablePrefix;
private boolean isUsingTargetExternalId;

public ColumnsAccordingToTriggerHistory(ISymmetricEngine engine, Node sourceNode, Node targetNode) {
this.engine = engine;
triggerRouterService = engine.getTriggerRouterService();
transformService = engine.getTransformService();
symmetricDialect = engine.getSymmetricDialect();
Expand Down Expand Up @@ -87,19 +92,11 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
String tableNameLowerCase = triggerHistory.getSourceTableNameLowerCase();
Table table = null;
if (useDatabaseDefinition) {
table = getTargetPlatform(tableNameLowerCase).getTableFromCache(catalogName, schemaName, tableName, false);
if (table != null && table.getColumnCount() < triggerHistory.getParsedColumnNames().length) {
/*
* If the column count is less than what trigger history reports, then chances are the table cache is out of date.
*/
table = getTargetPlatform(tableNameLowerCase).getTableFromCache(catalogName, schemaName, tableName, true);
}
if (table != null) {
table = table.copyAndFilterColumns(triggerHistory.getParsedColumnNames(),
triggerHistory.getParsedPkColumnNames(), true, addMissingColumns);
if (isUsingTargetExternalId && !tableName.startsWith(tablePrefix)) {
table = lookupTableExpanded(getTargetPlatform(tableNameLowerCase), catalogName, schemaName, tableName, triggerHistory,
addMissingColumns);
} else {
throw new SymmetricException("Could not find the following table. It might have been dropped: %s",
Table.getFullyQualifiedTableName(catalogName, schemaName, tableName));
table = lookupTable(getTargetPlatform(tableNameLowerCase), catalogName, schemaName, tableName, triggerHistory, addMissingColumns);
}
} else {
table = new Table(tableName);
Expand Down Expand Up @@ -162,6 +159,58 @@ protected IDatabasePlatform getTargetPlatform(String tableName) {
return tableName.startsWith(tablePrefix) ? symmetricDialect.getPlatform() : symmetricDialect.getTargetDialect().getPlatform();
}

protected Table lookupTable(IDatabasePlatform platform, String catalogName, String schemaName, String tableName, TriggerHistory triggerHistory,
boolean addMissingColumns) {
Table table = platform.getTableFromCache(catalogName, schemaName, tableName, false);
if (table != null && table.getColumnCount() < triggerHistory.getParsedColumnNames().length) {
/*
* If the column count is less than what trigger history reports, then chances are the table cache is out of date.
*/
table = platform.getTableFromCache(catalogName, schemaName, tableName, true);
}
if (table != null) {
table = table.copyAndFilterColumns(triggerHistory.getParsedColumnNames(),
triggerHistory.getParsedPkColumnNames(), true, addMissingColumns);
} else {
throw new SymmetricException("Could not find the following table. It might have been dropped: %s",
Table.getFullyQualifiedTableName(catalogName, schemaName, tableName));
}
return table;
}

protected Table lookupTableExpanded(IDatabasePlatform platform, String catalogName, String schemaName, String tableName,
TriggerHistory triggerHistory, boolean addMissingColumns) {
Table table = null;
if (!tableName.contains(targetNode.getExternalId())) {
table = lookupTable(platform, catalogName, schemaName, tableName, triggerHistory, addMissingColumns);
} else {
String baseTableName = tableName.replace(targetNode.getExternalId(), "") + (addMissingColumns ? "-t" : "-f");
Map<String, Table> sourceTableMap = getSourceTableMap(engine.getEngineName());
table = sourceTableMap.get(baseTableName);
if (table == null) {
table = lookupTable(platform, catalogName, schemaName, tableName, triggerHistory, addMissingColumns);
sourceTableMap.put(baseTableName, table);
}
if (table != null) {
try {
table = (Table) table.clone();
table.setName(tableName);
} catch (CloneNotSupportedException e) {
}
}
}
return table;
}

protected Map<String, Table> getSourceTableMap(String engineName) {
Map<String, Table> map = cacheByEngine.get(engineName);
if (map == null) {
map = new ConcurrentHashMap<String, Table>();
cacheByEngine.put(engineName, map);
}
return map;
}

protected TransformTable getTransform(String sourceNodeGroupId, String targetNodeGroupId, Table table,
TransformPoint transformPoint, int order) {
List<TransformTableNodeGroupLink> transforms = transformService.findTransformsFor(sourceNode.getNodeGroupId(),
Expand Down

0 comments on commit b11bcc7

Please sign in to comment.