From 32390d71eefcf8b78fba7e5368664321fb0d9200 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Fri, 20 Jan 2023 08:46:34 -0500 Subject: [PATCH] 0005662: Snapshot util too slow for large multi-tenant deployment --- .../jumpmind/symmetric/util/SnapshotUtil.java | 17 +++++++++++++++++ .../symmetric/common/ParameterConstants.java | 1 + .../main/resources/symmetric-default.properties | 8 ++++++++ 3 files changed, 26 insertions(+) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java index c7051bf936..a4f29165ae 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/util/SnapshotUtil.java @@ -45,10 +45,12 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.TimeZone; import javax.management.MBeanServer; @@ -873,13 +875,28 @@ public static HashMap<CatalogSchema, List<Table>> getTablesForCaptureByCatalogSc ITriggerRouterService triggerRouterService = engine.getTriggerRouterService(); List<TriggerHistory> triggerHistories = triggerRouterService.getActiveTriggerHistories(); String tablePrefix = engine.getTablePrefix().toUpperCase(); + Set<String> triggerIds = new HashSet<String>(); + boolean isClonedTables = engine.getParameterService().is("sync.triggers.expand.table.clone", true); + long timeoutMillis = engine.getParameterService().getLong(ParameterConstants.SNAPSHOT_OPERATION_TIMEOUT_MS, 30000); + long ts = System.currentTimeMillis(); for (TriggerHistory triggerHistory : triggerHistories) { if (!triggerHistory.getSourceTableName().toUpperCase().startsWith(tablePrefix)) { + if (isClonedTables && !triggerIds.add(triggerHistory.getTriggerId())) { + Trigger trigger = triggerRouterService.getTriggerById(triggerHistory.getTriggerId(), 
false); + if (trigger != null && trigger.getSourceTableName().contains("$(targetExternalId)")) { + // for multi-tenant database where the same table is repeated for each node, just need one definition + continue; + } + } Table table = targetPlatform.getTableFromCache(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), triggerHistory.getSourceTableName(), false); if (table != null) { addTableToMap(catalogSchemas, new CatalogSchema(table.getCatalog(), table.getSchema()), table); } + if (System.currentTimeMillis() - ts > timeoutMillis) { + log.info("Reached time limit for table definitions"); + break; + } } } return catalogSchemas; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index ce1fec532c..a8822dbea3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -462,6 +462,7 @@ private ParameterConstants() { public final static String SNAPSHOT_MAX_FILES = "snapshot.max.files"; public final static String SNAPSHOT_MAX_BATCHES = "snapshot.max.batches"; public final static String SNAPSHOT_MAX_NODE_CHANNELS = "snapshot.max.node.channels"; + public final static String SNAPSHOT_OPERATION_TIMEOUT_MS = "snapshot.operation.timeout.ms"; public final static String REDSHIFT_APPEND_TO_COPY_COMMAND = "redshift.append.to.copy.command"; public final static String REDSHIFT_BULK_LOAD_MAX_ROWS_BEFORE_FLUSH = "redshift.bulk.load.max.rows.before.flush"; public final static String REDSHIFT_BULK_LOAD_MAX_BYTES_BEFORE_FLUSH = "redshift.bulk.load.max.bytes.before.flush"; diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index a415f120b5..2118a600eb 100644 --- 
a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -3220,6 +3220,14 @@ snapshot.max.batches=10000 # Type: integer snapshot.max.node.channels=5000 +# Max time for a snapshot operation to complete, such as gathering table definitions, +# before it will be interrupted so the snapshot completes in a reasonable amount of time. +# +# DatabaseOverridable: true +# Tags: other +# Type: integer +snapshot.operation.timeout.ms=30000 + # Log Miner job to find changes from a database archive log # # DatabaseOverridable: false