From 483d599c5481ff5771af37268e32bddbee0fe4be Mon Sep 17 00:00:00 2001 From: klementinastojanovska Date: Fri, 16 Jun 2017 10:43:58 -0400 Subject: [PATCH] 0003157: Allow bulk loaders to fall back to default loader when an error occurs --- .../symmetric/io/MsSqlBulkDatabaseWriter.java | 6 +-- .../symmetric/io/MySqlBulkDatabaseWriter.java | 6 +-- .../io/OracleBulkDatabaseWriter.java | 7 +-- .../io/PostgresBulkDatabaseWriter.java | 31 +++++-------- .../io/RedshiftBulkDatabaseWriter.java | 7 +-- .../AbstractBulkDatabaseWriterTest.java | 44 +++++++++++++++++++ .../writer/MsSqlBulkDatabaseWriterTest.java | 4 ++ .../writer/MySqlBulkDatabaseWriterTest.java | 4 ++ .../PostgresBulkDatabaseWriterTest.java | 7 ++- 9 files changed, 83 insertions(+), 33 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java index b9b578d451..d93ee8154e 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java @@ -39,12 +39,11 @@ import org.jumpmind.symmetric.io.data.CsvData; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; -import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter; import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagingManager; import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor; -public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter { +public class MsSqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter { protected NativeJdbcExtractor jdbcExtractor; protected int maxRowsBeforeFlush; @@ -122,7 +121,8 @@ public void end(Table table) { } } - public void write(CsvData data) { + public void bulkWrite(CsvData data) { + DataEventType dataEventType = data.getDataEventType(); switch (dataEventType) { diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java index 6658c4ee6d..5f57951ad9 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java @@ -43,12 +43,11 @@ import org.jumpmind.symmetric.io.data.CsvUtils; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; -import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter; import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagingManager; import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor; -public class MySqlBulkDatabaseWriter extends DefaultDatabaseWriter { +public class MySqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter { protected NativeJdbcExtractor jdbcExtractor; @@ -109,7 +108,8 @@ public void end(Table table) { } } - public void write(CsvData data) { + public void bulkWrite(CsvData data) { + super.write(data); DataEventType dataEventType = data.getDataEventType(); switch (dataEventType) { diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java index acbbd8b64b..fe057bbe56 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java @@ -45,7 +45,6 @@ import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; -import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter; import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor; import oracle.jdbc.OracleTypes; @@ -56,7 +55,7 @@ import oracle.sql.TIMESTAMPLTZ; import oracle.sql.TIMESTAMPTZ; -public class OracleBulkDatabaseWriter extends DefaultDatabaseWriter { +public class OracleBulkDatabaseWriter extends AbstractBulkDatabaseWriter { protected String procedurePrefix; @@ -86,7 +85,9 @@ public boolean start(Table table) { } } - public void write(CsvData data) { + public void bulkWrite(CsvData data) { + super.write(data); + DataEventType dataEventType = data.getDataEventType(); if (lastEventType != null && !lastEventType.equals(dataEventType)) { diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java index 9320e8314c..240a25a281 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java @@ -40,14 +40,12 @@ import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; -import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter; -import org.jumpmind.symmetric.model.IncomingBatch; import org.postgresql.copy.CopyIn; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor; -public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter { +public class PostgresBulkDatabaseWriter extends AbstractBulkDatabaseWriter { protected NativeJdbcExtractor jdbcExtractor; @@ -60,8 +58,6 @@ public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter { protected int loadedRows = 0; protected boolean needsBinaryConversion; - - protected boolean useDefaultDataWriter; public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSettings settings, NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) { @@ -69,15 +65,10 @@ public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSett this.jdbcExtractor = jdbcExtractor; this.maxRowsBeforeFlush = maxRowsBeforeFlush; } + + @Override + protected void bulkWrite(CsvData data) { - public void write(CsvData data) { - if (useDefaultDataWriter) { - super.write(data); - return; - } - - statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT); - statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER); statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS); DataEventType dataEventType = data.getDataEventType(); @@ -117,6 +108,7 @@ public void write(CsvData data) { } catch (Exception ex) { throw getPlatform().getSqlTemplate().translate(ex); } + //endCopy(); break; case UPDATE: case DELETE: @@ -131,6 +123,9 @@ public void write(CsvData data) { loadedRows = 0; } } + statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT); + statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER); + statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS); } @@ -182,6 +177,9 @@ protected void endCopy() { copyIn.endCopy(); } } catch (Exception ex) { + statistics.get(batch).set(DataWriterStatisticConstants.STATEMENTCOUNT, 0); + statistics.get(batch).set(DataWriterStatisticConstants.LINENUMBER, 0); + throw getPlatform().getSqlTemplate().translate(ex); } finally { copyIn = null; @@ -189,13 +187,6 @@ protected void endCopy() { } } } - - @Override - public void start(Batch batch) { - super.start(batch); - IncomingBatch currentBatch = (IncomingBatch) context.get("currentBatch"); - useDefaultDataWriter = currentBatch == null ? false : currentBatch.isErrorFlag(); - } @Override public boolean start(Table table) { diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java index bf372c036c..b3f534297b 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java @@ -36,7 +36,6 @@ import org.jumpmind.symmetric.io.data.CsvUtils; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; -import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter; import org.jumpmind.symmetric.io.stage.IStagedResource; @@ -48,7 +47,7 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; -public class RedshiftBulkDatabaseWriter extends DefaultDatabaseWriter { +public class RedshiftBulkDatabaseWriter extends AbstractBulkDatabaseWriter { protected IStagingManager stagingManager; protected IStagedResource stagedInputFile; @@ -115,7 +114,9 @@ public void end(Table table) { } } - public void write(CsvData data) { + public void bulkWrite(CsvData data) { + super.write(data); + if (filterBefore(data)) { try { DataEventType dataEventType = data.getDataEventType(); diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/AbstractBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/AbstractBulkDatabaseWriterTest.java index 1d5219f8f0..d950d29c25 100644 --- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/AbstractBulkDatabaseWriterTest.java +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/AbstractBulkDatabaseWriterTest.java @@ -29,10 +29,12 @@ import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Hex; import org.apache.commons.lang.ArrayUtils; +import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.symmetric.io.AbstractWriterTest; import org.jumpmind.symmetric.io.data.CsvData; import org.jumpmind.symmetric.io.data.DataEventType; +import org.jumpmind.symmetric.model.IncomingBatch; import org.junit.Assert; import org.junit.Test; @@ -69,6 +71,7 @@ protected void insertAndVerify(String[] values) { } protected abstract long writeData(List data); + //protected abstract long writeBulkData(IDataWriter writer, List data); @Test public void testInsert() { @@ -205,6 +208,47 @@ public void testInsertBlobRandom() { insertAndVerify(values); } } + + @Test + public void testDuplicateRow(){ + if (shouldTestRun(platform)) { + platform.getSqlTemplate().update("truncate table " + getTestTable()); + List data = new ArrayList(); + + String id=getNextId(); + String[] values1 = { id, "stri'ng2", "string not null2", "char2", "char not null2", + "2007-01-02 00:00:00.000", "2007-02-03 04:05:06.000", "0", "47", "67.89", "-0.0747663", encode("string") }; + data.add(new CsvData(DataEventType.INSERT, values1)); + + String[] values2 = { id, "stri'ng2", "string not null2", "char2", "char not null2", + "2007-01-02 00:00:00.000", "2007-02-03 04:05:06.000", "0", "47", "67.89", "-0.0747663", encode("string") }; + data.add(new CsvData(DataEventType.INSERT, values2)); + + long statementCount = writeBulkData(data); + Assert.assertEquals(2, statementCount); + Assert.assertEquals(1, countRows(getTestTable())); + } + } + + protected abstract AbstractDatabaseWriter create(); + + protected long writeBulkData(List data) { + Table table = platform.getTableFromCache(getTestTable(), false); + + AbstractDatabaseWriter bulkWriter = create(); + + try{ + writeData(bulkWriter, new TableCsvData(table, data)); + Assert.fail("Bulk Writer throws duplicate key exception"); + }catch(Exception e){ + //do nothing, error expected here + } + IncomingBatch expectedBatch = new IncomingBatch(); + expectedBatch.setErrorFlag(true); + bulkWriter.getContext().put("currentBatch", expectedBatch); + + return writeData(bulkWriter, bulkWriter.getContext(), new TableCsvData(table, data)); + } @Override protected void assertTestTableEquals(String testTableId, String[] expectedValues) { diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java index 7e778185ca..91ef4710f0 100644 --- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java @@ -70,6 +70,10 @@ protected boolean shouldTestRun(IDatabasePlatform platform) { platform instanceof MsSql2008DatabasePlatform); } + protected AbstractDatabaseWriter create(){ + return new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, uncPath, null, null); + } + protected long writeData(List data) { Table table = platform.getTableFromCache(getTestTable(), false); return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, uncPath, null, null), new TableCsvData(table, data)); diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java index db5be75933..99ccc51b71 100644 --- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MySqlBulkDatabaseWriterTest.java @@ -58,6 +58,10 @@ protected boolean shouldTestRun(IDatabasePlatform platform) { return platform != null && platform instanceof MySqlDatabasePlatform; } + protected AbstractDatabaseWriter create(){ + return new MySqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 10, 1000,true, true); + } + protected long writeData(List data) { Table table = platform.getTableFromCache(getTestTable(), false); return writeData(new MySqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 10, 1000, diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/PostgresBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/PostgresBulkDatabaseWriterTest.java index 03a7588f49..1423cf0c93 100644 --- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/PostgresBulkDatabaseWriterTest.java +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/PostgresBulkDatabaseWriterTest.java @@ -21,7 +21,6 @@ package org.jumpmind.symmetric.io.data.writer; import java.util.List; - import org.jumpmind.db.DbTestUtils; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; @@ -60,10 +59,16 @@ protected boolean shouldTestRun(IDatabasePlatform platform) { return platform != null && platform instanceof PostgreSqlDatabasePlatform; } + protected AbstractDatabaseWriter create(){ + return new PostgresBulkDatabaseWriter(platform, new DatabaseWriterSettings(), new CommonsDbcpNativeJdbcExtractor(), 1000); + } + + @Override protected long writeData(List data) { Table table = platform.getTableFromCache(getTestTable(), false); return writeData(new PostgresBulkDatabaseWriter(platform, new DatabaseWriterSettings(), new CommonsDbcpNativeJdbcExtractor(), 1000), new TableCsvData(table, data)); } + }