From b2092c8664f85a5be524ae237ab3aefc831e2fc2 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Tue, 24 Jan 2017 14:11:13 -0500 Subject: [PATCH] 0002969: Support offline mode for file sync --- .../jumpmind/symmetric/common/Constants.java | 4 +- .../symmetric/file/FileSyncZipDataWriter.java | 4 +- .../symmetric/model/NodeCommunication.java | 5 +- .../service/impl/FileSyncService.java | 80 ++++++++++++++----- .../impl/NodeCommunicationService.java | 6 ++ .../transport/file/FileOutgoingTransport.java | 29 ++++++- 6 files changed, 100 insertions(+), 28 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java index ea79e32b78..e7ec9a7f01 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java @@ -69,7 +69,9 @@ private Constants() { public static final String PLEASE_SET_ME = "please set me"; - public static final String DEPLOYMENT_TYPE_PROFESSIONAL = "professional"; + public static final String DEPLOYMENT_TYPE_PROFESSIONAL = "professional"; + + public static final String DEPLOYMENT_TYPE_CCLIENT = "cclient"; public static final String DEPLOYMENT_TYPE_REST = "rest"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/file/FileSyncZipDataWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/file/FileSyncZipDataWriter.java index 7d143f2a68..68e506573a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/file/FileSyncZipDataWriter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/file/FileSyncZipDataWriter.java @@ -35,13 +35,13 @@ import org.apache.commons.lang.StringUtils; import org.jumpmind.db.model.Table; import org.jumpmind.exception.IoException; +import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.CsvData; import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.symmetric.io.stage.IStagedResource; -import org.jumpmind.symmetric.model.FileConflictStrategy; import org.jumpmind.symmetric.model.FileSnapshot; import org.jumpmind.symmetric.model.FileSnapshot.LastEventType; import org.jumpmind.symmetric.model.FileTrigger; @@ -282,7 +282,7 @@ protected boolean isCClient(String nodeId) { boolean cclient = false; Node node = nodeService.findNode(nodeId, true); if (node != null) { - cclient = StringUtils.equals(node.getDeploymentType(), "cclient"); + cclient = StringUtils.equals(node.getDeploymentType(), Constants.DEPLOYMENT_TYPE_CCLIENT); } return cclient; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java index c4d64b811f..86d10f11ef 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeCommunication.java @@ -28,10 +28,11 @@ public class NodeCommunication implements Serializable { private static final long serialVersionUID = 1L; public enum CommunicationType { - PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT; + PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT, OFF_FSPULL, OFF_FSPUSH; public static boolean isPullType(CommunicationType communicationType) { - return communicationType == PULL || communicationType == CommunicationType.FILE_PULL || communicationType == OFFLN_PULL; + return communicationType == PULL || communicationType == CommunicationType.FILE_PULL + || communicationType == OFFLN_PULL || communicationType == CommunicationType.OFF_FSPULL; } }; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java index 7ea88b1d87..aa3c72bec2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java @@ -43,6 +43,7 @@ import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; import org.jumpmind.exception.IoException; +import org.jumpmind.symmetric.AbstractSymmetricEngine; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SymmetricException; import org.jumpmind.symmetric.common.Constants; @@ -90,7 +91,10 @@ import org.jumpmind.symmetric.transport.IIncomingTransport; import org.jumpmind.symmetric.transport.IOutgoingTransport; import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; +import org.jumpmind.symmetric.transport.ITransportManager; import org.jumpmind.symmetric.transport.NoContentException; +import org.jumpmind.symmetric.transport.file.FileIncomingTransport; +import org.jumpmind.symmetric.transport.file.FileOutgoingTransport; import org.jumpmind.util.AppUtils; import bsh.EvalError; @@ -435,15 +439,20 @@ public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) { } synchronized public RemoteNodeStatuses pullFilesFromNodes(boolean force) { + CommunicationType communicationType = engine.getParameterService().is(ParameterConstants.NODE_OFFLINE) ? + CommunicationType.OFF_FSPULL : CommunicationType.FILE_PULL; + return queueJob(force, parameterService.getLong(ParameterConstants.FILE_PULL_MINIMUM_PERIOD_MS, -1), - ClusterConstants.FILE_SYNC_PULL, CommunicationType.FILE_PULL); + ClusterConstants.FILE_SYNC_PULL, communicationType); } synchronized public RemoteNodeStatuses pushFilesToNodes(boolean force) { + CommunicationType communicationType = engine.getParameterService().is(ParameterConstants.NODE_OFFLINE) ? + CommunicationType.OFF_FSPUSH : CommunicationType.FILE_PUSH; return queueJob(force, parameterService.getLong(ParameterConstants.FILE_PUSH_MINIMUM_PERIOD_MS, -1), - ClusterConstants.FILE_SYNC_PUSH, CommunicationType.FILE_PUSH); + ClusterConstants.FILE_SYNC_PUSH, communicationType); } @Override @@ -755,9 +764,11 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status if (identity != null) { NodeSecurity security = engine.getNodeService().findNodeSecurity(identity.getNodeId(), true); if (security != null) { - if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PULL) { + if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PULL + || nodeCommunication.getCommunicationType() == CommunicationType.OFF_FSPULL) { pullFilesFromNode(nodeCommunication, status, identity, security); - } else if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PUSH) { + } else if (nodeCommunication.getCommunicationType() == CommunicationType.FILE_PUSH + || nodeCommunication.getCommunicationType() == CommunicationType.OFF_FSPUSH) { pushFilesToNode(nodeCommunication, status, identity, security); } } @@ -770,15 +781,29 @@ protected void pushFilesToNode(NodeCommunication nodeCommunication, RemoteNodeSt new ProcessInfoKey(nodeCommunication.getNodeId(), identity.getNodeId(), ProcessType.FILE_SYNC_PUSH_JOB)); IOutgoingWithResponseTransport transport = null; + ITransportManager transportManager = null; try { - transport = engine.getTransportManager().getFilePushTransport( - nodeCommunication.getNode(), identity, security.getNodePassword(), - parameterService.getRegistrationUrl()); + + if (!engine.getParameterService().is(ParameterConstants.NODE_OFFLINE)) { + transportManager = engine.getTransportManager(); + transport = transportManager.getFilePushTransport( + nodeCommunication.getNode(), identity, security.getNodePassword(), + parameterService.getRegistrationUrl()); + } else { + transportManager = ((AbstractSymmetricEngine)engine).getOfflineTransportManager(); + transport = transportManager.getFilePushTransport( + nodeCommunication.getNode(), identity, security.getNodePassword(), + parameterService.getRegistrationUrl()); + } + List batches = sendFiles(processInfo, nodeCommunication.getNode(), transport); if (batches.size() > 0) { + if (transport instanceof FileOutgoingTransport) { + ((FileOutgoingTransport) transport).setProcessedBatches(batches); + } List batchAcks = readAcks(batches, transport, - engine.getTransportManager(), engine.getAcknowledgeService()); + transportManager, engine.getAcknowledgeService()); status.updateOutgoingStatus(batches, batchAcks); } if (!status.failed() && batches.size() > 0) { @@ -793,12 +818,15 @@ protected void pushFilesToNode(NodeCommunication nodeCommunication, RemoteNodeSt } catch (Exception e) { fireOffline(e, nodeCommunication.getNode(), status); } finally { - if (transport != null) { - transport.close(); - } if (processInfo.getStatus() != ProcessInfo.Status.ERROR) { processInfo.setStatus(ProcessInfo.Status.OK); } + if (transport != null) { + transport.close(); + if (transport instanceof FileOutgoingTransport) { + ((FileOutgoingTransport) transport).complete(processInfo.getStatus() == ProcessInfo.Status.OK); + } + } } } @@ -978,10 +1006,19 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode ProcessType.FILE_SYNC_PULL_JOB)); try { processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); - - transport = engine.getTransportManager().getFilePullTransport( - nodeCommunication.getNode(), identity, security.getNodePassword(), null, - parameterService.getRegistrationUrl()); + ITransportManager transportManager; + + if (!engine.getParameterService().is(ParameterConstants.NODE_OFFLINE)) { + transportManager = engine.getTransportManager(); + transport = transportManager.getFilePullTransport( + nodeCommunication.getNode(), identity, security.getNodePassword(), null, + parameterService.getRegistrationUrl()); + } else { + transportManager = ((AbstractSymmetricEngine)engine).getOfflineTransportManager(); + transport = transportManager.getFilePullTransport( + nodeCommunication.getNode(), identity, security.getNodePassword(), null, + parameterService.getRegistrationUrl()); + } List batchesProcessed = processZip(transport.openStream(), nodeCommunication.getNodeId(), processInfo); @@ -990,7 +1027,7 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode processInfo.setStatus(ProcessInfo.Status.ACKING); status.updateIncomingStatus(batchesProcessed); sendAck(nodeCommunication.getNode(), identity, security, batchesProcessed, - engine.getTransportManager()); + transportManager); } if (!status.failed() && batchesProcessed.size() > 0) { log.info("Pull files received from {}. {} files and {} batches were processed", @@ -1008,13 +1045,14 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode } finally { if (transport != null) { transport.close(); - } - - if (processInfo.getStatus() != ProcessInfo.Status.ERROR) { - processInfo.setStatus(ProcessInfo.Status.OK); + if (processInfo.getStatus() != ProcessInfo.Status.ERROR) { + processInfo.setStatus(ProcessInfo.Status.OK); + } + if (transport instanceof FileIncomingTransport) { + ((FileIncomingTransport) transport).complete(!status.failed()); + } } } - } protected RemoteNodeStatuses queueJob(boolean force, long minimumPeriodMs, String clusterLock, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java index db4327ad1f..d5221f2a9e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java @@ -178,9 +178,11 @@ public List list(CommunicationType communicationType) { nodesToCommunicateWith = removeOfflineNodes(nodeService.findNodesToPushTo()); break; case OFFLN_PUSH: + case OFF_FSPUSH: nodesToCommunicateWith = getNodesToCommunicateWithOffline(CommunicationType.PUSH); break; case OFFLN_PULL: + case OFF_FSPULL: nodesToCommunicateWith = getNodesToCommunicateWithOffline(CommunicationType.PULL); break; default: @@ -396,9 +398,11 @@ protected ThreadPoolExecutor getExecutor(final CommunicationType communicationTy threadCountParameter = ParameterConstants.OFFLINE_PUSH_THREAD_COUNT_PER_SERVER; break; case FILE_PULL: + case OFF_FSPULL: threadCountParameter = ParameterConstants.FILE_PUSH_THREAD_COUNT_PER_SERVER; break; case FILE_PUSH: + case OFF_FSPUSH: threadCountParameter = ParameterConstants.FILE_PUSH_THREAD_COUNT_PER_SERVER; break; case FILE_XTRCT: @@ -458,9 +462,11 @@ protected Date getLockTimeoutDate(CommunicationType communicationType) { parameter = ParameterConstants.OFFLINE_PUSH_LOCK_TIMEOUT_MS; break; case FILE_PULL: + case OFF_FSPULL: parameter = ParameterConstants.FILE_PULL_LOCK_TIMEOUT_MS; break; case FILE_PUSH: + case OFF_FSPUSH: parameter = ParameterConstants.FILE_PUSH_LOCK_TIMEOUT_MS; break; case FILE_XTRCT: diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java index f72747333c..067c002ebf 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/transport/file/FileOutgoingTransport.java @@ -28,10 +28,14 @@ import java.io.IOException; import java.io.OutputStream; import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.io.IOUtils; import org.jumpmind.symmetric.model.ChannelMap; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.model.OutgoingBatches; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.transport.BatchBufferedWriter; import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; @@ -51,6 +55,10 @@ public class FileOutgoingTransport implements IOutgoingWithResponseTransport { String outgoingDir; + private List processedBatches; + + private String extension; + public FileOutgoingTransport(Node remoteNode, Node localNode, String outgoingDir) throws IOException { this.outgoingDir = outgoingDir; this.fileName = outgoingDir + File.separator + localNode.getNodeGroupId() + "-" + localNode.getNodeId() + "_to_" + @@ -84,8 +92,17 @@ public OutputStream openStream() { @Override public BufferedReader readResponse() throws IOException { + List batchIds = new ArrayList(); + if (processedBatches != null) { + for (OutgoingBatch processedBatch : processedBatches) { + batchIds.add(processedBatch.getBatchId()); + } + } else if (writer != null) { + batchIds = writer.getBatchIds(); + } + StringBuilder resp = new StringBuilder(); - for (Long batchId : writer.getBatchIds()) { + for (Long batchId : batchIds) { resp.append(WebConstants.ACK_BATCH_NAME).append(batchId).append("=").append(WebConstants.ACK_BATCH_OK).append("&"); resp.append(WebConstants.ACK_NODE_ID).append(batchId).append("=").append(remoteNode.getNodeId()).append("&"); } @@ -118,4 +135,12 @@ public void complete(boolean success) { new File(fileName + ".tmp").renameTo(new File(fileName + ".zip")); } } -} + + public List getProcessedBatches() { + return processedBatches; + } + + public void setProcessedBatches(List processedBatches) { + this.processedBatches = processedBatches; + } +} \ No newline at end of file