Skip to content

Commit

Permalink
0003157: Allow bulk loaders to fall back to default loader when an error
Browse files Browse the repository at this point in the history
occurs
  • Loading branch information
jumpmind-josh committed Jun 13, 2017
1 parent 561f299 commit c7d814e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
Expand Up @@ -41,6 +41,7 @@
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.postgresql.copy.CopyIn;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
Expand All @@ -60,6 +61,8 @@ public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter {

protected boolean needsBinaryConversion;

protected boolean useDefaultDataWriter;

public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSettings settings,
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) {
super(platform, settings);
Expand All @@ -68,6 +71,11 @@ public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSett
}

public void write(CsvData data) {
if (useDefaultDataWriter) {
super.write(data);
return;
}

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
Expand Down Expand Up @@ -182,6 +190,13 @@ protected void endCopy() {
}
}

@Override
public void start(Batch batch) {
super.start(batch);
IncomingBatch currentBatch = (IncomingBatch) context.get("currentBatch");
useDefaultDataWriter = currentBatch == null ? false : currentBatch.isErrorFlag();
}

@Override
public boolean start(Table table) {
return super.start(table);
Expand Down
Expand Up @@ -964,7 +964,6 @@ public Table nextTable() {
return table;
}
};

DataProcessor processor = new DataProcessor(reader, null, listener, "data load from stage") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
Expand Down Expand Up @@ -1043,6 +1042,8 @@ public void beforeBatchEnd(DataContext context) {
public boolean beforeBatchStarted(DataContext context) {
this.currentBatch = null;
Batch batch = context.getBatch();
context.remove("currentBatch");

if (parameterService.is(ParameterConstants.DATA_LOADER_ENABLED)
|| (batch.getChannelId() != null && batch.getChannelId().equals(
Constants.CHANNEL_CONFIG))) {
Expand All @@ -1059,6 +1060,8 @@ public boolean beforeBatchStarted(DataContext context) {
this.batchesProcessed.add(incomingBatch);
if (incomingBatchService.acquireIncomingBatch(incomingBatch)) {
this.currentBatch = incomingBatch;
context.put("currentBatch", this.currentBatch);

return true;
}
}
Expand Down
Expand Up @@ -214,6 +214,7 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {
|| !parameterService
.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) {
okayToProcess = true;
batch.setErrorFlag(existingBatch.isErrorFlag());
existingBatch.setStatus(Status.LD);
log.info("Retrying batch {}", batch.getNodeBatchId());
} else if (existingBatch.getStatus() == Status.IG) {
Expand Down

0 comments on commit c7d814e

Please sign in to comment.