0003157: Allow bulk loaders to fall back to default loader when an error occurs
klementinastojanovska committed Jun 16, 2017
1 parent bec9c35 commit 483d599
Showing 9 changed files with 83 additions and 33 deletions.
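The AbstractBulkDatabaseWriter base class that each dialect-specific writer now extends is added elsewhere in this commit and its source is not included in the hunks below. A rough sketch of its fallback dispatch, inferred from the useDefaultDataWriter logic removed from PostgresBulkDatabaseWriter in this diff, could look like the following; the constructor signature, the exact superclass, and everything other than the bulkWrite() hook are assumptions rather than the committed source, and how the base class interacts with the subclasses' own super.write(data) calls is not visible here.

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.model.IncomingBatch;

// Hypothetical sketch only, not the committed class.
public abstract class AbstractBulkDatabaseWriter extends DefaultDatabaseWriter {

    // When the batch being loaded has already failed once, route rows
    // through the plain row-by-row writer instead of the bulk path.
    protected boolean useDefaultDataWriter;

    public AbstractBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSettings settings) {
        super(platform, settings);
    }

    @Override
    public void start(Batch batch) {
        super.start(batch);
        // Mirrors the logic removed from PostgresBulkDatabaseWriter.start(Batch).
        IncomingBatch currentBatch = (IncomingBatch) context.get("currentBatch");
        useDefaultDataWriter = currentBatch == null ? false : currentBatch.isErrorFlag();
    }

    @Override
    public void write(CsvData data) {
        if (useDefaultDataWriter) {
            super.write(data);   // fall back to the default loader for retried batches
        } else {
            bulkWrite(data);     // normal bulk path implemented by each dialect
        }
    }

    protected abstract void bulkWrite(CsvData data);
}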
@@ -39,12 +39,11 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter {
public class MsSqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

protected NativeJdbcExtractor jdbcExtractor;
protected int maxRowsBeforeFlush;
@@ -122,7 +121,8 @@ public void end(Table table) {
}
}

public void write(CsvData data) {
public void bulkWrite(CsvData data) {

DataEventType dataEventType = data.getDataEventType();

switch (dataEventType) {
@@ -43,12 +43,11 @@
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class MySqlBulkDatabaseWriter extends DefaultDatabaseWriter {
public class MySqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter {


protected NativeJdbcExtractor jdbcExtractor;
@@ -109,7 +108,8 @@ public void end(Table table) {
}
}

public void write(CsvData data) {
public void bulkWrite(CsvData data) {
super.write(data);
DataEventType dataEventType = data.getDataEventType();

switch (dataEventType) {
@@ -45,7 +45,6 @@
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

import oracle.jdbc.OracleTypes;
@@ -56,7 +55,7 @@
import oracle.sql.TIMESTAMPLTZ;
import oracle.sql.TIMESTAMPTZ;

public class OracleBulkDatabaseWriter extends DefaultDatabaseWriter {
public class OracleBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

protected String procedurePrefix;

@@ -86,7 +85,9 @@ public boolean start(Table table) {
}
}

public void write(CsvData data) {
public void bulkWrite(CsvData data) {
super.write(data);

DataEventType dataEventType = data.getDataEventType();

if (lastEventType != null && !lastEventType.equals(dataEventType)) {
@@ -40,14 +40,12 @@
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.postgresql.copy.CopyIn;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter {
public class PostgresBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

protected NativeJdbcExtractor jdbcExtractor;

@@ -60,24 +58,17 @@ public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter {
protected int loadedRows = 0;

protected boolean needsBinaryConversion;

protected boolean useDefaultDataWriter;

public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSettings settings,
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) {
super(platform, settings);
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
}

@Override
protected void bulkWrite(CsvData data) {

public void write(CsvData data) {
if (useDefaultDataWriter) {
super.write(data);
return;
}

statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);

DataEventType dataEventType = data.getDataEventType();
@@ -117,6 +108,7 @@ public void write(CsvData data) {
} catch (Exception ex) {
throw getPlatform().getSqlTemplate().translate(ex);
}
//endCopy();
break;
case UPDATE:
case DELETE:
@@ -131,6 +123,9 @@
loadedRows = 0;
}
}
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);

statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}

@@ -182,20 +177,16 @@ protected void endCopy() {
copyIn.endCopy();
}
} catch (Exception ex) {
statistics.get(batch).set(DataWriterStatisticConstants.STATEMENTCOUNT, 0);
statistics.get(batch).set(DataWriterStatisticConstants.LINENUMBER, 0);

throw getPlatform().getSqlTemplate().translate(ex);
} finally {
copyIn = null;
}
}
}
}

@Override
public void start(Batch batch) {
super.start(batch);
IncomingBatch currentBatch = (IncomingBatch) context.get("currentBatch");
useDefaultDataWriter = currentBatch == null ? false : currentBatch.isErrorFlag();
}

@Override
public boolean start(Table table) {
@@ -36,7 +36,6 @@
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
@@ -48,7 +47,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

public class RedshiftBulkDatabaseWriter extends DefaultDatabaseWriter {
public class RedshiftBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

protected IStagingManager stagingManager;
protected IStagedResource stagedInputFile;
@@ -115,7 +114,9 @@ public void end(Table table) {
}
}

public void write(CsvData data) {
public void bulkWrite(CsvData data) {
super.write(data);

if (filterBefore(data)) {
try {
DataEventType dataEventType = data.getDataEventType();
@@ -29,10 +29,12 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.ArrayUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.AbstractWriterTest;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.junit.Assert;
import org.junit.Test;

@@ -69,6 +71,7 @@ protected void insertAndVerify(String[] values) {
}

protected abstract long writeData(List<CsvData> data);
//protected abstract long writeBulkData(IDataWriter writer, List<CsvData> data);

@Test
public void testInsert() {
@@ -205,6 +208,47 @@ public void testInsertBlobRandom() {
insertAndVerify(values);
}
}

@Test
public void testDuplicateRow(){
if (shouldTestRun(platform)) {
platform.getSqlTemplate().update("truncate table " + getTestTable());
List<CsvData> data = new ArrayList<CsvData>();

String id=getNextId();
String[] values1 = { id, "stri'ng2", "string not null2", "char2", "char not null2",
"2007-01-02 00:00:00.000", "2007-02-03 04:05:06.000", "0", "47", "67.89", "-0.0747663", encode("string") };
data.add(new CsvData(DataEventType.INSERT, values1));

String[] values2 = { id, "stri'ng2", "string not null2", "char2", "char not null2",
"2007-01-02 00:00:00.000", "2007-02-03 04:05:06.000", "0", "47", "67.89", "-0.0747663", encode("string") };
data.add(new CsvData(DataEventType.INSERT, values2));

long statementCount = writeBulkData(data);
Assert.assertEquals(2, statementCount);
Assert.assertEquals(1, countRows(getTestTable()));
}
}

protected abstract AbstractDatabaseWriter create();

protected long writeBulkData(List<CsvData> data) {
Table table = platform.getTableFromCache(getTestTable(), false);

AbstractDatabaseWriter bulkWriter = create();

try{
writeData(bulkWriter, new TableCsvData(table, data));
Assert.fail("Bulk Writer throws duplicate key exception");
}catch(Exception e){
//do nothing, error expected here
}
IncomingBatch expectedBatch = new IncomingBatch();
expectedBatch.setErrorFlag(true);
bulkWriter.getContext().put("currentBatch", expectedBatch);

return writeData(bulkWriter, bulkWriter.getContext(), new TableCsvData(table, data));
}

@Override
protected void assertTestTableEquals(String testTableId, String[] expectedValues) {
@@ -70,6 +70,10 @@ protected boolean shouldTestRun(IDatabasePlatform platform) {
platform instanceof MsSql2008DatabasePlatform);
}

protected AbstractDatabaseWriter create(){
return new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, uncPath, null, null);
}

protected long writeData(List<CsvData> data) {
Table table = platform.getTableFromCache(getTestTable(), false);
return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, uncPath, null, null), new TableCsvData(table, data));
@@ -58,6 +58,10 @@ protected boolean shouldTestRun(IDatabasePlatform platform) {
return platform != null && platform instanceof MySqlDatabasePlatform;
}

protected AbstractDatabaseWriter create(){
return new MySqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 10, 1000,true, true);
}

protected long writeData(List<CsvData> data) {
Table table = platform.getTableFromCache(getTestTable(), false);
return writeData(new MySqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 10, 1000,
@@ -21,7 +21,6 @@
package org.jumpmind.symmetric.io.data.writer;

import java.util.List;

import org.jumpmind.db.DbTestUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
@@ -60,10 +59,16 @@ protected boolean shouldTestRun(IDatabasePlatform platform) {
return platform != null && platform instanceof PostgreSqlDatabasePlatform;
}

protected AbstractDatabaseWriter create(){
return new PostgresBulkDatabaseWriter(platform, new DatabaseWriterSettings(), new CommonsDbcpNativeJdbcExtractor(), 1000);
}

@Override
protected long writeData(List<CsvData> data) {
Table table = platform.getTableFromCache(getTestTable(), false);
return writeData(new PostgresBulkDatabaseWriter(platform, new DatabaseWriterSettings(),
new CommonsDbcpNativeJdbcExtractor(), 1000), new TableCsvData(table, data));
}


}
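The new testDuplicateRow case in the shared test class above is what exercises this fallback end to end: the first writeData pass sends two inserts with the same id through the bulk path and is expected to fail on the duplicate key, then the test puts an IncomingBatch with its error flag set into the writer's context ("currentBatch") and replays the same rows, expecting the retry to run through the default loader so that two statements are counted but only one row remains in the table.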
