From b11bcc7138a2ce3d5fa580d78634e7dd2b685f1c Mon Sep 17 00:00:00 2001 From: Eric Long Date: Mon, 30 Oct 2023 10:28:29 -0400 Subject: [PATCH] 0006064: Slow extract and load when using $(targetExternalId) variable --- .../ColumnsAccordingToTriggerHistory.java | 73 +++++- .../load/DefaultDataLoaderFactory.java | 220 +++++++++++------- .../java/org/jumpmind/db/model/Table.java | 18 +- .../io/data/writer/DefaultDatabaseWriter.java | 18 +- 4 files changed, 220 insertions(+), 109 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ColumnsAccordingToTriggerHistory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ColumnsAccordingToTriggerHistory.java index 7a7b6cc0e2..023db68fa3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ColumnsAccordingToTriggerHistory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/extract/ColumnsAccordingToTriggerHistory.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.StringUtils; import org.jumpmind.db.model.Column; @@ -49,15 +50,19 @@ import org.jumpmind.symmetric.util.SymmetricUtils; public class ColumnsAccordingToTriggerHistory { + private static Map> cacheByEngine = new ConcurrentHashMap>(); private Map cache = new HashMap(); + private ISymmetricEngine engine; private Node sourceNode; private Node targetNode; private ITriggerRouterService triggerRouterService; private ITransformService transformService; private ISymmetricDialect symmetricDialect; private String tablePrefix; + private boolean isUsingTargetExternalId; public ColumnsAccordingToTriggerHistory(ISymmetricEngine engine, Node sourceNode, Node targetNode) { + this.engine = engine; triggerRouterService = engine.getTriggerRouterService(); transformService = engine.getTransformService(); symmetricDialect = engine.getSymmetricDialect(); @@ -87,19 +92,11 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, String tableNameLowerCase = triggerHistory.getSourceTableNameLowerCase(); Table table = null; if (useDatabaseDefinition) { - table = getTargetPlatform(tableNameLowerCase).getTableFromCache(catalogName, schemaName, tableName, false); - if (table != null && table.getColumnCount() < triggerHistory.getParsedColumnNames().length) { - /* - * If the column count is less than what trigger history reports, then chances are the table cache is out of date. - */ - table = getTargetPlatform(tableNameLowerCase).getTableFromCache(catalogName, schemaName, tableName, true); - } - if (table != null) { - table = table.copyAndFilterColumns(triggerHistory.getParsedColumnNames(), - triggerHistory.getParsedPkColumnNames(), true, addMissingColumns); + if (isUsingTargetExternalId && !tableName.startsWith(tablePrefix)) { + table = lookupTableExpanded(getTargetPlatform(tableNameLowerCase), catalogName, schemaName, tableName, triggerHistory, + addMissingColumns); } else { - throw new SymmetricException("Could not find the following table. It might have been dropped: %s", - Table.getFullyQualifiedTableName(catalogName, schemaName, tableName)); + table = lookupTable(getTargetPlatform(tableNameLowerCase), catalogName, schemaName, tableName, triggerHistory, addMissingColumns); } } else { table = new Table(tableName); @@ -162,6 +159,58 @@ protected IDatabasePlatform getTargetPlatform(String tableName) { return tableName.startsWith(tablePrefix) ? symmetricDialect.getPlatform() : symmetricDialect.getTargetDialect().getPlatform(); } + protected Table lookupTable(IDatabasePlatform platform, String catalogName, String schemaName, String tableName, TriggerHistory triggerHistory, + boolean addMissingColumns) { + Table table = platform.getTableFromCache(catalogName, schemaName, tableName, false); + if (table != null && table.getColumnCount() < triggerHistory.getParsedColumnNames().length) { + /* + * If the column count is less than what trigger history reports, then chances are the table cache is out of date. + */ + table = platform.getTableFromCache(catalogName, schemaName, tableName, true); + } + if (table != null) { + table = table.copyAndFilterColumns(triggerHistory.getParsedColumnNames(), + triggerHistory.getParsedPkColumnNames(), true, addMissingColumns); + } else { + throw new SymmetricException("Could not find the following table. It might have been dropped: %s", + Table.getFullyQualifiedTableName(catalogName, schemaName, tableName)); + } + return table; + } + + protected Table lookupTableExpanded(IDatabasePlatform platform, String catalogName, String schemaName, String tableName, + TriggerHistory triggerHistory, boolean addMissingColumns) { + Table table = null; + if (!tableName.contains(targetNode.getExternalId())) { + table = lookupTable(platform, catalogName, schemaName, tableName, triggerHistory, addMissingColumns); + } else { + String baseTableName = tableName.replace(targetNode.getExternalId(), "") + (addMissingColumns ? "-t" : "-f"); + Map sourceTableMap = getSourceTableMap(engine.getEngineName()); + table = sourceTableMap.get(baseTableName); + if (table == null) { + table = lookupTable(platform, catalogName, schemaName, tableName, triggerHistory, addMissingColumns); + sourceTableMap.put(baseTableName, table); + } + if (table != null) { + try { + table = (Table) table.clone(); + table.setName(tableName); + } catch (CloneNotSupportedException e) { + } + } + } + return table; + } + + protected Map getSourceTableMap(String engineName) { + Map map = cacheByEngine.get(engineName); + if (map == null) { + map = new ConcurrentHashMap(); + cacheByEngine.put(engineName, map); + } + return map; + } + protected TransformTable getTransform(String sourceNodeGroupId, String targetNodeGroupId, Table table, TransformPoint transformPoint, int order) { List transforms = transformService.findTransformsFor(sourceNode.getNodeGroupId(), diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java index b1b48b3619..f7d54fb68e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java @@ -25,7 +25,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IAlterDatabaseInterceptor; import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.platform.cassandra.CassandraPlatform; @@ -67,6 +69,7 @@ public class DefaultDataLoaderFactory extends AbstractDataLoaderFactory implemen protected final Logger log = LoggerFactory.getLogger(getClass()); protected ISymmetricEngine engine; protected Set conflictLosingParentRows = new HashSet(); + protected Map targetTableMap = new ConcurrentHashMap(); public DefaultDataLoaderFactory() { } @@ -129,107 +132,146 @@ public IDataWriter getDataWriter(final String sourceNodeId, final ISymmetricDial throw new RuntimeException(e); } } - DynamicDefaultDatabaseWriter writer = new DynamicDefaultDatabaseWriter(symmetricDialect.getPlatform(), - symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(), - new DefaultTransformWriterConflictResolver(transformWriter) { - @Override - protected void beforeResolutionAttempt(CsvData csvData, Conflict conflict) { - if (conflict.getPingBack() != PingBack.OFF) { - DynamicDefaultDatabaseWriter writer = transformWriter - .getNestedWriterOfType(DynamicDefaultDatabaseWriter.class); - ISqlTransaction transaction = writer.getTransaction(); - if (transaction != null) { - symmetricDialect.enableSyncTriggers(transaction); - } - } + DefaultTransformWriterConflictResolver resolver = new DefaultTransformWriterConflictResolver(transformWriter) { + @Override + protected void beforeResolutionAttempt(CsvData csvData, Conflict conflict) { + if (conflict.getPingBack() != PingBack.OFF) { + DynamicDefaultDatabaseWriter writer = transformWriter + .getNestedWriterOfType(DynamicDefaultDatabaseWriter.class); + ISqlTransaction transaction = writer.getTransaction(); + if (transaction != null) { + symmetricDialect.enableSyncTriggers(transaction); } + } + } - @Override - protected void afterResolutionAttempt(CsvData csvData, Conflict conflict) { - DynamicDefaultDatabaseWriter writer = transformWriter.getNestedWriterOfType(DynamicDefaultDatabaseWriter.class); - if (Boolean.TRUE.equals(writer.getContext().get(AbstractDatabaseWriter.TRANSACTION_ABORTED))) { - return; - } - if (conflict.getPingBack() == PingBack.SINGLE_ROW) { - ISqlTransaction transaction = writer.getTransaction(); - if (transaction != null) { - symmetricDialect.disableSyncTriggers(transaction, sourceNodeId); - } + @Override + protected void afterResolutionAttempt(CsvData csvData, Conflict conflict) { + DynamicDefaultDatabaseWriter writer = transformWriter.getNestedWriterOfType(DynamicDefaultDatabaseWriter.class); + if (Boolean.TRUE.equals(writer.getContext().get(AbstractDatabaseWriter.TRANSACTION_ABORTED))) { + return; + } + if (conflict.getPingBack() == PingBack.SINGLE_ROW) { + ISqlTransaction transaction = writer.getTransaction(); + if (transaction != null) { + symmetricDialect.disableSyncTriggers(transaction, sourceNodeId); + } + } + if (conflict.getResolveType() == ResolveConflict.NEWER_WINS && + conflict.getDetectType() != DetectConflict.USE_TIMESTAMP && + conflict.getDetectType() != DetectConflict.USE_VERSION) { + Boolean isWinner = (Boolean) writer.getContext().get(DatabaseConstants.IS_CONFLICT_WINNER); + if (isWinner != null && isWinner == true) { + writer.getContext().remove(DatabaseConstants.IS_CONFLICT_WINNER); + ISqlTransaction transaction = writer.getTransaction(); + if (transaction != null) { + handleWinnerForNewerCaptureWins(transaction, csvData); } - if (conflict.getResolveType() == ResolveConflict.NEWER_WINS && - conflict.getDetectType() != DetectConflict.USE_TIMESTAMP && - conflict.getDetectType() != DetectConflict.USE_VERSION) { - Boolean isWinner = (Boolean) writer.getContext().get(DatabaseConstants.IS_CONFLICT_WINNER); - if (isWinner != null && isWinner == true) { - writer.getContext().remove(DatabaseConstants.IS_CONFLICT_WINNER); - ISqlTransaction transaction = writer.getTransaction(); - if (transaction != null) { - handleWinnerForNewerCaptureWins(transaction, csvData); - } - } + } + } + } + + /** + * When using new captured row wins, the winning row is saved to sym_data so other conflicts can see it. When two nodes are in conflict, they race + * to update the third node, but the first node will get no conflict, so we send a script back to all but winning node to ask if they have a newer + * row. + */ + protected void handleWinnerForNewerCaptureWins(ISqlTransaction transaction, CsvData csvData) { + String tableName = csvData.getAttribute(CsvData.ATTRIBUTE_TABLE_NAME); + Timestamp loadingTs = csvData.getAttribute(CsvData.ATTRIBUTE_CREATE_TIME); + List hists = engine.getTriggerRouterService().getActiveTriggerHistories(tableName); + if (hists != null && hists.size() > 0 && loadingTs != null) { + TriggerHistory hist = hists.get(0); + Data data = new Data(hist.getSourceTableName(), csvData.getDataEventType(), + csvData.getCsvData(CsvData.ROW_DATA), csvData.getCsvData(CsvData.PK_DATA), hist, + csvData.getAttribute(CsvData.ATTRIBUTE_CHANNEL_ID), null, csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID)); + data.setOldData(csvData.getCsvData(CsvData.OLD_DATA)); + data.setPreRouted(true); + data.setCreateTime(csvData.getAttribute(CsvData.ATTRIBUTE_CREATE_TIME)); + engine.getDataService().insertData(transaction, data); + String channelId = csvData.getAttribute(CsvData.ATTRIBUTE_CHANNEL_ID); + if (channelId != null && !channelId.equals(Constants.CHANNEL_RELOAD)) { + String pkCsvData = CsvUtils.escapeCsvData(getPkCsvData(csvData, hist)); + String nodeTableName = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE); + List nodeHists = engine.getTriggerRouterService().getActiveTriggerHistories(nodeTableName); + if (nodeHists != null && nodeHists.size() > 0 && pkCsvData != null) { + String sourceNodeId = csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID); + long createTime = data.getCreateTime() != null ? data.getCreateTime().getTime() : 0; + String script = "if (context != void && context != null && org.jumpmind.symmetric.Version.isOlderVersion(\"3.12.4\")) { " + + "engine.getDataService().sendNewerDataToNode(context.findTransaction(), SOURCE_NODE_ID, \"" + + tableName + "\", " + pkCsvData + ", new Date(" + + createTime + "L), \"" + sourceNodeId + "\"); }"; + Data scriptData = new Data(nodeTableName, DataEventType.BSH, + CsvUtils.escapeCsvData(script), null, nodeHists.get(0), Constants.CHANNEL_RELOAD, null, null); + scriptData.setSourceNodeId(sourceNodeId); + engine.getDataService().insertData(transaction, scriptData); } } + } + } - /** - * When using new captured row wins, the winning row is saved to sym_data so other conflicts can see it. When two nodes are in conflict, - * they race to update the third node, but the first node will get no conflict, so we send a script back to all but winning node to ask if - * they have a newer row. - */ - protected void handleWinnerForNewerCaptureWins(ISqlTransaction transaction, CsvData csvData) { - String tableName = csvData.getAttribute(CsvData.ATTRIBUTE_TABLE_NAME); - Timestamp loadingTs = csvData.getAttribute(CsvData.ATTRIBUTE_CREATE_TIME); - List hists = engine.getTriggerRouterService().getActiveTriggerHistories(tableName); - if (hists != null && hists.size() > 0 && loadingTs != null) { - TriggerHistory hist = hists.get(0); - Data data = new Data(hist.getSourceTableName(), csvData.getDataEventType(), - csvData.getCsvData(CsvData.ROW_DATA), csvData.getCsvData(CsvData.PK_DATA), hist, - csvData.getAttribute(CsvData.ATTRIBUTE_CHANNEL_ID), null, csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID)); - data.setOldData(csvData.getCsvData(CsvData.OLD_DATA)); - data.setPreRouted(true); - data.setCreateTime(csvData.getAttribute(CsvData.ATTRIBUTE_CREATE_TIME)); - engine.getDataService().insertData(transaction, data); - String channelId = csvData.getAttribute(CsvData.ATTRIBUTE_CHANNEL_ID); - if (channelId != null && !channelId.equals(Constants.CHANNEL_RELOAD)) { - String pkCsvData = CsvUtils.escapeCsvData(getPkCsvData(csvData, hist)); - String nodeTableName = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE); - List nodeHists = engine.getTriggerRouterService().getActiveTriggerHistories(nodeTableName); - if (nodeHists != null && nodeHists.size() > 0 && pkCsvData != null) { - String sourceNodeId = csvData.getAttribute(CsvData.ATTRIBUTE_SOURCE_NODE_ID); - long createTime = data.getCreateTime() != null ? data.getCreateTime().getTime() : 0; - String script = "if (context != void && context != null && org.jumpmind.symmetric.Version.isOlderVersion(\"3.12.4\")) { " + - "engine.getDataService().sendNewerDataToNode(context.findTransaction(), SOURCE_NODE_ID, \"" + - tableName + "\", " + pkCsvData + ", new Date(" + - createTime + "L), \"" + sourceNodeId + "\"); }"; - Data scriptData = new Data(nodeTableName, DataEventType.BSH, - CsvUtils.escapeCsvData(script), null, nodeHists.get(0), Constants.CHANNEL_RELOAD, null, null); - scriptData.setSourceNodeId(sourceNodeId); - engine.getDataService().insertData(transaction, scriptData); - } - } + protected String getPkCsvData(CsvData csvData, TriggerHistory hist) { + String pkCsvData = csvData.getCsvData(CsvData.PK_DATA); + if (pkCsvData == null) { + if (hist.getParsedPkColumnNames() != null && hist.getParsedPkColumnNames().length > 0) { + String[] pkData = new String[hist.getParsedPkColumnNames().length]; + Map values = csvData.toColumnNameValuePairs(hist.getParsedPkColumnNames(), CsvData.ROW_DATA); + int i = 0; + for (String name : hist.getParsedPkColumnNames()) { + pkData[i++] = values.get(name); } + pkCsvData = CsvUtils.escapeCsvData(pkData); + } else { + pkCsvData = csvData.getCsvData(CsvData.ROW_DATA); } + } + if (pkCsvData != null) { + pkCsvData = pkCsvData.replace("\n", "\\n").replace("\r", "\\r"); + } + return pkCsvData; + } + }; + DynamicDefaultDatabaseWriter writer = null; + if (engine.getCacheManager().isUsingTargetExternalId(false)) { + writer = new DynamicDefaultDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), + symmetricDialect.getTablePrefix(), resolver, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData)) { + protected String getTableKey(Table table) { + if (!table.getName().contains(batch.getSourceNodeId())) { + return super.getTableKey(table); + } else { + try { + table = (Table) table.clone(); + table.setName(table.getName().replace(batch.getSourceNodeId(), "")); + } catch (CloneNotSupportedException e) { + } + return table.getTableKey(); + } + } - protected String getPkCsvData(CsvData csvData, TriggerHistory hist) { - String pkCsvData = csvData.getCsvData(CsvData.PK_DATA); - if (pkCsvData == null) { - if (hist.getParsedPkColumnNames() != null && hist.getParsedPkColumnNames().length > 0) { - String[] pkData = new String[hist.getParsedPkColumnNames().length]; - Map values = csvData.toColumnNameValuePairs(hist.getParsedPkColumnNames(), CsvData.ROW_DATA); - int i = 0; - for (String name : hist.getParsedPkColumnNames()) { - pkData[i++] = values.get(name); - } - pkCsvData = CsvUtils.escapeCsvData(pkData); - } else { - pkCsvData = csvData.getCsvData(CsvData.ROW_DATA); + protected Table lookupTableFromCache(Table sourceTable, String tableKey) { + if (!sourceTable.getName().contains(batch.getSourceNodeId())) { + return super.lookupTableFromCache(sourceTable, tableKey); + } else { + Table table = targetTableMap.get(tableKey); + if (table != null) { + try { + table = (Table) table.clone(); + table.setName(sourceTable.getName()); + } catch (CloneNotSupportedException e) { } } - if (pkCsvData != null) { - pkCsvData = pkCsvData.replace("\n", "\\n").replace("\r", "\\r"); - } - return pkCsvData; + return table; } - }, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData)); + } + + protected void putTableInCache(String tableKey, Table table) { + targetTableMap.put(tableKey, table); + } + }; + } else { + writer = new DynamicDefaultDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), + symmetricDialect.getTablePrefix(), resolver, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData)); + } return writer; } diff --git a/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java b/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java index c7d81853eb..11b0645277 100644 --- a/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java +++ b/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java @@ -1039,7 +1039,7 @@ public String toVerboseString() { } public String getTableKey() { - return getFullyQualifiedTableName() + "-" + calculateTableHashcode(); + return getFullyQualifiedTableName() + "-" + calculateTableLiteHashcode(); } public boolean containsLobColumns(IDatabasePlatform platform) { @@ -1419,20 +1419,28 @@ public String[] getPrimaryKeyColumnNames() { } public int calculateTableHashcode() { + return calculateTableHashcode(true); + } + + public int calculateTableLiteHashcode() { + return calculateTableHashcode(false); + } + + protected int calculateTableHashcode(boolean includeTypes) { final int PRIME = 31; int result = 1; result = PRIME * result + name.hashCode(); - result = PRIME * result + calculateHashcodeForColumns(PRIME, getColumns()); - result = PRIME * result + calculateHashcodeForColumns(PRIME, getPrimaryKeyColumns()); + result = PRIME * result + calculateHashcodeForColumns(PRIME, getColumns(), includeTypes); + result = PRIME * result + calculateHashcodeForColumns(PRIME, getPrimaryKeyColumns(), includeTypes); return result; } - private static int calculateHashcodeForColumns(final int PRIME, Column[] cols) { + private static int calculateHashcodeForColumns(final int PRIME, Column[] cols, boolean includeTypes) { int result = 1; if (cols != null && cols.length > 0) { for (Column column : cols) { result = PRIME * result + column.getName().hashCode(); - if (column.getMappedType() != null) { + if (includeTypes && column.getMappedType() != null) { result = PRIME * result + column.getMappedType().hashCode(); } result = PRIME * result + column.getSizeAsInt(); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java index b31480b1de..c189be12c2 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriter.java @@ -1160,8 +1160,8 @@ protected int execute(CsvData data, String[] values) { @Override protected Table lookupTableAtTarget(Table sourceTable) { - String tableNameKey = sourceTable.getTableKey(); - Table table = targetTables.get(tableNameKey); + String tableNameKey = getTableKey(sourceTable); + Table table = lookupTableFromCache(sourceTable, tableNameKey); if (table == null) { try { table = getPlatform(sourceTable).getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(), @@ -1182,7 +1182,7 @@ protected Table lookupTableAtTarget(Table sourceTable) { } } } - targetTables.put(tableNameKey, table); + putTableInCache(tableNameKey, table); } } catch (SqlException sqle) { // TODO: is there really a "does not exist" exception or should this be removed? copied from AbstractJdbcDdlReader.readTable() @@ -1194,6 +1194,18 @@ protected Table lookupTableAtTarget(Table sourceTable) { return table; } + protected String getTableKey(Table table) { + return table.getTableKey(); + } + + protected Table lookupTableFromCache(Table sourceTable, String tableKey) { + return targetTables.get(tableKey); + } + + protected void putTableInCache(String tableKey, Table table) { + targetTables.put(tableKey, table); + } + public DmlStatement getCurrentDmlStatement() { return currentDmlStatement; }