Skip to content

Commit

Permalink
0005178: Push registration to nodes when group links indicate push
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 29, 2021
1 parent 97af697 commit fc9ecd2
Show file tree
Hide file tree
Showing 21 changed files with 329 additions and 80 deletions.
Expand Up @@ -319,7 +319,7 @@ protected void init() {
this.registrationService = new RegistrationService(this);
this.acknowledgeService = new AcknowledgeService(this);
this.pushService = new PushService(parameterService, symmetricDialect,
dataExtractorService, acknowledgeService, transportManager, nodeService,
dataExtractorService, acknowledgeService, registrationService, transportManager, nodeService,
clusterService, nodeCommunicationService, statisticManager, configurationService, extensionService);
this.pullService = new PullService(parameterService, symmetricDialect,
nodeService, dataLoaderService, registrationService, clusterService, nodeCommunicationService,
Expand Down
Expand Up @@ -294,6 +294,9 @@ public boolean isVersionGreaterThanOrEqualTo(int... targetVersion) {
return true;
}
int[] currentVersion = getSymmetricVersionParts();
if (currentVersion == null) {
return false;
}
for (int i = 0; i < currentVersion.length; i++) {
int j = currentVersion[i];
if (targetVersion.length > i) {
Expand Down
Expand Up @@ -21,11 +21,14 @@
package org.jumpmind.symmetric.service;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.RegistrationRequest;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;

/**
* This service provides an API that deals with {@link Node} registration
Expand Down Expand Up @@ -75,6 +78,8 @@ public boolean registerNode(Node node, String remoteHost, String remoteAddress,

public boolean isRegistrationOpen(String nodeGroupId, String externalId);

public boolean isRegistrationOpen();

/**
* Re-open registration for a single node that already exists in the database. A new password is generated and the registration_enabled flag is turned on.
* The next node to try registering for this node group and external ID will be given this information.
Expand All @@ -98,6 +103,12 @@ public boolean registerNode(Node node, String remoteHost, String remoteAddress,
*/
public boolean registerWithServer();

/**
* Server method which attempts to register using the registration URL of a client node using a push to send configuration.
* Returns configuration batch sent with its status.
*/
public List<OutgoingBatch> registerWithClient(Node remote, IOutgoingWithResponseTransport transport);

/**
* Client method which attempts to register with the registration.url to pull configuration if the node has not already been registered. Returns true if
* registered successfully
Expand All @@ -122,4 +133,14 @@ public boolean registerNode(Node node, String remoteHost, String remoteAddress,
public void requestNodeCopy();

public void setAllowClientRegistration(boolean enabled);

/**
* When server pushes to client asking to register it, the client responds with its registration request properties
*/
public boolean writeRegistrationProperties(OutputStream os);

/**
* When server pushes to client asking to register it, the client loads the configuration batch and returns an acknowledgement
*/
public boolean loadRegistrationBatch(Node node, InputStream is, OutputStream os);
}
Expand Up @@ -126,7 +126,6 @@
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.ITransformService;
import org.jumpmind.symmetric.service.RegistrationNotOpenException;
import org.jumpmind.symmetric.service.RegistrationPendingException;
import org.jumpmind.symmetric.service.RegistrationRequiredException;
import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink;
import org.jumpmind.symmetric.statistic.IStatisticManager;
Expand Down Expand Up @@ -382,17 +381,25 @@ public void loadDataFromPush(Node sourceNode, InputStream in, OutputStream out)
public void loadDataFromPush(Node sourceNode, String queue, InputStream in, OutputStream out)
throws IOException {
Node local = nodeService.findIdentity();
if (local != null && local.getNodeId() != null && sourceNode != null && sourceNode.getNodeId() != null) {
if (sourceNode != null && sourceNode.getNodeId() != null) {
ProcessInfo transferInfo = statisticManager.newProcessInfo(new ProcessInfoKey(sourceNode
.getNodeId(), queue, local.getNodeId(), PUSH_HANDLER_TRANSFER));
.getNodeId(), queue, local != null ? local.getNodeId() : null, PUSH_HANDLER_TRANSFER));
try {
List<IncomingBatch> batchList = loadDataFromTransport(transferInfo, sourceNode,
new InternalIncomingTransport(in), out);
logDataReceivedFromPush(sourceNode, batchList, transferInfo);
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
transferInfo.setStatus(ProcessInfo.ProcessStatus.ACKING);
transportManager.writeAcknowledgement(out, sourceNode, batchList, local,
security != null ? security.getNodePassword() : null);
if (local == null) {
local = nodeService.findIdentity(false);
}
if (local != null && local.getNodeId() != null) {
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
transferInfo.setStatus(ProcessInfo.ProcessStatus.ACKING);
transportManager.writeAcknowledgement(out, sourceNode, batchList, local,
security != null ? security.getNodePassword() : null);
} else {
log.info("Could not load data because the node is not registered");
throw new RegistrationRequiredException();
}
transferInfo.setStatus(ProcessInfo.ProcessStatus.OK);
purgeLoadBatchesFromStaging(batchList);
} catch (Exception e) {
Expand All @@ -404,9 +411,6 @@ public void loadDataFromPush(Node sourceNode, String queue, InputStream in, Outp
}
throw new RuntimeException(e);
}
} else {
log.info("Could not load data because the node is not registered");
throw new RegistrationPendingException();
}
}

Expand Down
Expand Up @@ -106,13 +106,15 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ "inner join $(node_group_link) d on "
+ " c.node_group_id = d.source_node_group_id where d.target_node_group_id = ? and "
+ " (d.data_event_action = ? or d.is_reversible = 1) and c.node_id not in (select node_id from $(node_identity)) "
+ "and c.sync_enabled = 1");
+ "and (c.sync_enabled = 1 or c.node_id in (select node_id from $(node_security) where registration_enabled = 1 "
+ "and created_at_node_id = (select node_id from $(node_identity))))");
putSql("findNodesWhoITargetSql",
""
+ "inner join $(node_group_link) d on "
+ " c.node_group_id = d.target_node_group_id where d.source_node_group_id = ? and "
+ " (d.data_event_action = ? or d.is_reversible = 1) and c.node_id not in (select node_id from $(node_identity)) "
+ "and c.sync_enabled = 1");
+ "and (c.sync_enabled = 1 or c.node_id in (select node_id from $(node_security) where registration_enabled = 1 "
+ "and created_at_node_id = (select node_id from $(node_identity))))");
putSql("selectNodeHostPrefixSql",
""
+ "select node_id, host_name, instance_id, ip_address, os_user, os_name, os_arch, os_version, available_processors, "
Expand Down
Expand Up @@ -88,22 +88,22 @@ synchronized public RemoteNodeStatuses pullData(boolean force) {
int availableThreads = nodeCommunicationService.getAvailableThreads(CommunicationType.PULL);
boolean m2mLoadInProgress = configurationService.isMasterToMaster() && nodeService.isDataLoadStarted();
for (NodeCommunication nodeCommunication : nodes) {
NodeSecurity nodeSecurity = nodeService.findNodeSecurity(nodeCommunication.getNodeId(), true);
boolean meetsMinimumTime = true;
if (minimumPeriodMs > 0 && nodeCommunication.getLastLockTime() != null &&
(System.currentTimeMillis() - nodeCommunication.getLastLockTime().getTime()) < minimumPeriodMs) {
meetsMinimumTime = false;
}
boolean m2mLockout = false;
if (m2mLoadInProgress) {
NodeSecurity nodeSecurity = nodeService.findNodeSecurity(nodeCommunication.getNodeId(), true);
m2mLockout = identity.getCreatedAtNodeId() != null && "registration".equals(nodeSecurity.getInitialLoadCreateBy()) &&
!identity.getCreatedAtNodeId().equals(nodeCommunication.getNodeId());
if (m2mLockout) {
log.debug("Not pulling from node {} until initial load from {} is complete", nodeCommunication.getNodeId(), identity
.getCreatedAtNodeId());
}
}
if (availableThreads > 0 && meetsMinimumTime && !m2mLockout) {
if (availableThreads > 0 && meetsMinimumTime && !m2mLockout && nodeSecurity != null && !nodeSecurity.isRegistrationEnabled()) {
if (nodeCommunicationService.execute(nodeCommunication, statuses, this)) {
availableThreads--;
}
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPushService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.ITransportManager;
Expand All @@ -62,6 +63,7 @@
public class PushService extends AbstractOfflineDetectorService implements IPushService, INodeCommunicationExecutor {
private IDataExtractorService dataExtractorService;
private IAcknowledgeService acknowledgeService;
private IRegistrationService registrationService;
private ITransportManager transportManager;
private INodeService nodeService;
private IClusterService clusterService;
Expand All @@ -71,13 +73,14 @@ public class PushService extends AbstractOfflineDetectorService implements IPush
private Map<String, Date> startTimesOfNodesBeingPushedTo = new HashMap<String, Date>();

public PushService(IParameterService parameterService, ISymmetricDialect symmetricDialect,
IDataExtractorService dataExtractorService, IAcknowledgeService acknowledgeService,
IDataExtractorService dataExtractorService, IAcknowledgeService acknowledgeService, IRegistrationService registrationService,
ITransportManager transportManager, INodeService nodeService,
IClusterService clusterService, INodeCommunicationService nodeCommunicationService, IStatisticManager statisticManager,
IConfigurationService configrationService, IExtensionService extensionService) {
super(parameterService, symmetricDialect, extensionService);
this.dataExtractorService = dataExtractorService;
this.acknowledgeService = acknowledgeService;
this.registrationService = registrationService;
this.transportManager = transportManager;
this.nodeService = nodeService;
this.clusterService = clusterService;
Expand Down Expand Up @@ -182,22 +185,31 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
startTimesOfNodesBeingPushedTo.remove(node.getNodeId());
}
} else {
log.warn("Cannot push to node '{}' in the group '{}'. The sync url is blank", node.getNodeId(), node.getNodeGroupId());
log.debug("Cannot push to node '{}' in the group '{}'. The sync url is blank", node.getNodeId(), node.getNodeGroupId());
}
}

private void pushToNode(Node remote, RemoteNodeStatus status) {
Node identity = nodeService.findIdentity();
NodeSecurity identitySecurity = nodeService.findNodeSecurity(identity.getNodeId(), true);
NodeSecurity nodeSecurity = nodeService.findNodeSecurity(remote.getNodeId(), true);
IOutgoingWithResponseTransport transport = null;
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity
.getNodeId(), status.getQueue(), remote.getNodeId(), ProcessType.PUSH_JOB_EXTRACT));
Map<String, String> requestProperties = new HashMap<String, String>();
requestProperties.put(WebConstants.CHANNEL_QUEUE, status.getQueue());
try {
transport = transportManager.getPushTransport(remote, identity,
identitySecurity.getNodePassword(), requestProperties, parameterService.getRegistrationUrl());
List<OutgoingBatch> extractedBatches = dataExtractorService.extract(processInfo, remote, status.getQueue(), transport);
List<OutgoingBatch> extractedBatches = null;
if (nodeSecurity != null && nodeSecurity.isRegistrationEnabled()) {
if (identity.getNodeId().equals(nodeSecurity.getCreatedAtNodeId())) {
transport = transportManager.getRegisterPushTransport(remote, identity);
extractedBatches = registrationService.registerWithClient(remote, transport);
}
} else {
transport = transportManager.getPushTransport(remote, identity,
identitySecurity.getNodePassword(), requestProperties, parameterService.getRegistrationUrl());
extractedBatches = dataExtractorService.extract(processInfo, remote, status.getQueue(), transport);
}
if (extractedBatches.size() > 0) {
log.info("Push data sent to {}", remote);
List<BatchAck> batchAcks = readAcks(extractedBatches, transport, transportManager, acknowledgeService, dataExtractorService);
Expand All @@ -212,11 +224,16 @@ private void pushToNode(Node remote, RemoteNodeStatus status) {
} catch (Exception ex) {
processInfo.setStatus(ProcessStatus.ERROR);
fireOffline(ex, remote, status);
if (isRegistrationRequired(ex) && !parameterService.isRegistrationServer() && parameterService.isRemoteNodeRegistrationServer(remote)) {
log.info("Removing identity because registration is required");
nodeService.deleteIdentity();
nodeService.deleteNodeSecurity(identity.getNodeId());
nodeService.deleteNode(identity.getNodeId(), false);
if (isRegistrationRequired(ex)) {
if (identity.getNodeId().equals(remote.getCreatedAtNodeId())) {
log.info("Re-opening registration for {} because registration is required", remote);
registrationService.reOpenRegistration(remote.getNodeId());
} else if (!parameterService.isRegistrationServer() && parameterService.isRemoteNodeRegistrationServer(remote)) {
log.info("Removing identity because registration is required");
nodeService.deleteIdentity();
nodeService.deleteNodeSecurity(identity.getNodeId());
nodeService.deleteNode(identity.getNodeId(), false);
}
}
} finally {
try {
Expand Down

0 comments on commit fc9ecd2

Please sign in to comment.