Work on SQL Server dialect. Fix unit tests and casting of numeric columns.
chenson42 committed Apr 22, 2008
1 parent c74c2df commit 091a38c
Showing 4 changed files with 48 additions and 40 deletions.
@@ -315,7 +315,8 @@ protected void runDdl(String xml) {
}
dbDialect.createTables(xml);
context.getTableTemplate().resetMetaData();
-    dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
+    // TODO Eric - why was this done here?
+    //dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
}

protected String[] parseKeys(String[] tokens, int startIndex) {
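The TODO above asks why prepareTableForDataLoad was called at this point. In SymmetricDS, a dialect's prepare step typically relaxes constraints (sync triggers, identity columns) while an incoming load is applied, and is paired with a cleanup step once the load finishes. A minimal sketch of that pairing, assuming a hypothetical cleanupAfterDataLoad counterpart and loadBatch helper, neither of which is confirmed by this commit:

// Sketch only: cleanupAfterDataLoad and loadBatch are assumed names.
protected void runDdlAndLoad(String xml) {
    dbDialect.createTables(xml);
    context.getTableTemplate().resetMetaData();
    Table table = context.getTableTemplate().getTable();
    dbDialect.prepareTableForDataLoad(table);
    try {
        loadBatch(context); // hypothetical: apply the incoming csv batch
    } finally {
        dbDialect.cleanupAfterDataLoad(table); // undo whatever prepare changed
    }
}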
28 changes: 11 additions & 17 deletions symmetric/src/main/resources/dialects/mssql.xml
@@ -57,7 +57,7 @@
</property>
<property name="numberColumnTemplate">
<value>
- <![CDATA[coalesce(cast($(tableAlias).$(columnName) as char), '') +','+]]>
+ <![CDATA[coalesce(cast($(tableAlias).$(columnName) as varchar), '') +','+]]>
</value>
</property>
<!-- TODO -->
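This is the numeric-casting fix named in the commit message: in T-SQL, cast(x as char) with no length defaults to char(30) and right-pads with spaces, so numeric values picked up trailing blanks in the captured row data; cast(x as varchar) also defaults to length 30 but does not pad. A small JDBC check that shows the difference (connection URL is a placeholder):

import java.sql.*;

public class CastPadding {
    public static void main(String[] args) throws SQLException {
        try (Connection c = DriverManager.getConnection("jdbc:jtds:sqlserver://localhost/test");
                Statement s = c.createStatement();
                ResultSet rs = s.executeQuery("select cast(42.5 as char), cast(42.5 as varchar)")) {
            rs.next();
            System.out.println("[" + rs.getString(1) + "]"); // [42.5                          ] char(30), padded
            System.out.println("[" + rs.getString(2) + "]"); // [42.5] varchar(30), no padding
        }
    }
}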
@@ -78,7 +78,7 @@
begin
declare @TransactionId varchar(1000)
declare @SyncEnabled varbinary(128)
- declare @DataRow varchar(8000)
+ declare @DataRow varchar(max)
if (@@TRANCOUNT > 0) begin
execute sp_getbindtoken @TransactionId output;
end
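varchar(8000) capped @DataRow at 8000 characters, so wide rows could silently truncate as the trigger concatenated column values; varchar(max), available from SQL Server 2005, lifts the cap to 2 GB at the cost of this template no longer running on SQL Server 2000. Reading the captured row back is unchanged on the JDBC side; a sketch, assuming the default sym_ table prefix:

import java.sql.*;

// getString works the same for varchar(8000) and varchar(max); the
// difference is only that wide rows no longer truncate in the trigger.
String readRowData(Connection c, long dataId) throws SQLException {
    try (PreparedStatement ps = c.prepareStatement(
            "select row_data from sym_data where data_id = ?")) {
        ps.setLong(1, dataId);
        try (ResultSet rs = ps.executeQuery()) {
            return rs.next() ? rs.getString("row_data") : null;
        }
    }
}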
@@ -94,11 +94,9 @@
fetch next from DataCursor into @DataRow
while @@FETCH_STATUS = 0 begin
insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, create_time)
-   values('$(targetTableName)','I', $(triggerHistoryId), @DataRow, current_timestamp)
-   if (@@ROWCOUNT > 0) begin
-     insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where
-     c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere))
-   end
+   values('$(targetTableName)','I', $(triggerHistoryId), @DataRow, current_timestamp)
+   insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where
+   c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere))
fetch next from DataCursor into @DataRow
end
close DataCursor
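Dropping the if (@@ROWCOUNT > 0) guard means every row captured into the _data table now unconditionally gets a companion _data_event row, keyed by the @@IDENTITY of the _data insert immediately before it. @@IDENTITY is session-scoped across trigger nesting, which is what makes it usable here; SCOPE_IDENTITY() is the usual alternative when unrelated triggers might fire in between. For comparison, the same insert-then-read-key pattern in JDBC (sym_ prefix and literal values assumed for illustration):

import java.sql.*;

long insertDataRow(Connection c, String csvRow) throws SQLException {
    String sql = "insert into sym_data "
            + "(table_name, event_type, trigger_hist_id, row_data, create_time) "
            + "values ('test_table', 'I', 1, ?, current_timestamp)";
    try (PreparedStatement ps = c.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
        ps.setString(1, csvRow);
        ps.executeUpdate();
        try (ResultSet keys = ps.getGeneratedKeys()) {
            keys.next();
            return keys.getLong(1); // the data_id that the _data_event row references
        }
    }
}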
@@ -115,7 +113,7 @@
begin
declare @TransactionId varchar(1000)
declare @SyncEnabled varbinary(128)
- declare @DataRow varchar(8000)
+ declare @DataRow varchar(max)
declare @OldPk varchar(2000)
if (@@TRANCOUNT > 0) begin
execute sp_getbindtoken @TransactionId output;
@@ -133,10 +131,8 @@
while @@FETCH_STATUS = 0 begin
insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, row_data, pk_data, create_time)
values('$(targetTableName)','U', $(triggerHistoryId), @DataRow, @OldPk, current_timestamp)
-   if (@@ROWCOUNT > 0) begin
-     insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where
-     c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere))
-   end
+   insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where
+   c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere))
fetch next from DataCursor into @DataRow, @OldPk
end
close DataCursor
@@ -165,11 +161,9 @@
fetch next from DataCursor into @OldPk
while @@FETCH_STATUS = 0 begin
insert into $(defaultSchema)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, create_time)
-   values('$(targetTableName)','D', $(triggerHistoryId), @OldPk, current_timestamp)
-   if (@@ROWCOUNT > 0) begin
-     insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where
-     c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere))
-   end
+   values('$(targetTableName)','D', $(triggerHistoryId), @OldPk, current_timestamp)
+   insert into $(defaultSchema)$(prefixName)_data_event (node_id, data_id, channel_id, transaction_id) (select node_id, @@IDENTITY, '$(channelName)', $(txIdExpression) from $(defaultSchema)$(prefixName)_node c where
+   c.node_group_id='$(targetGroupId)' and c.sync_enabled=1 $(nodeSelectWhere))
fetch next from DataCursor into @OldPk
end
close DataCursor
@@ -59,9 +59,9 @@ public void testSimple(String dmlType, String[] values, String[] expectedValues)
ByteArrayOutputStream out = new ByteArrayOutputStream();
CsvWriter writer = getWriter(out);
writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
- writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
String nextBatchId = getNextBatchId();
writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
+ writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
writer.write(dmlType);
writer.writeRecord(values, true);
writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
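The reorder above writes the table definition after the batch token instead of before it, so a payload now reads nodeid, batch, table metadata, DML, commit. Roughly, assuming the CsvConstants tokens resolve to their lowercase names, with invented values:

// Illustrative payload shape only; column names and values are made up.
String payload = String.join("\n",
        "nodeid, 00001",
        "batch, 7",
        "table, test_table",
        "keys, id",
        "columns, id, string_value",
        "insert, 1, \"hello\"",
        "commit, 7");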
@@ -31,6 +31,7 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.TestConstants;
import org.jumpmind.symmetric.common.csv.CsvConstants;
+ import org.jumpmind.symmetric.db.mssql.MsSqlDbDialect;
import org.jumpmind.symmetric.db.postgresql.PostgreSqlDbDialect;
import org.jumpmind.symmetric.load.AbstractDataLoaderTest;
import org.jumpmind.symmetric.load.csv.CsvLoader;
@@ -57,14 +58,15 @@ public class DataLoaderServiceTest extends AbstractDataLoaderTest {

protected Node client;

- protected void turnOffLoggingForTest() {
-     Logger.getLogger(DataLoaderService.class).setLevel(Level.OFF);
-     Logger.getLogger(CsvLoader.class).setLevel(Level.OFF);
+ protected Level setLoggingLevelForTest(Level level) {
+     Level old = Logger.getLogger(DataLoaderService.class).getLevel();
+     Logger.getLogger(DataLoaderService.class).setLevel(level);
+     Logger.getLogger(CsvLoader.class).setLevel(level);
+     return old;
}

@BeforeTest(groups = "continuous")
- protected void setUp() {
-     turnOffLoggingForTest();
+ protected void setUp() {
dataLoaderService = (IDataLoaderService) getBeanFactory().getBean(Constants.DATALOADER_SERVICE);
incomingBatchService = (IIncomingBatchService) getBeanFactory().getBean(Constants.INCOMING_BATCH_SERVICE);
transportManager = new MockTransportManager();
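turnOffLoggingForTest becomes setLoggingLevelForTest, which returns the previous level so that each test can silence expected error noise and restore the old level afterwards. The tests below restore it on their last line; a try/finally variant, sketched here rather than taken from this commit, would also restore it when an assertion fails partway through:

import org.apache.log4j.Level;

public void testSomethingNoisy() throws Exception {
    Level old = setLoggingLevelForTest(Level.OFF);
    try {
        // ... exercise the data loader and assert on batch history ...
    } finally {
        setLoggingLevelForTest(old); // restored even if an assertion throws
    }
}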
@@ -74,7 +76,7 @@ protected void setUp() {
}

@Test(groups = "continuous")
- public void testStatistics() throws Exception {
+ public void testStatistics() throws Exception {
String[] updateValues = new String[11];
updateValues[0] = updateValues[10] = getNextId();
updateValues[2] = updateValues[4] = "required string";
@@ -136,6 +138,7 @@ public void testStatistics() throws Exception {

@Test(groups = "continuous")
public void testUpdateCollision() throws Exception {
+ Level old = setLoggingLevelForTest(Level.OFF);
String[] updateValues = new String[11];
// pick an id we won't hit
updateValues[10] = "699996";
@@ -177,21 +180,22 @@ public void testUpdateCollision() throws Exception {

history = list.get(1);
Assert.assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");

+ setLoggingLevelForTest(old);
}

@Test(groups = "continuous")
public void testSqlStatistics() throws Exception {
+ Level old = setLoggingLevelForTest(Level.OFF);
String[] insertValues = new String[10];
insertValues[2] = insertValues[4] = "sql stat test";

ByteArrayOutputStream out = new ByteArrayOutputStream();
CsvWriter writer = getWriter(out);
writer.writeRecord(new String[] { CsvConstants.NODEID, TestConstants.TEST_CLIENT_EXTERNAL_ID });
- writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
- String nextBatchId = getNextBatchId();
+ String nextBatchId = getNextBatchId();
writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
-
+ writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
+
// Clean insert
String firstId = getNextId();
insertValues[0] = firstId;
@@ -203,12 +207,12 @@ public void testSqlStatistics() throws Exception {
insertValues[0] = secondId;
writer.write(CsvConstants.INSERT);
writer.writeRecord(insertValues, true);

- // Statement will cause SQLException because of primary key violation
- String[] updateValues = (String[]) ArrayUtils.add(insertValues, secondId);
- updateValues[0] = firstId;
- writer.write(CsvConstants.UPDATE);
- writer.writeRecord(updateValues, true);
+ String thirdId = getNextId();
+ insertValues[0] = thirdId;
+ insertValues[2] = "This is a very long string that will fail upon insert into the database.";
+ writer.write(CsvConstants.INSERT);
+ writer.writeRecord(insertValues, true);

writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });
writer.close();
@@ -222,16 +226,17 @@ public void testSqlStatistics() throws Exception {
Assert.assertNotNull(history.getStartTime(), "Start time cannot be null");
Assert.assertNotNull(history.getEndTime(), "End time cannot be null");
Assert.assertEquals(history.getFailedRowNumber(), 3, "Wrong failed row number");
- Assert.assertEquals(history.getByteCount(), 135, "Wrong byte count");
+ Assert.assertEquals(history.getByteCount(), 365, "Wrong byte count");
Assert.assertEquals(history.getStatementCount(), 3, "Wrong statement count");
Assert.assertEquals(history.getFallbackInsertCount(), 0, "Wrong fallback insert count");
- Assert.assertEquals(history.getFallbackUpdateCount(), 0, "Wrong fallback update count");
+ Assert.assertEquals(history.getFallbackUpdateCount(), 1, "Wrong fallback update count");
Assert.assertEquals(history.getMissingDeleteCount(), 0, "Wrong missing delete count");
Assert.assertNotNull(history.getSqlState(), "Sql state should not be null");
- if (! (getDbDialect() instanceof PostgreSqlDbDialect)) {
+ if (! (getDbDialect() instanceof PostgreSqlDbDialect || getDbDialect() instanceof MsSqlDbDialect)) {
Assert.assertTrue(history.getSqlCode() != 0, "Sql code should not be zero");
}
- Assert.assertNotNull(history.getSqlMessage(), "Sql message should not be null");
+ Assert.assertNotNull(history.getSqlMessage(), "Sql message should not be null");
+ setLoggingLevelForTest(old);
}

@Test(groups = "continuous")
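The widened instanceof check mirrors the existing PostgreSQL exclusion: with these dialects the driver can report a vendor error code of 0, leaving the useful detail in the SQLSTATE and message, so asserting a nonzero sqlCode is not portable across databases. The JDBC fields being probed, for reference:

import java.sql.SQLException;

// Some drivers return 0 from getErrorCode() and carry detail elsewhere.
void report(SQLException e) {
    System.out.println("code=" + e.getErrorCode() // vendor-specific; may be 0
            + " state=" + e.getSQLState()         // SQLSTATE, e.g. "23505" for a unique violation in PostgreSQL
            + " msg=" + e.getMessage());
}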
@@ -266,6 +271,7 @@ public void testSkippingResentBatch() throws Exception {

@Test(groups = "continuous")
public void testErrorWhileSkip() throws Exception {
+ Level old = setLoggingLevelForTest(Level.OFF);
String[] values = { getNextId(), "string2", "string not null2", "char2", "char not null2",
"2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89" };

@@ -288,7 +294,7 @@ public void testErrorWhileSkip() throws Exception {
writer.write("UnknownTokenWithinBatch");
writer.writeRecord(new String[] { CsvConstants.COMMIT, getBatchId() });
writer.close();
- // Pause a moment to guarentee our history comes back in time order
+ // Pause a moment to guarantee our history comes back in time order
Thread.sleep(10);
load(out);
Assert.assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
@@ -299,10 +305,12 @@
Assert.assertEquals(history.getStatus(), IncomingBatchHistory.Status.ER, "Wrong status");
Assert.assertEquals(history.getFailedRowNumber(), 0, "Wrong failed row number");
Assert.assertEquals(history.getStatementCount(), 0, "Wrong statement count");
+ setLoggingLevelForTest(old);
}

@Test(groups = "continuous")
public void testErrorWhileParsing() throws Exception {
+ Level old = setLoggingLevelForTest(Level.OFF);
String[] values = { getNextId(), "should not reach database", "string not null", "char", "char not null",
"2007-01-02", "2007-02-03 04:05:06.0", "0", "47", "67.89" };

@@ -324,10 +332,12 @@ public void testErrorWhileParsing() throws Exception {
List<IncomingBatchHistory> list = incomingBatchService.findIncomingBatchHistory(batchId + "",
TestConstants.TEST_CLIENT_EXTERNAL_ID);
Assert.assertEquals(list.size(), 0, "Wrong number of history");
+ setLoggingLevelForTest(old);
}

@Test(groups = "continuous")
public void testErrorThenSuccessBatch() throws Exception {
+ Level old = setLoggingLevelForTest(Level.OFF);
String[] values = { getNextId(), "This string is too large and will cause the statement to fail",
"string not null2", "char2", "char not null2", "2007-01-02 00:00:00.0",
"2007-02-03 04:05:06.0", "0", "47", "67.89" };
@@ -361,10 +371,12 @@ public void testErrorThenSuccessBatch() throws Exception {
Assert.assertEquals(history.getStatus(), IncomingBatchHistory.Status.OK, "Wrong status");
Assert.assertEquals(history.getFailedRowNumber(), 0, "Wrong failed row number");
Assert.assertEquals(history.getStatementCount(), 1, "Wrong statement count");
+ setLoggingLevelForTest(old);
}

@Test(groups = "continuous")
public void testMultipleBatch() throws Exception {
+ Level old = setLoggingLevelForTest(Level.OFF);
String[] values = { getNextId(), "string", "string not null2", "char2", "char not null2",
"2007-01-02 00:00:00.0", "2007-02-03 04:05:06.0", "0", "47", "67.89" };
String[] values2 = { getNextId(), "This string is too large and will cause the statement to fail",
@@ -397,6 +409,7 @@ public void testMultipleBatch() throws Exception {
TestConstants.TEST_CLIENT_EXTERNAL_ID), IncomingBatch.Status.OK, "Wrong status");
Assert.assertEquals(findIncomingBatchStatus(Integer.parseInt(nextBatchId2),
TestConstants.TEST_CLIENT_EXTERNAL_ID), IncomingBatch.Status.ER, "Wrong status");
+ setLoggingLevelForTest(old);
}

protected void load(ByteArrayOutputStream out) throws Exception {
