Skip to content

Commit

Permalink
0005740: Made file sync make better use of staging
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-miller-jumpmind committed Mar 15, 2023
1 parent ada3d1e commit 1f44f21
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
Expand Up @@ -355,6 +355,7 @@ private ParameterConstants() {
public final static String FILE_SYNC_USE_CRC = "file.sync.use.crc";
public final static String FILE_SYNC_PREVENT_PING_BACK = "file.sync.prevent.ping.back";
public final static String FILE_SYNC_LOCK_WAIT_MS = "file.sync.lock.wait.ms";
public final static String FILE_SYNC_DELETE_ZIP_FILE_AFTER_SYNC = "file.sync.delete.zip.file.after.sync";
public final static String FILE_SYNC_DELETE_CTL_FILE_AFTER_SYNC = "file.sync.delete.ctl.file.after.sync";
public final static String FILE_SYNC_USE_CTL_AS_FILE_EXT = "file.sync.use.ctl.as.file.ext";
public final static String BSH_LOAD_FILTER_HANDLES_MISSING_TABLES = "bsh.load.filter.handles.missing.tables";
Expand Down
Expand Up @@ -129,7 +129,7 @@ protected boolean shouldCleanOutgoingPath(IStagedResource resource, long ttlInMs
@SuppressWarnings("unchecked")
Set<Long> outgoingBatches = (Set<Long>) context.getContextValue("outgoingBatches");
try {
Long batchId = Long.valueOf(path[path.length - 1]);
Long batchId = Long.valueOf(path[path.length - 1].replace("_filesync", ""));
if ((resourceClearsMinTimeHurdle && !outgoingBatches.contains(batchId)) || ttlInMs == 0) {
return true;
}
Expand All @@ -149,7 +149,7 @@ protected boolean shouldCleanIncomingPath(IStagedResource resource, long ttlInMs
Map<String, Long> biggestIncomingByNode = (Map<String, Long>) context.getContextValue("biggestIncomingByNode");
boolean recordIncomingBatchesEnabled = context.getBoolean("recordIncomingBatchesEnabled");
try {
BatchId batchId = new BatchId(Long.valueOf(path[path.length - 1]), path[1]);
BatchId batchId = new BatchId(Long.valueOf(path[path.length - 1].replace("_filesync", "")), path[1]);
Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId());
if ((recordIncomingBatchesEnabled && resourceClearsMinTimeHurdle && biggestBatchId != null
&& biggestBatchId > batchId.getBatchId() && !incomingBatches.contains(batchId))
Expand Down
Expand Up @@ -59,6 +59,7 @@
import org.jumpmind.symmetric.file.FileTriggerFileModifiedListener;
import org.jumpmind.symmetric.file.FileTriggerFileModifiedListener.FileModifiedCallback;
import org.jumpmind.symmetric.file.FileTriggerTracker;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.stage.IStagedResource;
Expand Down Expand Up @@ -574,8 +575,9 @@ synchronized public RemoteNodeStatuses pushFilesToNodes(boolean force) {
@Override
public Object[] getStagingPathComponents(OutgoingBatch fileSyncBatch) {
StringBuilder zipName = new StringBuilder(32);
zipName.append("filesync_").append(fileSyncBatch.getNodeBatchId()).append(".zip");
return new String[] { Constants.STAGING_CATEGORY_OUTGOING, fileSyncBatch.getNodeId(), zipName.toString() };
zipName.append(StringUtils.leftPad(String.valueOf(fileSyncBatch.getBatchId()), 10, "0")).append("_filesync");
return new String[] { Constants.STAGING_CATEGORY_OUTGOING, Batch.getStagedLocation(fileSyncBatch.isCommonFlag(),
fileSyncBatch.getNodeId(), fileSyncBatch.getBatchId()), zipName.toString() };
}

public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
Expand Down Expand Up @@ -609,9 +611,7 @@ public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
stagedResource = previouslyStagedResource;
} else {
if (dataWriter == null) {
stagedResource = stagingManager.create(
Constants.STAGING_CATEGORY_OUTGOING, processInfo.getSourceNodeId(),
targetNode.getNodeId(), "filesync.zip");
stagedResource = stagingManager.create(getStagingPathComponents(currentBatch));
dataWriter = new FileSyncZipDataWriter(maxBytesToSync, this,
engine.getNodeService(), stagedResource, engine.getExtensionService(), engine.getConfigurationService());
}
Expand Down Expand Up @@ -691,7 +691,7 @@ public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
}
throw e;
} finally {
if (stagedResource != null) {
if (stagedResource != null && parameterService.is(ParameterConstants.FILE_SYNC_DELETE_ZIP_FILE_AFTER_SYNC)) {
stagedResource.delete();
}
}
Expand Down
Expand Up @@ -2493,6 +2493,15 @@ file.push.thread.per.server.count=1
# Type: integer
file.push.lock.timeout.ms=7200000

# If set to true, this will delete the zip file from staging immediately after attempting to sync it to the target.
# If synchronization fails due to a timeout or an error, the zip file will have to be re-created during the next sync
# attempt. If set to false, this will leave the zip file in staging until it is purged by the Stage Management job.
#
# DatabaseOverridable: true
# Tags: filesync
# Type: boolean
file.sync.delete.zip.file.after.sync=false

# If the ctl file is used to control file triggers this will allow the system to remove
# the ctl file after sync but leave the source file.
#
Expand Down

0 comments on commit 1f44f21

Please sign in to comment.