diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java index 641b1ee07a..4afd216919 100644 --- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java +++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java @@ -17,6 +17,7 @@ public class Batch { protected long deleteCount; protected long updateCount; protected long sqlCount; + protected long sqlRowsAffectedCount; protected long otherCount; protected long fallbackInsertCount; protected long fallbackUpdateCount; @@ -68,6 +69,10 @@ public long incrementSqlCount() { return ++sqlCount; } + public long incrementSqlRowsAffected(int count) { + return sqlRowsAffectedCount+=count; + } + public long incrementOtherCount() { return ++otherCount; } @@ -195,4 +200,8 @@ public boolean isInitialLoad() { public long getFallbackUpdateWithNewKeysCount() { return fallbackUpdateWithNewKeysCount; } + + public long getSqlRowsAffectedCount() { + return sqlRowsAffectedCount; + } } diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java index 2f0e074e07..d99d41b2e4 100644 --- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java +++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java @@ -327,7 +327,7 @@ protected int executeDeleteSql(Data data, boolean batchMode) { } protected void processInsert(Data data, boolean batchMode) { - if (filterData(data, ctx)) { + if (filterData(data, ctx)) { try { batch.startTimer(); // TODO add save point logic for postgresql @@ -350,7 +350,7 @@ protected void processInsert(Data data, boolean batchMode) { } protected void processUpdate(Data data) { - if (filterData(data, ctx)) { + if (filterData(data, ctx)) { try { batch.startTimer(); int updateCount = executeUpdateSql(data); @@ -373,7 +373,7 @@ protected void processUpdate(Data data) { if (settings.enableFallbackForUpdate) { // remove the old pk values so that the new ones will be // used - data.clearPkData(); + data.clearPkData(); int updateCount = executeUpdateSql(data); if (updateCount == 0) { throw new SqlException("There were no rows to update using"); @@ -391,7 +391,6 @@ protected void processUpdate(Data data) { protected void processDelete(Data data, boolean batchMode) { if (filterData(data, ctx)) { - batch.incrementDeleteCount(); batch.startTimer(); try { int updateCount = executeDeleteSql(data, batchMode); @@ -400,6 +399,8 @@ protected void processDelete(Data data, boolean batchMode) { if (!settings.allowMissingDeletes) { throw new SqlException("No rows were deleted"); } + } else { + batch.incrementDeleteCount(); } } finally { batch.incrementDatabaseMillis(batch.endTimer()); @@ -408,7 +409,15 @@ protected void processDelete(Data data, boolean batchMode) { } protected void processSql(Data data) { - // TODO + if (filterData(data, ctx)) { + transaction.setInBatchMode(false); + String[] tokens = data.toParsedRowData(); + if (tokens != null && tokens.length > 0) { + transaction.prepare(tokens[0], -1); + batch.incrementSqlRowsAffected(transaction.update(data)); + batch.incrementSqlCount(); + } + } } protected boolean isCorrectForIntegrityViolation(Data data) { diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/sql/ISqlTransaction.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/sql/ISqlTransaction.java index 7a6216ea74..d5f717d34b 100644 --- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/sql/ISqlTransaction.java +++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/sql/ISqlTransaction.java @@ -19,6 +19,8 @@ public interface ISqlTransaction { */ public void prepare(String sql, int flushSize); + public int update(T marker); + public int update(T marker, Object[] values, int[] types); public int flush(); diff --git a/future/symmetric3-jdbc/src/main/java/org/jumpmind/symmetric/jdbc/sql/JdbcSqlTransaction.java b/future/symmetric3-jdbc/src/main/java/org/jumpmind/symmetric/jdbc/sql/JdbcSqlTransaction.java index 57518d78e5..4b3be5b3de 100644 --- a/future/symmetric3-jdbc/src/main/java/org/jumpmind/symmetric/jdbc/sql/JdbcSqlTransaction.java +++ b/future/symmetric3-jdbc/src/main/java/org/jumpmind/symmetric/jdbc/sql/JdbcSqlTransaction.java @@ -136,12 +136,18 @@ public void prepare(String sql, int flushSize) { throw sqlConnection.translate(ex); } } + + public int update(Object marker) { + return update(marker, null, null); + } - public int update(Object marker, Object[] values, int[] types) { + public int update(Object marker, Object[] args, int[] argTypes) { int rowsUpdated = 0; try { - StatementCreatorUtil.setValues(pstmt, values, types, sqlConnection.getJdbcDbPlatform() + if (args != null) { + StatementCreatorUtil.setValues(pstmt, args, argTypes, sqlConnection.getJdbcDbPlatform() .getLobHandler()); + } if (inBatchMode) { if (marker == null) { marker = new Integer(markers.size() + 1); diff --git a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java index f10c79e56b..4f707af26c 100644 --- a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java +++ b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java @@ -94,6 +94,19 @@ public void testOneRowUpdateFallbackUpdateWithNewKeys() { Assert.assertEquals(1, count(testTable.getTableName(), String.format("TEST_TEXT='updated'"))); } + + @Test + public void testSqlData() { + insertTestTableRows(10); + Assert.assertEquals(10, count(testTable.getTableName())); + Batch batch = writeToTestTable(new Data(testTable.getTableName(), DataEventType.SQL, + String.format("\"update %s set TEST_TEXT='it worked!'\"", testTable.getTableName()))); + Assert.assertEquals(10, count(testTable.getTableName())); + Assert.assertEquals(10, count(testTable.getTableName(), "TEST_TEXT='it worked!'")); + Assert.assertEquals(1, batch.getSqlCount()); + Assert.assertEquals(10, batch.getSqlRowsAffectedCount()); + + } protected Batch writeToTestTable(Data... datas) { SqlDataWriter writer = new SqlDataWriter(getPlatform(), new Parameters()); diff --git a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlTableDataReaderTest.java b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlTableDataReaderTest.java index 8fe8b8e6d5..29d2a4e67e 100644 --- a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlTableDataReaderTest.java +++ b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlTableDataReaderTest.java @@ -7,13 +7,21 @@ import org.jumpmind.symmetric.core.model.Data; import org.jumpmind.symmetric.core.model.Table; import org.jumpmind.symmetric.core.process.DataContext; +import org.junit.Before; import org.junit.Test; public class SqlTableDataReaderTest extends AbstractDatabaseTest { + Table testTable; + + @Before + public void cleanupTestTable() { + testTable = buildTestTable(); + delete(testTable.getTableName()); + } + @Test public void testSimpleTableWithNoCondition() throws Exception { - Table testTable = buildTestTable(); insertTestTableRows(100); TableToExtract tableToExtract = new TableToExtract(testTable, ""); SqlTableDataReader reader = new SqlTableDataReader(getPlatform(true), new Batch(),