Skip to content

Commit

Permalink
0001857: Initial load is broken for the master to master sync scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Aug 1, 2014
1 parent efe5c9c commit 9de8c19
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
Expand Up @@ -25,6 +25,7 @@

import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.IBatchAlgorithm;
import org.jumpmind.symmetric.route.IDataRouter;
Expand All @@ -43,6 +44,8 @@ public interface IRouterService extends IService {

public long getUnroutedDataCount();

public List<NodeSecurity> findNodesThatAreReadyForInitialLoad();

public boolean shouldDataBeRouted(SimpleRouterContext context, DataMetaData dataMetaData,
Node node, boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter);

Expand Down
Expand Up @@ -90,8 +90,7 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " rev_initial_load_enabled, rev_initial_load_time, initial_load_id, "
+ " initial_load_create_by, rev_initial_load_id, rev_initial_load_create_by "
+ " from $(node_security) "
+ " where (initial_load_enabled=1 and created_at_node_id in (select node_id from sym_node_identity)) "
+ " or (rev_initial_load_enabled=1 and created_at_node_id not in (select node_id from sym_node_identity)) ");
+ " where initial_load_enabled=1 or rev_initial_load_enabled=1 ");

putSql("findAllNodeSecuritySql",
"select node_id, node_password, registration_enabled, registration_time, "
Expand Down
Expand Up @@ -77,6 +77,7 @@
import org.jumpmind.symmetric.route.SubSelectDataRouter;
import org.jumpmind.symmetric.route.TransactionalBatchAlgorithm;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.statistic.StatisticConstants;
Expand Down Expand Up @@ -197,8 +198,7 @@ protected void insertInitialLoadEvents() {
if (engine.getParameterService().isRegistrationServer() ||
(identitySecurity != null && !identitySecurity.isRegistrationEnabled()
&& identitySecurity.getRegistrationTime() != null)) {
List<NodeSecurity> nodeSecurities = nodeService
.findNodeSecurityWithLoadEnabled();
List<NodeSecurity> nodeSecurities = findNodesThatAreReadyForInitialLoad();
if (nodeSecurities != null) {
boolean reverseLoadFirst = parameterService
.is(ParameterConstants.INITIAL_LOAD_REVERSE_FIRST);
Expand All @@ -208,15 +208,11 @@ protected void insertInitialLoadEvents() {
boolean reverseLoadQueued = security
.isRevInitialLoadEnabled();
boolean initialLoadQueued = security.isInitialLoadEnabled();
boolean thisMySecurityRecord = security.getNodeId().equals(
identity.getNodeId());
boolean registered = security.getRegistrationTime() != null;
boolean parent = identity.getNodeId().equals(
security.getCreatedAtNodeId());
if (thisMySecurityRecord && reverseLoadQueued
if (reverseLoadQueued
&& (reverseLoadFirst || !initialLoadQueued)) {
sendReverseInitialLoad();
} else if (!thisMySecurityRecord && registered && parent
} else if (registered
&& initialLoadQueued
&& (!reverseLoadFirst || !reverseLoadQueued)) {
long ts = System.currentTimeMillis();
Expand Down Expand Up @@ -260,6 +256,23 @@ protected void insertInitialLoadEvents() {
log.info("Not attempting to insert reload events because sync trigger is currently running");
}
}

public List<NodeSecurity> findNodesThatAreReadyForInitialLoad() {
INodeService nodeService = engine.getNodeService();
IConfigurationService configurationService = engine.getConfigurationService();
String me = nodeService.findIdentityNodeId();
List<NodeSecurity> toReturn = new ArrayList<NodeSecurity>();
List<NodeSecurity> securities = nodeService.findNodeSecurityWithLoadEnabled();
for (NodeSecurity nodeSecurity : securities) {
if (!nodeSecurity.getNodeId().equals(me) &&
((me.equals(nodeSecurity.getCreatedAtNodeId()) && nodeSecurity.isInitialLoadEnabled()) ||
configurationService.isMasterToMaster() ||
(!me.equals(nodeSecurity.getCreatedAtNodeId()) && nodeSecurity.isRevInitialLoadEnabled()))) {
toReturn.add(nodeSecurity);
}
}
return toReturn;
}

protected void sendReverseInitialLoad() {
INodeService nodeService = engine.getNodeService();
Expand Down

0 comments on commit 9de8c19

Please sign in to comment.