Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0003667: Bulk loads should not log errors while falling back to default
loader
  • Loading branch information
jumpmind-josh committed Aug 9, 2018
1 parent 35406c8 commit 7012301
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 148 deletions.
@@ -1,16 +1,13 @@
package org.jumpmind.symmetric.io;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter;
import org.jumpmind.symmetric.model.IncomingBatch;

public abstract class AbstractBulkDatabaseWriter extends DynamicDefaultDatabaseWriter{

protected boolean useDefaultDataWriter;

/**
 * Creates a bulk-capable database writer by delegating straight to
 * {@link DynamicDefaultDatabaseWriter}; no additional state is set here.
 *
 * @param symmetricPlatform platform handle passed through to the superclass
 *                          (presumably the SymmetricDS runtime-table platform — confirm against superclass)
 * @param targetPlatform    platform handle for the database being written to
 * @param tablePrefix       table-name prefix forwarded to the superclass
 */
public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String tablePrefix){
super(symmetricPlatform, targetPlatform, tablePrefix);
}
Expand All @@ -21,9 +18,10 @@ public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabase
}

// Routes each incoming row either to the default row-by-row writer or to the
// platform-specific bulk path implemented by subclasses via bulkWrite(CsvData).
//
// NOTE(review): this span is rendered diff residue — it shows BOTH the old
// condition (the useDefaultDataWriter flag, first if-line) and its replacement
// (the context-based CONTEXT_BULK_WRITER_TO_USE check, second if-line). Only
// one of the two belongs in the compiled source; as pasted, the block is not
// valid Java. The new behavior: when the context marks this batch "default",
// write row-by-row; otherwise tag the context "bulk" and bulk-write the row.
public final void write(CsvData data) {
if (useDefaultDataWriter) {
if (context.get(IDataWriter.CONTEXT_BULK_WRITER_TO_USE) != null && context.get(IDataWriter.CONTEXT_BULK_WRITER_TO_USE).equals("default")) {
writeDefault(data);
}else{
context.put(IDataWriter.CONTEXT_BULK_WRITER_TO_USE, "bulk");
bulkWrite(data);
}
}
Expand All @@ -33,11 +31,5 @@ protected final void writeDefault(CsvData data) {
}

protected abstract void bulkWrite(CsvData data);

/**
 * Hook invoked when a batch begins. Pulls the current {@link IncomingBatch}
 * out of the writer context and switches this writer onto the default
 * (row-by-row) path when that batch has its error flag set — i.e. a retry
 * of a previously failed batch avoids the bulk loader.
 */
@Override
public void start(Batch batch) {
    super.start(batch);
    Object batchInContext = context.get("currentBatch");
    // Equivalent to: currentBatch == null ? false : currentBatch.isErrorFlag()
    useDefaultDataWriter = batchInContext != null && ((IncomingBatch) batchInContext).isErrorFlag();
}

}
Expand Up @@ -38,6 +38,7 @@
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.postgresql.copy.CopyIn;
Expand Down Expand Up @@ -84,7 +85,6 @@ protected void bulkWrite(CsvData data) {
}
}
}
useDefaultDataWriter=false;
switch (dataEventType) {
case INSERT:
startCopy();
Expand Down Expand Up @@ -116,7 +116,7 @@ protected void bulkWrite(CsvData data) {
case DELETE:
default:
endCopy();
useDefaultDataWriter=true;
context.put(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT, "default");
super.write(data);
break;
}
Expand Down
Expand Up @@ -985,12 +985,34 @@ public void start(DataContext ctx, Batch batch) {
batchStartsToArriveTimeInMs = System.currentTimeMillis();
}

/**
 * Builds a {@link ProtocolDataReader} over the staged batch resource whose
 * iteration hooks feed load statistics back to the listener: each table read
 * increments the current batch's per-table count, and each batch read is
 * seeded with the staged batch's statistics.
 *
 * @param batchInStaging the staged batch whose statistics are propagated
 * @param resource       the staged resource the reader consumes
 * @return a reader wired to update listener/batch statistics as it reads
 */
protected ProtocolDataReader buildDataReader(final Batch batchInStaging, final IStagedResource resource) {
    return new ProtocolDataReader(BatchType.LOAD, batchInStaging.getTargetNodeId(), resource) {
        @Override
        public Table nextTable() {
            Table table = super.nextTable();
            if (table != null && listener.currentBatch != null) {
                // Track how many rows/tables were loaded for this batch.
                listener.currentBatch.incrementTableCount(table.getNameLowerCase());
            }
            return table;
        }

        @Override // fix: this overriding method was missing @Override
        public Batch nextBatch() {
            Batch nextBatch = super.nextBatch();
            if (nextBatch != null) {
                // Carry the staged batch's accumulated statistics onto the
                // batch object handed to the load pipeline.
                nextBatch.setStatistics(batchInStaging.getStatistics());
            }
            return nextBatch;
        }
    };
}

public void end(final DataContext ctx, final Batch batchInStaging, final IStagedResource resource) {
final long networkMillis = System.currentTimeMillis() - batchStartsToArriveTimeInMs;

Callable<IncomingBatch> loadBatchFromStage = new Callable<IncomingBatch>() {
public IncomingBatch call() throws Exception {
IncomingBatch incomingBatch = null;
DataProcessor processor = null;
if (!isError && resource != null && resource.exists()) {
try {
loadInfo = statisticManager.newProcessInfo(new ProcessInfoKey(transferInfo.getSourceNodeId(),
Expand All @@ -1001,25 +1023,9 @@ public IncomingBatch call() throws Exception {

loadInfo.setStatus(ProcessInfo.ProcessStatus.LOADING);

ProtocolDataReader reader = new ProtocolDataReader(BatchType.LOAD, batchInStaging.getTargetNodeId(), resource) {
@Override
public Table nextTable() {
Table table = super.nextTable();
if (table != null && listener.currentBatch != null) {
listener.currentBatch.incrementTableCount(table.getNameLowerCase());
}
return table;
}

public Batch nextBatch() {
Batch nextBatch = super.nextBatch();
if (nextBatch != null) {
nextBatch.setStatistics(batchInStaging.getStatistics());
}
return nextBatch;
}
};
DataProcessor processor = new DataProcessor(reader, null, listener, "data load from stage") {
ProtocolDataReader reader = buildDataReader(batchInStaging, resource);

processor = new DataProcessor(reader, null, listener, "data load from stage") {
@Override
protected IDataWriter chooseDataWriter(Batch batch) {
boolean isRetry = ((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry();
Expand All @@ -1032,8 +1038,14 @@ protected IDataWriter chooseDataWriter(Batch batch) {
loadInfo.setStatus(ProcessStatus.OK);
}
} catch (Exception e) {
isError = true;
throw e;
if (ctx.get(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT) != null && ctx.get(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT).equals("bulk")) {
ctx.put(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT, "default");
processor.setDataReader(buildDataReader(batchInStaging, resource));
processor.process(ctx);
} else {
isError = true;
throw e;
}
} finally {
incomingBatch = listener.currentBatch;
if (incomingBatch != null) {
Expand Down

0 comments on commit 7012301

Please sign in to comment.