Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0003894: Sync Triggers doesn't run after registration / initial load
when cluster.lock.enabled=true (3.8)
# Conflicts:
#	symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ClusterService.java
  • Loading branch information
mmichalek authored and erilong committed Apr 23, 2019
1 parent cd223cb commit 06ad211
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 6 deletions.
Expand Up @@ -327,6 +327,7 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
if (context.get(CTX_KEY_RESYNC_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info("About to syncTriggers because new configuration came through the data loader");
engine.getClusterService().refreshLockEntries(); // Needed in case cluster.lock.enabled changed during config change.
engine.getTriggerRouterService().syncTriggers();
context.remove(CTX_KEY_RESYNC_NEEDED);
engine.getRegistrationService().setAllowClientRegistration(true);
Expand Down
Expand Up @@ -31,7 +31,9 @@
*/
public interface IClusterService {

public void init();
public void init();

public void refreshLockEntries();

public boolean lock(String action);

Expand Down
Expand Up @@ -90,6 +90,7 @@ public class ClusterService extends AbstractService implements IClusterService {
private static boolean isUpgradedInstanceId;

private String serverId = null;

private static String instanceId = null;

private INodeService nodeService;
Expand Down Expand Up @@ -122,20 +123,26 @@ public void init() {

if (isClusteringEnabled()) {
sqlTemplate.update(getSql("initLockSql"), new Object[] { getServerId() });
refreshLockEntries();
}
}

@Override
public void refreshLockEntries() {
if (isClusteringEnabled()) {
Map<String, Lock> allLocks = findLocks();

for (String action : actions) {
if (allLocks.get(action) == null) {
initLockTable(action, TYPE_CLUSTER);
}
}

for (String action : sharedActions) {
if (allLocks.get(action) == null) {
initLockTable(action, TYPE_SHARED);
}
}
}
}
}

Expand Down Expand Up @@ -206,7 +213,7 @@ public synchronized void persistToTableForSnapshot() {
insertLock(lock);
}
}

protected void insertLock(Lock lock) {
sqlTemplate.update(getSql("insertCompleteLockSql"), lock.getLockAction(), lock.getLockType(), lock.getLockingServerId(), lock.getLockTime(), lock.getSharedCount(), lock.isSharedEnable() ? 1 : 0, lock.getLastLockTime(), lock.getLastLockingServerId());
}
Expand All @@ -215,7 +222,7 @@ protected void insertLock(Lock lock) {
protected void initLockTable(final String action) {
initLockTable(action, TYPE_CLUSTER);
}

protected void initLockTable(final String action, final String lockType) {
try {
sqlTemplate.update(getSql("insertLockSql"), new Object[] { action, lockType });
Expand Down Expand Up @@ -478,7 +485,7 @@ public String getServerId() {
serverId = "unknown";
}
}

log.info("This node picked a server id of {}", serverId);
}
return serverId;
Expand Down

0 comments on commit 06ad211

Please sign in to comment.