diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java b/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java index af0aee557a..fdd39f44dc 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/load/csv/CsvLoader.java @@ -315,7 +315,8 @@ protected void runDdl(String xml) { } dbDialect.createTables(xml); context.getTableTemplate().resetMetaData(); - dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable()); + // TODO Eric - why was this done here? + //dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable()); } protected String[] parseKeys(String[] tokens, int startIndex) { diff --git a/symmetric/src/main/resources/dialects/mssql.xml b/symmetric/src/main/resources/dialects/mssql.xml index 667d82e5bc..ae926e4ff5 100644 --- a/symmetric/src/main/resources/dialects/mssql.xml +++ b/symmetric/src/main/resources/dialects/mssql.xml @@ -57,7 +57,7 @@ - + @@ -78,7 +78,7 @@ begin declare @TransactionId varchar(1000) declare @SyncEnabled varbinary(128) - declare @DataRow varchar(8000) + declare @DataRow varchar(max) if (@@TRANCOUNT > 0) begin execute sp_getbindtoken @TransactionId output; end @@ -94,11 +94,9 @@ fetch next from DataCursor into @DataRow while @@FETCH_STATUS = 0 begin insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, create_time) - values('$(targetTableName)','I', $(triggerHistoryId), @DataRow, current_timestamp) - if (@@ROWCOUNT > 0) begin - insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where - c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) - end + values('$(targetTableName)','I', $(triggerHistoryId), @DataRow, current_timestamp) + insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where + c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) fetch next from DataCursor into @DataRow end close DataCursor @@ -115,7 +113,7 @@ begin declare @TransactionId varchar(1000) declare @SyncEnabled varbinary(128) - declare @DataRow varchar(8000) + declare @DataRow varchar(max) declare @OldPk varchar(2000) if (@@TRANCOUNT > 0) begin execute sp_getbindtoken @TransactionId output; @@ -133,10 +131,8 @@ while @@FETCH_STATUS = 0 begin insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, create_time) values('$(targetTableName)','U', $(triggerHistoryId), @DataRow, @OldPk, current_timestamp) - if (@@ROWCOUNT > 0) begin - insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where - c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) - end + insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where + c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) fetch next from DataCursor into @DataRow, @OldPk end close DataCursor @@ -165,11 +161,9 @@ fetch next from DataCursor into @OldPk while @@FETCH_STATUS = 0 begin insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, create_time) - values('$(targetTableName)','D', $(triggerHistoryId), @OldPk, current_timestamp) - if (@@ROWCOUNT > 0) begin - insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where - c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) - end + values('$(targetTableName)','D', $(triggerHistoryId), @OldPk, current_timestamp) + insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where + c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere)) fetch next from DataCursor into @OldPk end close DataCursor diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/load/AbstractDataLoaderTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/load/AbstractDataLoaderTest.java index dde36ec2dd..47fd5e8d92 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/load/AbstractDataLoaderTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/load/AbstractDataLoaderTest.java @@ -59,9 +59,9 @@ public void testSimple(String dmlType, String[] values, String[] expectedValues) ByteArrayOutputStream out = new ByteArrayOutputStream(); CsvWriter writer = getWriter(out); writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID }); - writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS); String nextBatchId = getNextBatchId(); writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId }); + writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS); writer.write(dmlType); writer.writeRecord(values, true); writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId }); diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/DataLoaderServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/DataLoaderServiceTest.java index 11cb9d35ed..2fa57338c1 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/DataLoaderServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/DataLoaderServiceTest.java @@ -31,6 +31,7 @@ import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.TestConstants; import org.jumpmind.symmetric.common.csv.CsvConstants; +import org.jumpmind.symmetric.db.mssql.MsSqlDbDialect; import org.jumpmind.symmetric.db.postgresql.PostgreSqlDbDialect; import org.jumpmind.symmetric.load.AbstractDataLoaderTest; import org.jumpmind.symmetric.load.csv.CsvLoader; @@ -57,14 +58,15 @@ public class DataLoaderServiceTest extends AbstractDataLoaderTest { protected Node client; - protected void turnOffLoggingForTest() { - Logger.getLogger(DataLoaderService.class).setLevel(Level.OFF); - Logger.getLogger(CsvLoader.class).setLevel(Level.OFF); + protected Level setLoggingLevelForTest(Level level) { + Level old = Logger.getLogger(DataLoaderService.class).getLevel(); + Logger.getLogger(DataLoaderService.class).setLevel(level); + Logger.getLogger(CsvLoader.class).setLevel(level); + return old; } @BeforeTest(groups = "continuous") - protected void setUp() { - turnOffLoggingForTest(); + protected void setUp() { dataLoaderService = (IDataLoaderService) getBeanFactory().getBean(Constants.DATALOADER_SERVICE); incomingBatchService = (IIncomingBatchService) getBeanFactory().getBean(Constants.INCOMING_BATCH_SERVICE); transportManager = new MockTransportManager(); @@ -74,7 +76,7 @@ protected void setUp() { } @Test(groups = "continuous") - public void testStatistics() throws Exception { + public void testStatistics() throws Exception { String[] updateValues = new String[11]; updateValues[0] = updateValues[10] = getNextId(); updateValues[2] = updateValues[4] = "required string"; @@ -136,6 +138,7 @@ public void testStatistics() throws Exception { @Test(groups = "continuous") public void testUpdateCollision() throws Exception { + Level old = setLoggingLevelForTest(Level.OFF); String[] updateValues = new String[11]; // pick an id we won't hit updateValues[10] = "699996"; @@ -177,21 +180,22 @@ public void testUpdateCollision() throws Exception { history = list.get(1); Assert.assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status"); - + setLoggingLevelForTest(old); } @Test(groups = "continuous") public void testSqlStatistics() throws Exception { + Level old = setLoggingLevelForTest(Level.OFF); String[] insertValues = new String[10]; insertValues[2] = insertValues[4] = "sql stat test"; ByteArrayOutputStream out = new ByteArrayOutputStream(); CsvWriter writer = getWriter(out); writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID }); - writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS); - String nextBatchId = getNextBatchId(); + String nextBatchId = getNextBatchId(); writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId }); - + writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS); + // Clean insert String firstId = getNextId(); insertValues[0] = firstId; @@ -203,12 +207,12 @@ public void testSqlStatistics() throws Exception { insertValues[0] = secondId; writer.write(CsvConstants.INSERT); writer.writeRecord(insertValues, true); - - // Statement will cause SQLException because of primary key violation - String[] updateValues = (String[]) ArrayUtils.add(insertValues, secondId); - updateValues[0] = firstId; - writer.write(CsvConstants.UPDATE); - writer.writeRecord(updateValues, true); + + String thirdId = getNextId(); + insertValues[0] = thirdId; + insertValues[2] = "This is a very long string that will fail upon insert into the database."; + writer.write(CsvConstants.INSERT); + writer.writeRecord(insertValues, true); writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId }); writer.close(); @@ -222,16 +226,17 @@ public void testSqlStatistics() throws Exception { Assert.assertNotNull(history.getStartTime(), "Start time cannot be null"); Assert.assertNotNull(history.getEndTime(), "End time cannot be null"); Assert.assertEquals(history.getFailedRowNumber(), 3, "Wrong failed row number"); - Assert.assertEquals(history.getByteCount(), 135, "Wrong byte count"); + Assert.assertEquals(history.getByteCount(), 365, "Wrong byte count"); Assert.assertEquals(history.getStatementCount(), 3, "Wrong statement count"); Assert.assertEquals(history.getFallbackInsertCount(), 0, "Wrong fallback insert count"); - Assert.assertEquals(history.getFallbackUpdateCount(), 0, "Wrong fallback update count"); + Assert.assertEquals(history.getFallbackUpdateCount(), 1, "Wrong fallback update count"); Assert.assertEquals(history.getMissingDeleteCount(), 0, "Wrong missing delete count"); Assert.assertNotNull(history.getSqlState(), "Sql state should not be null"); - if (! (getDbDialect() instanceof PostgreSqlDbDialect)) { + if (! (getDbDialect() instanceof PostgreSqlDbDialect || getDbDialect() instanceof MsSqlDbDialect)) { Assert.assertTrue(history.getSqlCode() != 0, "Sql code should not be zero"); } - Assert.assertNotNull(history.getSqlMessage(), "Sql message should not be null"); + Assert.assertNotNull(history.getSqlMessage(), "Sql message should not be null"); + setLoggingLevelForTest(old); } @Test(groups = "continuous") @@ -266,6 +271,7 @@ public void testSkippingResentBatch() throws Exception { @Test(groups = "continuous") public void testErrorWhileSkip() throws Exception { + Level old = setLoggingLevelForTest(Level.OFF); String[] values = { getNextId(), "string2", "string not null2", "char2", "char not null2", "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89" }; @@ -288,7 +294,7 @@ public void testErrorWhileSkip() throws Exception { writer.write("UnknownTokenWithinBatch"); writer.writeRecord(new String[] { CsvConstants.COMMIT, getBatchId() }); writer.close(); - // Pause a moment to guarentee our history comes back in time order + // Pause a moment to guarantee our history comes back in time order Thread.sleep(10); load(out); Assert.assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID), @@ -299,10 +305,12 @@ public void testErrorWhileSkip() throws Exception { Assert.assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status"); Assert.assertEquals(history.getFailedRowNumber(), 0, "Wrong failed row number"); Assert.assertEquals(history.getStatementCount(), 0, "Wrong statement count"); + setLoggingLevelForTest(old); } @Test(groups = "continuous") public void testErrorWhileParsing() throws Exception { + Level old = setLoggingLevelForTest(Level.OFF); String[] values = { getNextId(), "should not reach database", "string not null", "char", "char not null", "2007-01-02", "2007-02-03 04:05:06.0", "0", "47", "67.89" }; @@ -324,10 +332,12 @@ public void testErrorWhileParsing() throws Exception { List list = incomingBatchService.findIncomingBatchHistory(batchId + "", TestConstants.TEST_CLIENT_EXTERNAL_ID); Assert.assertEquals(list.size(), 0, "Wrong number of history"); + setLoggingLevelForTest(old); } @Test(groups = "continuous") public void testErrorThenSuccessBatch() throws Exception { + Level old = setLoggingLevelForTest(Level.OFF); String[] values = { getNextId(), "This string is too large and will cause the statement to fail", "string not null2", "char2", "char not null2", "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89" }; @@ -361,10 +371,12 @@ public void testErrorThenSuccessBatch() throws Exception { Assert.assertEquals(history.getStatus(), IncomingBatchHistory.Status.OK, "Wrong status"); Assert.assertEquals(history.getFailedRowNumber(), 0, "Wrong failed row number"); Assert.assertEquals(history.getStatementCount(), 1, "Wrong statement count"); + setLoggingLevelForTest(old); } @Test(groups = "continuous") public void testMultipleBatch() throws Exception { + Level old = setLoggingLevelForTest(Level.OFF); String[] values = { getNextId(), "string", "string not null2", "char2", "char not null2", "2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89" }; String[] values2 = { getNextId(), "This string is too large and will cause the statement to fail", @@ -397,6 +409,7 @@ public void testMultipleBatch() throws Exception { TestConstants.TEST_CLIENT_EXTERNAL_ID), IncomingBatch.Status.OK, "Wrong status"); Assert.assertEquals(findIncomingBatchStatus(Integer.parseInt(nextBatchId2), TestConstants.TEST_CLIENT_EXTERNAL_ID), IncomingBatch.Status.ER, "Wrong status"); + setLoggingLevelForTest(old); } protected void load(ByteArrayOutputStream out) throws Exception {