Skip to content

Commit

Permalink
0002685: File sync parameter to remove just the *.ctl file after file
Browse files Browse the repository at this point in the history
has been replicated.
  • Loading branch information
jumpmind-josh committed Jul 19, 2016
1 parent 4ca05b7 commit 0b16cf3
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 1 deletion.
Expand Up @@ -292,6 +292,8 @@ private ParameterConstants() {

public final static String FILE_SYNC_LOCK_WAIT_MS = "file.sync.lock.wait.ms";

public final static String FILE_SYNC_DELETE_CTL_FILE_AFTER_SYNC = "file.sync.delete.ctl.file.after.sync";

public final static String BSH_LOAD_FILTER_HANDLES_MISSING_TABLES = "bsh.load.filter.handles.missing.tables";

public final static String BSH_TRANSFORM_GLOBAL_SCRIPT = "bsh.transform.global.script";
Expand Down
@@ -1,11 +1,15 @@
package org.jumpmind.symmetric.route;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand All @@ -28,11 +32,13 @@ public ISymmetricEngine getEngine() {
public List<String> parse(File file, int lineNumber) {
List<String> rows = new ArrayList<String>();

InputStream fileInputStream = null;
try {
boolean validateHeader = engine.getParameterService()
.is(ParameterConstants.DBF_ROUTER_VALIDATE_HEADER, true);

dbfReader = new DBFReader(Files.newInputStream(file.toPath(), StandardOpenOption.READ), validateHeader);
fileInputStream = Files.newInputStream(file.toPath(), StandardOpenOption.READ);
dbfReader = new DBFReader(fileInputStream, validateHeader);
int currentLine = 1;
while (dbfReader.hasNextRecord()) {
StringBuffer row = new StringBuffer();
Expand All @@ -51,6 +57,16 @@ public List<String> parse(File file, int lineNumber) {
catch (Exception e) {
log.error("Unable to parse DBF file " + file.getName(), e);
}
finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
}
catch (IOException ioe) {
log.error("Unable to close file " + file.getName(), ioe);
}
}
}
return rows;
}

Expand Down
Expand Up @@ -594,6 +594,14 @@ public void acknowledgeFiles(OutgoingBatch outgoingBatch) {
}
}
}
else if (parameterService.is(ParameterConstants.FILE_SYNC_DELETE_CTL_FILE_AFTER_SYNC, false)) {
File file = fileTrigger.createSourceFile(fileSnapshot);
if (!file.isDirectory()) {
if (fileTrigger.isSyncOnCtlFile()) {
filesToDelete.add(new File(file.getAbsolutePath() + ".ctl"));
}
}
}
}
}
data = cursor.next();
Expand Down
Expand Up @@ -1391,6 +1391,14 @@ file.push.thread.per.server.count=1
# Tags: jobs,filesync
file.push.lock.timeout.ms=7200000

# If the ctl file is used to control file triggers this will allow the system to remove
# the ctl file after sync but leave the source file.
#
# DatabaseOverridable: true
# Tags: filesync
# Type: boolean
file.sync.delete.ctl.file.after.sync=false;

# This parameter can be used to indicate that bean shell load filters will handle missing tables. Useful
# for the case where you want to make, for example, global catalog or schema changes at the destination
# in the case where the catalog, schema, or table doesn't exist but the BSH will handle it.
Expand Down

0 comments on commit 0b16cf3

Please sign in to comment.