Skip to content

Commit

Permalink
0006374: File Sync exceptions thrown by bsh script should be retried for
Browse files Browse the repository at this point in the history
transient errors
  • Loading branch information
Philip Marzullo committed Apr 30, 2024
1 parent b74734f commit e104fbf
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ private ParameterConstants() {
public final static String FILE_SYNC_LOCK_WAIT_MS = "file.sync.lock.wait.ms";
public final static String FILE_SYNC_DELETE_CTL_FILE_AFTER_SYNC = "file.sync.delete.ctl.file.after.sync";
public final static String FILE_SYNC_USE_CTL_AS_FILE_EXT = "file.sync.use.ctl.as.file.ext";
public final static String FILE_SYNC_RETRY_COUNT = "file.sync.retry.count";
public final static String FILE_SYNC_RETRY_DELAY_MS = "file.sync.retry.delay.ms";
public final static String BSH_LOAD_FILTER_HANDLES_MISSING_TABLES = "bsh.load.filter.handles.missing.tables";
public final static String BSH_TRANSFORM_GLOBAL_SCRIPT = "bsh.transform.global.script";
public final static String BSH_EXTENSION_GLOBAL_SCRIPT = "bsh.extension.global.script";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.file.FileSystemException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
Expand Down Expand Up @@ -984,15 +985,44 @@ protected List<IncomingBatch> processZip(InputStream is, String sourceNodeId,
waitMillis);
if (isLocked) {
log.debug("The {} node got a shared file sync lock", sourceNodeId);
@SuppressWarnings("unchecked")
Map<String, String> filesToEventType = (Map<String, String>) interpreter
.eval(script);
if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_PREVENT_PING_BACK)) {
updateFileIncoming(sourceNodeId, filesToEventType);
int retryFileSyncCount = parameterService.getInt(ParameterConstants.FILE_SYNC_RETRY_COUNT, 2);
long retryFileSyncDelayMs = parameterService.getLong(ParameterConstants.FILE_SYNC_RETRY_DELAY_MS, 5000);
for (int i = 0; i < retryFileSyncCount; i++) {
if (i > 0) {
try {
log.info("Retrying file sync for batch {}", batchId);
Thread.sleep(retryFileSyncDelayMs);
} catch (InterruptedException e) { }
}
try {
@SuppressWarnings("unchecked")
Map<String, String> filesToEventType = (Map<String, String>) interpreter
.eval(script);
if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_PREVENT_PING_BACK)) {
updateFileIncoming(sourceNodeId, filesToEventType);
}
incomingBatch
.setLoadRowCount(filesToEventType != null ? filesToEventType
.size() : 0);
} catch (Throwable e) {
log.error(e.getMessage(),e);
Throwable target = e;
if (e instanceof TargetError) {
Throwable t = ((TargetError) e).getTarget();
if (t != null) {
target = t;
}
}
if (target instanceof FileSystemException) {
if (i + 1 >= retryFileSyncCount) {
throw target;
}
continue;
} else {
throw target;
}
}
}
incomingBatch
.setLoadRowCount(filesToEventType != null ? filesToEventType
.size() : 0);
} else {
throw new RuntimeException(
"Could not obtain file sync shared lock within " + waitMillis
Expand Down

0 comments on commit e104fbf

Please sign in to comment.