Skip to content

Commit

Permalink
tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 20, 2011
1 parent ed1dcd4 commit 3813505
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 8 deletions.
Expand Up @@ -17,6 +17,7 @@ public class Batch {
protected long deleteCount;
protected long updateCount;
protected long sqlCount;
protected long sqlRowsAffectedCount;
protected long otherCount;
protected long fallbackInsertCount;
protected long fallbackUpdateCount;
Expand Down Expand Up @@ -68,6 +69,10 @@ public long incrementSqlCount() {
return ++sqlCount;
}

public long incrementSqlRowsAffected(int count) {
return sqlRowsAffectedCount+=count;
}

public long incrementOtherCount() {
return ++otherCount;
}
Expand Down Expand Up @@ -195,4 +200,8 @@ public boolean isInitialLoad() {
public long getFallbackUpdateWithNewKeysCount() {
return fallbackUpdateWithNewKeysCount;
}

public long getSqlRowsAffectedCount() {
return sqlRowsAffectedCount;
}
}
Expand Up @@ -327,7 +327,7 @@ protected int executeDeleteSql(Data data, boolean batchMode) {
}

protected void processInsert(Data data, boolean batchMode) {
if (filterData(data, ctx)) {
if (filterData(data, ctx)) {
try {
batch.startTimer();
// TODO add save point logic for postgresql
Expand All @@ -350,7 +350,7 @@ protected void processInsert(Data data, boolean batchMode) {
}

protected void processUpdate(Data data) {
if (filterData(data, ctx)) {
if (filterData(data, ctx)) {
try {
batch.startTimer();
int updateCount = executeUpdateSql(data);
Expand All @@ -373,7 +373,7 @@ protected void processUpdate(Data data) {
if (settings.enableFallbackForUpdate) {
// remove the old pk values so that the new ones will be
// used
data.clearPkData();
data.clearPkData();
int updateCount = executeUpdateSql(data);
if (updateCount == 0) {
throw new SqlException("There were no rows to update using");
Expand All @@ -391,7 +391,6 @@ protected void processUpdate(Data data) {

protected void processDelete(Data data, boolean batchMode) {
if (filterData(data, ctx)) {
batch.incrementDeleteCount();
batch.startTimer();
try {
int updateCount = executeDeleteSql(data, batchMode);
Expand All @@ -400,6 +399,8 @@ protected void processDelete(Data data, boolean batchMode) {
if (!settings.allowMissingDeletes) {
throw new SqlException("No rows were deleted");
}
} else {
batch.incrementDeleteCount();
}
} finally {
batch.incrementDatabaseMillis(batch.endTimer());
Expand All @@ -408,7 +409,15 @@ protected void processDelete(Data data, boolean batchMode) {
}

protected void processSql(Data data) {
// TODO
if (filterData(data, ctx)) {
transaction.setInBatchMode(false);
String[] tokens = data.toParsedRowData();
if (tokens != null && tokens.length > 0) {
transaction.prepare(tokens[0], -1);
batch.incrementSqlRowsAffected(transaction.update(data));
batch.incrementSqlCount();
}
}
}

protected boolean isCorrectForIntegrityViolation(Data data) {
Expand Down
Expand Up @@ -19,6 +19,8 @@ public interface ISqlTransaction {
*/
public void prepare(String sql, int flushSize);

public <T> int update(T marker);

public <T> int update(T marker, Object[] values, int[] types);

public int flush();
Expand Down
Expand Up @@ -136,12 +136,18 @@ public void prepare(String sql, int flushSize) {
throw sqlConnection.translate(ex);
}
}

public int update(Object marker) {
return update(marker, null, null);
}

public int update(Object marker, Object[] values, int[] types) {
public int update(Object marker, Object[] args, int[] argTypes) {
int rowsUpdated = 0;
try {
StatementCreatorUtil.setValues(pstmt, values, types, sqlConnection.getJdbcDbPlatform()
if (args != null) {
StatementCreatorUtil.setValues(pstmt, args, argTypes, sqlConnection.getJdbcDbPlatform()
.getLobHandler());
}
if (inBatchMode) {
if (marker == null) {
marker = new Integer(markers.size() + 1);
Expand Down
Expand Up @@ -94,6 +94,19 @@ public void testOneRowUpdateFallbackUpdateWithNewKeys() {
Assert.assertEquals(1,
count(testTable.getTableName(), String.format("TEST_TEXT='updated'")));
}

@Test
public void testSqlData() {
insertTestTableRows(10);
Assert.assertEquals(10, count(testTable.getTableName()));
Batch batch = writeToTestTable(new Data(testTable.getTableName(), DataEventType.SQL,
String.format("\"update %s set TEST_TEXT='it worked!'\"", testTable.getTableName())));
Assert.assertEquals(10, count(testTable.getTableName()));
Assert.assertEquals(10, count(testTable.getTableName(), "TEST_TEXT='it worked!'"));
Assert.assertEquals(1, batch.getSqlCount());
Assert.assertEquals(10, batch.getSqlRowsAffectedCount());

}

protected Batch writeToTestTable(Data... datas) {
SqlDataWriter writer = new SqlDataWriter(getPlatform(), new Parameters());
Expand Down
Expand Up @@ -7,13 +7,21 @@
import org.jumpmind.symmetric.core.model.Data;
import org.jumpmind.symmetric.core.model.Table;
import org.jumpmind.symmetric.core.process.DataContext;
import org.junit.Before;
import org.junit.Test;

public class SqlTableDataReaderTest extends AbstractDatabaseTest {

Table testTable;

@Before
public void cleanupTestTable() {
testTable = buildTestTable();
delete(testTable.getTableName());
}

@Test
public void testSimpleTableWithNoCondition() throws Exception {
Table testTable = buildTestTable();
insertTestTableRows(100);
TableToExtract tableToExtract = new TableToExtract(testTable, "");
SqlTableDataReader reader = new SqlTableDataReader(getPlatform(true), new Batch(),
Expand Down

0 comments on commit 3813505

Please sign in to comment.