From f24bbb169929a44a9eb75ebf720a8758b0ef0ae3 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Mon, 16 Mar 2020 15:49:28 -0400 Subject: [PATCH] 0004312: Routing data gap detection option for Postgres to use earliest transaction time --- .../PostgreSqlSymmetricDialect.java | 28 +++++++++++++++++++ .../symmetric/common/ParameterConstants.java | 3 +- .../symmetric/route/DataGapFastDetector.java | 1 + .../resources/symmetric-default.properties | 19 +++++++++++++ 4 files changed, 50 insertions(+), 1 deletion(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlSymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlSymmetricDialect.java index 6ba5aed5bd..4567370d48 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlSymmetricDialect.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/postgresql/PostgreSqlSymmetricDialect.java @@ -21,6 +21,7 @@ package org.jumpmind.symmetric.db.postgresql; import java.sql.Types; +import java.util.Date; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; @@ -51,11 +52,23 @@ public class PostgreSqlSymmetricDialect extends AbstractSymmetricDialect impleme " select count(*) from information_schema.routines " + " where routine_name = '$(functionName)' and specific_schema = '$(defaultSchema)'" ; + static final String SQL_SELECT_TRANSACTIONS = "select min(a.xact_start) from pg_stat_activity a join pg_catalog.pg_locks l on l.pid = a.pid where l.mode = 'RowExclusiveLock'"; + private Boolean supportsTransactionId = null; public PostgreSqlSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) { super(parameterService, platform); this.triggerTemplate = new PostgreSqlTriggerTemplate(this); + + if (parameterService.is(ParameterConstants.ROUTING_GAPS_USE_TRANSACTION_VIEW)) { + try { + getEarliestTransactionStartTime(); + supportsTransactionViews = true; + log.info("Enabling use of transaction views for data gap detection."); + } catch (Exception ex) { + log.warn("Cannot enable use of transaction views for data gap detection.", ex); + } + } } @Override @@ -230,6 +243,21 @@ public boolean supportsTransactionId() { return supportsTransactionId; } + @Override + public Date getEarliestTransactionStartTime() { + Date minStartTime = platform.getSqlTemplate().queryForObject(SQL_SELECT_TRANSACTIONS, Date.class); + if (minStartTime == null) { + minStartTime = new Date(); + } + return minStartTime; + } + + @Override + public boolean supportsTransactionViews() { + return supportsTransactionViews + && parameterService.is(ParameterConstants.ROUTING_GAPS_USE_TRANSACTION_VIEW); + } + public void cleanDatabase() { } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index a59a033dbd..7890091b3a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -208,7 +208,6 @@ private ParameterConstants() { public final static String ROUTING_STALE_DATA_ID_GAP_TIME = "routing.stale.dataid.gap.time.ms"; public final static String ROUTING_STALE_GAP_BUSY_EXPIRE_TIME = "routing.stale.gap.busy.expire.time.ms"; public final static String ROUTING_LARGEST_GAP_SIZE = "routing.largest.gap.size"; -// public final static String ROUTING_DATA_READER_TYPE_GAP_RETENTION_MINUTES = "routing.data.reader.type.gap.retention.period.minutes"; public final static String ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED = "routing.data.reader.order.by.gap.id.enabled"; public final static String ROUTING_DATA_READER_INTO_MEMORY_ENABLED = "routing.data.reader.into.memory.enabled"; public final static String ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY = "routing.data.reader.threshold.gaps.to.use.greater.than.query"; @@ -220,6 +219,8 @@ private ParameterConstants() { public final static String ROUTING_MAX_GAP_CHANGES = "routing.max.gap.changes"; public final static String ROUTING_USE_COMMON_GROUPS = "routing.use.common.groups"; public final static String ROUTING_USE_NON_COMMON_FOR_INCOMING = "routing.use.non.common.for.incoming"; + public final static String ROUTING_GAPS_USE_TRANSACTION_VIEW = "routing.gaps.use.transaction.view"; + public final static String ROUTING_GAPS_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "routing.gaps.transaction.view.clock.sync.threshold"; public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates"; @Deprecated diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java index 98dd324ae8..70bf936926 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java @@ -163,6 +163,7 @@ protected void reset() { if (date != null) { earliestTransactionTime = date.getTime() - parameterService.getLong( ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS, 60000); + log.debug("Earliest transaction time is {}", earliestTransactionTime); } routingStartTime = symmetricDialect.getDatabaseTime(); } diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index dffc63993f..8a2370ef4b 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -1325,6 +1325,25 @@ routing.largest.gap.size=50000000 # Type: boolean routing.data.reader.order.by.gap.id.enabled=true +# Find the earliest starting time of any transaction in the database, and expire any +# data gaps that were created before that time. When enabled, it can expire gaps +# sooner in some cases (before the routing.stale.dataid.gap.time.ms timeout), +# and in other cases, it can prevent expiring gaps that reach the timeout when there are still open transactions. +# +# DatabaseOverridable: false +# Tags: routing +# Type: boolean +routing.gaps.use.transaction.view=false + +# When using the earliest transaction time to expire gaps (routing.gaps.use.transaction.view=true), +# subtract the given number of milliseconds from the transaction time. +# This may be needed in a clustered environment where the time on each database master is not perfectly in sync. +# +# DatabaseOverridable: true +# Tags: routing +# Type: integer +routing.gaps.transaction.view.clock.sync.threshold=10000 + # Router will read all unrouted data into memory, then perform sorting. # Enable this option if sorting is expensive for the database to perform. #