Skip to content

Commit

Permalink
0002969: Support offline mode for file sync
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Jan 24, 2017
1 parent 4bbce74 commit b2092c8
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 28 deletions.
Expand Up @@ -69,7 +69,9 @@ private Constants() {

public static final String PLEASE_SET_ME = "please set me";

public static final String DEPLOYMENT_TYPE_PROFESSIONAL = "professional";
public static final String DEPLOYMENT_TYPE_PROFESSIONAL = "professional";

public static final String DEPLOYMENT_TYPE_CCLIENT = "cclient";

public static final String DEPLOYMENT_TYPE_REST = "rest";

Expand Down
Expand Up @@ -35,13 +35,13 @@
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.model.FileConflictStrategy;
import org.jumpmind.symmetric.model.FileSnapshot;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.FileTrigger;
Expand Down Expand Up @@ -282,7 +282,7 @@ protected boolean isCClient(String nodeId) {
boolean cclient = false;
Node node = nodeService.findNode(nodeId, true);
if (node != null) {
cclient = StringUtils.equals(node.getDeploymentType(), "cclient");
cclient = StringUtils.equals(node.getDeploymentType(), Constants.DEPLOYMENT_TYPE_CCLIENT);
}
return cclient;
}
Expand Down
Expand Up @@ -28,10 +28,11 @@ public class NodeCommunication implements Serializable {
private static final long serialVersionUID = 1L;

public enum CommunicationType {
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT;
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT, OFF_FSPULL, OFF_FSPUSH;

public static boolean isPullType(CommunicationType communicationType) {
return communicationType == PULL || communicationType == CommunicationType.FILE_PULL || communicationType == OFFLN_PULL;
return communicationType == PULL || communicationType == CommunicationType.FILE_PULL
|| communicationType == OFFLN_PULL || communicationType == CommunicationType.OFF_FSPULL;
}
};

Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.AbstractSymmetricEngine;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.Constants;
Expand Down Expand Up @@ -90,7 +91,10 @@
import org.jumpmind.symmetric.transport.IIncomingTransport;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.transport.NoContentException;
import org.jumpmind.symmetric.transport.file.FileIncomingTransport;
import org.jumpmind.symmetric.transport.file.FileOutgoingTransport;
import org.jumpmind.util.AppUtils;

import bsh.EvalError;
Expand Down Expand Up @@ -435,15 +439,20 @@ public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) {
}

synchronized public RemoteNodeStatuses pullFilesFromNodes(boolean force) {
CommunicationType communicationType = engine.getParameterService().is(ParameterConstants.NODE_OFFLINE) ?
CommunicationType.OFF_FSPULL : CommunicationType.FILE_PULL;

return queueJob(force,
parameterService.getLong(ParameterConstants.FILE_PULL_MINIMUM_PERIOD_MS, -1),
ClusterConstants.FILE_SYNC_PULL, CommunicationType.FILE_PULL);
ClusterConstants.FILE_SYNC_PULL, communicationType);
}

synchronized public RemoteNodeStatuses pushFilesToNodes(boolean force) {
CommunicationType communicationType = engine.getParameterService().is(ParameterConstants.NODE_OFFLINE) ?
CommunicationType.OFF_FSPUSH : CommunicationType.FILE_PUSH;
return queueJob(force,
parameterService.getLong(ParameterConstants.FILE_PUSH_MINIMUM_PERIOD_MS, -1),
ClusterConstants.FILE_SYNC_PUSH, CommunicationType.FILE_PUSH);
ClusterConstants.FILE_SYNC_PUSH, communicationType);
}

