Skip to content

Commit

Permalink
0003894: Sync Triggers doesn't run after registration / initial load …
Browse files Browse the repository at this point in the history
…when cluster.lock.enabled=true (3.8)
  • Loading branch information
mmichalek committed Mar 27, 2019
1 parent 9d553b6 commit cc0b472
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
Expand Up @@ -325,6 +325,7 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
if (context.get(CTX_KEY_RESYNC_NEEDED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info("About to syncTriggers because new configuration came through the data loader");
engine.getClusterService().refreshLockEntries(); // Needed in case cluster.lock.enabled changed during config change.
engine.getTriggerRouterService().syncTriggers();
context.remove(CTX_KEY_RESYNC_NEEDED);
engine.getRegistrationService().setAllowClientRegistration(true);
Expand Down
Expand Up @@ -31,7 +31,9 @@
*/
public interface IClusterService {

public void init();
public void init();

public void refreshLockEntries();

public boolean lock(String action);

Expand Down
Expand Up @@ -74,11 +74,11 @@ public class ClusterService extends AbstractService implements IClusterService {
PURGE_STATISTICS, SYNCTRIGGERS, PURGE_DATA_GAPS, STAGE_MANAGEMENT, WATCHDOG, STATISTICS, FILE_SYNC_PULL,
FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR,
SYNC_CONFIG };

private static final String[] sharedActions = new String[] { FILE_SYNC_SHARED };

private String serverId = null;

private Map<String, Lock> lockCache;

public ClusterService(IParameterService parameterService, ISymmetricDialect dialect) {
Expand All @@ -91,23 +91,29 @@ public ClusterService(IParameterService parameterService, ISymmetricDialect dial
public void init() {
if (isClusteringEnabled()) {
sqlTemplate.update(getSql("initLockSql"), new Object[] { getServerId() });
refreshLockEntries();
}
}

@Override
public void refreshLockEntries() {
if (isClusteringEnabled()) {
Map<String, Lock> allLocks = findLocks();

for (String action : actions) {
if (allLocks.get(action) == null) {
initLockTable(action, TYPE_CLUSTER);
}
}

for (String action : sharedActions) {
if (allLocks.get(action) == null) {
initLockTable(action, TYPE_SHARED);
}
}
}
}
}

@Override
public synchronized void persistToTableForSnapshot() {
sqlTemplate.update(getSql("deleteSql"));
Expand All @@ -116,7 +122,7 @@ public synchronized void persistToTableForSnapshot() {
insertLock(lock);
}
}

protected void insertLock(Lock lock) {
sqlTemplate.update(getSql("insertCompleteLockSql"), lock.getLockAction(), lock.getLockType(), lock.getLockingServerId(), lock.getLockTime(), lock.getSharedCount(), lock.isSharedEnable() ? 1 : 0, lock.getLastLockTime(), lock.getLastLockingServerId());
}
Expand All @@ -125,7 +131,7 @@ protected void insertLock(Lock lock) {
protected void initLockTable(final String action) {
initLockTable(action, TYPE_CLUSTER);
}

protected void initLockTable(final String action, final String lockType) {
try {
sqlTemplate.update(getSql("insertLockSql"), new Object[] { action, lockType });
Expand Down Expand Up @@ -346,7 +352,7 @@ public String getServerId() {
serverId = "unknown";
}
}

log.info("This node picked a server id of {}", serverId);
}
return serverId;
Expand Down

0 comments on commit cc0b472

Please sign in to comment.