Skip to content

Commit

Permalink
0004312: Routing data gap detection option for Postgres to use earliest
Browse files Browse the repository at this point in the history
transaction time
  • Loading branch information
erilong committed Mar 16, 2020
1 parent 824dcec commit f24bbb1
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 1 deletion.
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.db.postgresql;

import java.sql.Types;
import java.util.Date;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
Expand Down Expand Up @@ -51,11 +52,23 @@ public class PostgreSqlSymmetricDialect extends AbstractSymmetricDialect impleme
" select count(*) from information_schema.routines " +
" where routine_name = '$(functionName)' and specific_schema = '$(defaultSchema)'" ;

static final String SQL_SELECT_TRANSACTIONS = "select min(a.xact_start) from pg_stat_activity a join pg_catalog.pg_locks l on l.pid = a.pid where l.mode = 'RowExclusiveLock'";

private Boolean supportsTransactionId = null;

public PostgreSqlSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new PostgreSqlTriggerTemplate(this);

if (parameterService.is(ParameterConstants.ROUTING_GAPS_USE_TRANSACTION_VIEW)) {
try {
getEarliestTransactionStartTime();
supportsTransactionViews = true;
log.info("Enabling use of transaction views for data gap detection.");
} catch (Exception ex) {
log.warn("Cannot enable use of transaction views for data gap detection.", ex);
}
}
}

@Override
Expand Down Expand Up @@ -230,6 +243,21 @@ public boolean supportsTransactionId() {
return supportsTransactionId;
}

@Override
public Date getEarliestTransactionStartTime() {
Date minStartTime = platform.getSqlTemplate().queryForObject(SQL_SELECT_TRANSACTIONS, Date.class);
if (minStartTime == null) {
minStartTime = new Date();
}
return minStartTime;
}

@Override
public boolean supportsTransactionViews() {
return supportsTransactionViews
&& parameterService.is(ParameterConstants.ROUTING_GAPS_USE_TRANSACTION_VIEW);
}

public void cleanDatabase() {
}

Expand Down
Expand Up @@ -208,7 +208,6 @@ private ParameterConstants() {
public final static String ROUTING_STALE_DATA_ID_GAP_TIME = "routing.stale.dataid.gap.time.ms";
public final static String ROUTING_STALE_GAP_BUSY_EXPIRE_TIME = "routing.stale.gap.busy.expire.time.ms";
public final static String ROUTING_LARGEST_GAP_SIZE = "routing.largest.gap.size";
// public final static String ROUTING_DATA_READER_TYPE_GAP_RETENTION_MINUTES = "routing.data.reader.type.gap.retention.period.minutes";
public final static String ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED = "routing.data.reader.order.by.gap.id.enabled";
public final static String ROUTING_DATA_READER_INTO_MEMORY_ENABLED = "routing.data.reader.into.memory.enabled";
public final static String ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY = "routing.data.reader.threshold.gaps.to.use.greater.than.query";
Expand All @@ -220,6 +219,8 @@ private ParameterConstants() {
public final static String ROUTING_MAX_GAP_CHANGES = "routing.max.gap.changes";
public final static String ROUTING_USE_COMMON_GROUPS = "routing.use.common.groups";
public final static String ROUTING_USE_NON_COMMON_FOR_INCOMING = "routing.use.non.common.for.incoming";
public final static String ROUTING_GAPS_USE_TRANSACTION_VIEW = "routing.gaps.use.transaction.view";
public final static String ROUTING_GAPS_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "routing.gaps.transaction.view.clock.sync.threshold";

public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
@Deprecated
Expand Down
Expand Up @@ -163,6 +163,7 @@ protected void reset() {
if (date != null) {
earliestTransactionTime = date.getTime() - parameterService.getLong(
ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS, 60000);
log.debug("Earliest transaction time is {}", earliestTransactionTime);
}
routingStartTime = symmetricDialect.getDatabaseTime();
}
Expand Down
19 changes: 19 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -1325,6 +1325,25 @@ routing.largest.gap.size=50000000
# Type: boolean
routing.data.reader.order.by.gap.id.enabled=true

# Find the earliest starting time of any transaction in the database, and expire any
# data gaps that were created before that time. When enabled, it can expire gaps
# sooner in some cases (before the routing.stale.dataid.gap.time.ms timeout),
# and in other cases, it can prevent expiring gaps that reach the timeout when there are still open transactions.
#
# DatabaseOverridable: false
# Tags: routing
# Type: boolean
routing.gaps.use.transaction.view=false

# When using the earliest transaction time to expire gaps (routing.gaps.use.transaction.view=true),
# subtract the given number of milliseconds from the transaction time.
# This may be needed in a clustered environment where the time on each database master is not perfectly in sync.
#
# DatabaseOverridable: true
# Tags: routing
# Type: integer
routing.gaps.transaction.view.clock.sync.threshold=10000

# Router will read all unrouted data into memory, then perform sorting.
# Enable this option if sorting is expensive for the database to perform.
#
Expand Down

0 comments on commit f24bbb1

Please sign in to comment.