@Override
Expand Down Expand Up @@ -755,9 +764,11 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
if (identity != null) {
NodeSecurity security = engine.getNodeService().findNodeSecurity(identity.getNodeId(), true);
if (security != null) {
if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PULL) {
if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PULL
|| nodeCommunication.getCommunicationType() == CommunicationType.OFF_FSPULL) {
pullFilesFromNode(nodeCommunication, status, identity, security);
} else if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PUSH) {
} else if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PUSH
|| nodeCommunication.getCommunicationType() == CommunicationType.OFF_FSPUSH) {
pushFilesToNode(nodeCommunication, status, identity, security);
}
}
Expand All @@ -770,15 +781,29 @@ protected void pushFilesToNode(NodeCommunication nodeCommunication, RemoteNodeSt
new ProcessInfoKey(nodeCommunication.getNodeId(), identity.getNodeId(),
ProcessType.FILE_SYNC_PUSH_JOB));
IOutgoingWithResponseTransport transport = null;
ITransportManager transportManager = null;
try {
transport = engine.getTransportManager().getFilePushTransport(
nodeCommunication.getNode(), identity, security.getNodePassword(),
parameterService.getRegistrationUrl());

if (!engine.getParameterService().is(ParameterConstants.NODE_OFFLINE)) {
transportManager = engine.getTransportManager();
transport = transportManager.getFilePushTransport(
nodeCommunication.getNode(), identity, security.getNodePassword(),
parameterService.getRegistrationUrl());
} else {
transportManager = ((AbstractSymmetricEngine)engine).getOfflineTransportManager();
transport = transportManager.getFilePushTransport(
nodeCommunication.getNode(), identity, security.getNodePassword(),
parameterService.getRegistrationUrl());
}

List<OutgoingBatch> batches = sendFiles(processInfo, nodeCommunication.getNode(),
transport);
if (batches.size() > 0) {
if (transport instanceof FileOutgoingTransport) {
((FileOutgoingTransport) transport).setProcessedBatches(batches);
}
List<BatchAck> batchAcks = readAcks(batches, transport,
engine.getTransportManager(), engine.getAcknowledgeService());
transportManager, engine.getAcknowledgeService());
status.updateOutgoingStatus(batches, batchAcks);
}
if (!status.failed() && batches.size() > 0) {
Expand All @@ -793,12 +818,15 @@ protected void pushFilesToNode(NodeCommunication nodeCommunication, RemoteNodeSt
} catch (Exception e) {
fireOffline(e, nodeCommunication.getNode(), status);
} finally {
if (transport != null) {
transport.close();
}
if (processInfo.getStatus() != ProcessInfo.Status.ERROR) {
processInfo.setStatus(ProcessInfo.Status.OK);
}
if (transport != null) {
transport.close();
if (transport instanceof FileOutgoingTransport) {
((FileOutgoingTransport) transport).complete(processInfo.getStatus() == ProcessInfo.Status.OK);
}
}
}
}

Expand Down Expand Up @@ -978,10 +1006,19 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode
ProcessType.FILE_SYNC_PULL_JOB));
try {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);

transport = engine.getTransportManager().getFilePullTransport(
nodeCommunication.getNode(), identity, security.getNodePassword(), null,
parameterService.getRegistrationUrl());
ITransportManager transportManager;

if (!engine.getParameterService().is(ParameterConstants.NODE_OFFLINE)) {
transportManager = engine.getTransportManager();
transport = transportManager.getFilePullTransport(
nodeCommunication.getNode(), identity, security.getNodePassword(), null,
parameterService.getRegistrationUrl());
} else {
transportManager = ((AbstractSymmetricEngine)engine).getOfflineTransportManager();
transport = transportManager.getFilePullTransport(
nodeCommunication.getNode(), identity, security.getNodePassword(), null,
parameterService.getRegistrationUrl());
}

List<IncomingBatch> batchesProcessed = processZip(transport.openStream(),
nodeCommunication.getNodeId(), processInfo);
Expand All @@ -990,7 +1027,7 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode
processInfo.setStatus(ProcessInfo.Status.ACKING);
status.updateIncomingStatus(batchesProcessed);
sendAck(nodeCommunication.getNode(), identity, security, batchesProcessed,
engine.getTransportManager());
transportManager);
}
if (!status.failed() && batchesProcessed.size() > 0) {
log.info("Pull files received from {}. {} files and {} batches were processed",
Expand All @@ -1008,13 +1045,14 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode
} finally {
if (transport != null) {
transport.close();
}

if (processInfo.getStatus() != ProcessInfo.Status.ERROR) {
processInfo.setStatus(ProcessInfo.Status.OK);
if (processInfo.getStatus() != ProcessInfo.Status.ERROR) {
processInfo.setStatus(ProcessInfo.Status.OK);
}
if (transport instanceof FileIncomingTransport) {
((FileIncomingTransport) transport).complete(!status.failed());
}
}
}

}

