Skip to content

Commit

Permalink
0004843: Push and pull errors while node registration is pending
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Feb 18, 2021
1 parent b523c61 commit c9f97b0
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.jumpmind.exception.InvalidRetryException;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ContextConstants;
Expand Down Expand Up @@ -124,6 +123,7 @@
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.ITransformService;
import org.jumpmind.symmetric.service.RegistrationNotOpenException;
import org.jumpmind.symmetric.service.RegistrationPendingException;
import org.jumpmind.symmetric.service.RegistrationRequiredException;
import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink;
import org.jumpmind.symmetric.statistic.IStatisticManager;
Expand Down Expand Up @@ -427,7 +427,8 @@ public void loadDataFromPush(Node sourceNode, String queue, InputStream in, Outp
throw new RuntimeException(e);
}
} else {
throw new SymmetricException("Could not load data because the node is not registered");
log.info("Could not load data because the node is not registered");
throw new RegistrationPendingException();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,15 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
""
+ "inner join $(node_group_link) d on "
+ " c.node_group_id = d.source_node_group_id where d.target_node_group_id = ? and "
+ " (d.data_event_action = ? or d.is_reversible = 1) and c.node_id not in (select node_id from $(node_identity)) ");
+ " (d.data_event_action = ? or d.is_reversible = 1) and c.node_id not in (select node_id from $(node_identity)) "
+ "and c.sync_enabled = 1");

putSql("findNodesWhoITargetSql",
""
+ "inner join $(node_group_link) d on "
+ " c.node_group_id = d.target_node_group_id where d.source_node_group_id = ? and "
+ " (d.data_event_action = ? or d.is_reversible = 1) and c.node_id not in (select node_id from $(node_identity)) ");
+ " (d.data_event_action = ? or d.is_reversible = 1) and c.node_id not in (select node_id from $(node_identity)) "
+ "and c.sync_enabled = 1");

putSql("selectNodeHostPrefixSql",
""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ synchronized public RemoteNodeStatuses pushData(boolean force) {
}
}
} else {
Node doubleCheckidentity = nodeService.findIdentity(false);
if (doubleCheckidentity != null) {
log.error("Could not find a node security row for '{}'. A node needs a matching security row in both the local and remote nodes if it is going to authenticate to push data",
identity.getNodeId());
identity = nodeService.findIdentity(false);
if (identity != null) {
if (nodeService.findNodeSecurity(identity.getNodeId(), false) == null) {
log.error("Could not find my node security row, which is needed to authenticate as node {}", identity.getNodeId());
}
}
}
}
Expand Down Expand Up @@ -232,7 +233,8 @@ private void pushToNode(Node remote, RemoteNodeStatus status) {
if (isRegistrationRequired(ex) && !parameterService.isRegistrationServer() && parameterService.isRemoteNodeRegistrationServer(remote)) {
log.info("Removing identity because registration is required");
nodeService.deleteIdentity();
nodeService.findIdentity(false);
nodeService.deleteNodeSecurity(identity.getNodeId());
nodeService.deleteNode(identity.getNodeId(), false);
}
} finally {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.jumpmind.symmetric.service.IDataLoaderService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.RegistrationPendingException;
import org.jumpmind.symmetric.statistic.IStatisticManager;

/**
Expand Down Expand Up @@ -81,6 +82,8 @@ protected int push(String sourceNodeId, String channelId, InputStream inputStrea
try {
Node sourceNode = nodeService.findNode(sourceNodeId, true);
dataLoaderService.loadDataFromPush(sourceNode, channelId, inputStream, outputStream);
} catch (RegistrationPendingException e) {
return WebConstants.SC_SERVICE_BUSY;
} finally {
statisticManager.incrementNodesPushed(1);
statisticManager.incrementTotalNodesPushedTime(System.currentTimeMillis() - ts);
Expand Down

0 comments on commit c9f97b0

Please sign in to comment.