protected RemoteNodeStatuses queueJob(boolean force, long minimumPeriodMs, String clusterLock,
Expand Down
Expand Up @@ -178,9 +178,11 @@ public List<NodeCommunication> list(CommunicationType communicationType) {
nodesToCommunicateWith = removeOfflineNodes(nodeService.findNodesToPushTo());
break;
case OFFLN_PUSH:
case OFF_FSPUSH:
nodesToCommunicateWith = getNodesToCommunicateWithOffline(CommunicationType.PUSH);
break;
case OFFLN_PULL:
case OFF_FSPULL:
nodesToCommunicateWith = getNodesToCommunicateWithOffline(CommunicationType.PULL);
break;
default:
Expand Down Expand Up @@ -396,9 +398,11 @@ protected ThreadPoolExecutor getExecutor(final CommunicationType communicationTy
threadCountParameter = ParameterConstants.OFFLINE_PUSH_THREAD_COUNT_PER_SERVER;
break;
case FILE_PULL:
case OFF_FSPULL:
threadCountParameter = ParameterConstants.FILE_PUSH_THREAD_COUNT_PER_SERVER;
break;
case FILE_PUSH:
case OFF_FSPUSH:
threadCountParameter = ParameterConstants.FILE_PUSH_THREAD_COUNT_PER_SERVER;
break;
case FILE_XTRCT:
Expand Down Expand Up @@ -458,9 +462,11 @@ protected Date getLockTimeoutDate(CommunicationType communicationType) {
parameter = ParameterConstants.OFFLINE_PUSH_LOCK_TIMEOUT_MS;
break;
case FILE_PULL:
case OFF_FSPULL:
parameter = ParameterConstants.FILE_PULL_LOCK_TIMEOUT_MS;
break;
case FILE_PUSH:
case OFF_FSPUSH:
parameter = ParameterConstants.FILE_PUSH_LOCK_TIMEOUT_MS;
break;
case FILE_XTRCT:
Expand Down
Expand Up @@ -28,10 +28,14 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.transport.BatchBufferedWriter;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
Expand All @@ -51,6 +55,10 @@ public class FileOutgoingTransport implements IOutgoingWithResponseTransport {

String outgoingDir;

private List<OutgoingBatch> processedBatches;

private String extension;

public FileOutgoingTransport(Node remoteNode, Node localNode, String outgoingDir) throws IOException {
this.outgoingDir = outgoingDir;
this.fileName = outgoingDir + File.separator + localNode.getNodeGroupId() + "-" + localNode.getNodeId() + "_to_" +
Expand Down Expand Up @@ -84,8 +92,17 @@ public OutputStream openStream() {

@Override
public BufferedReader readResponse() throws IOException {
List<Long> batchIds = new ArrayList<Long>();
if (processedBatches != null) {
for (OutgoingBatch processedBatch : processedBatches) {
batchIds.add(processedBatch.getBatchId());
}
} else if (writer != null) {
batchIds = writer.getBatchIds();
}

StringBuilder resp = new StringBuilder();
for (Long batchId : writer.getBatchIds()) {
for (Long batchId : batchIds) {
resp.append(WebConstants.ACK_BATCH_NAME).append(batchId).append("=").append(WebConstants.ACK_BATCH_OK).append("&");
resp.append(WebConstants.ACK_NODE_ID).append(batchId).append("=").append(remoteNode.getNodeId()).append("&");
}
Expand Down Expand Up @@ -118,4 +135,12 @@ public void complete(boolean success) {
new File(fileName + ".tmp").renameTo(new File(fileName + ".zip"));
}
}
}

public List<OutgoingBatch> getProcessedBatches() {
return processedBatches;
}

public void setProcessedBatches(List<OutgoingBatch> processedBatches) {
this.processedBatches = processedBatches;
}
}

0 comments on commit b2092c8

Please sign in to comment.