working on data loader service test
chenson42 committed Dec 26, 2011
1 parent c12558f commit 622b1e1
Showing 13 changed files with 84 additions and 43 deletions.
@@ -81,6 +81,8 @@
import org.jumpmind.symmetric.web.WebConstants;

/**
* Responsible for writing batch data to the database
*
* @see IDataLoaderService
*/
public class DataLoaderService extends AbstractService implements IDataLoaderService {
@@ -396,7 +398,8 @@ public void beforeBatchEnd(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
enableSyncTriggers(context);
}

public boolean beforeBatchStarted(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
public boolean beforeBatchStarted(
DataContext<CsvDataReader, TransformDatabaseWriter> context) {
this.currentBatch = null;
Batch batch = context.getBatch();
if (parameterService.is(ParameterConstants.DATA_LOADER_ENABLED)
@@ -408,16 +411,14 @@ public boolean beforeBatchStarted(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
this.currentBatch = incomingBatch;
return true;
}

}
return false;
}

public void afterBatchStarted(
DataContext<CsvDataReader, TransformDatabaseWriter> context) {

public void afterBatchStarted(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
Batch batch = context.getBatch();
dbDialect.disableSyncTriggers(context.getWriter().getDatabaseWriter()
.getTransaction(), batch.getSourceNodeId());
dbDialect.disableSyncTriggers(context.getWriter().getDatabaseWriter().getTransaction(),
batch.getSourceNodeId());
}

public void batchSuccessful(DataContext<CsvDataReader, TransformDatabaseWriter> context) {
@@ -449,6 +450,9 @@ protected void enableSyncTriggers(

public void batchInError(DataContext<CsvDataReader, TransformDatabaseWriter> context,
Exception ex) {
Batch batch = context.getBatch();
this.currentBatch.setValues(context.getReader().getStatistics().get(batch), context
.getWriter().getStatistics().get(batch), false);
enableSyncTriggers(context);
statisticManager.incrementDataLoadedErrors(this.currentBatch.getChannelId(), 1);
if (ex instanceof IOException || ex instanceof TransportException) {
@@ -165,15 +165,15 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {
|| !parameterService
.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) {
okayToProcess = true;
existingBatch.setStatus(Status.LD);
updateIncomingBatch(existingBatch);
existingBatch.setStatus(Status.LD);
log.warn("BatchRetrying", batch.getNodeBatchId());
} else {
okayToProcess = false;
batch.setStatus(existingBatch.getStatus());
batch.setSkipCount(existingBatch.getSkipCount() + 1);
existingBatch.setStatus(existingBatch.getStatus());
existingBatch.setSkipCount(existingBatch.getSkipCount() + 1);
log.warn("BatchSkipping", batch.getNodeBatchId());
}
updateIncomingBatch(existingBatch);
}
}
return okayToProcess;
@@ -940,7 +940,7 @@ public void reOrderColumns(Column[] targetOrder, boolean copyPrimaryKeys) {
String name = targetOrder[i].getName();
for (Column column : columns) {
if (column.getName().equalsIgnoreCase(name)) {
orderedColumns.add(i, column);
orderedColumns.add(column);
if (copyPrimaryKeys) {
column.setPrimaryKey(targetOrder[i].isPrimaryKey());
}
@@ -39,7 +39,7 @@
public class DmlStatement {

public enum DmlType {
INSERT, UPDATE, DELETE, COUNT
INSERT, UPDATE, DELETE, COUNT, UNKNOWN
};

protected DmlType dmlType;
@@ -10,13 +10,18 @@
import java.util.Iterator;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.log.Log;
import org.jumpmind.log.LogFactory;

/**
* TODO Support Oracle's non-standard way of batching
*/
public class JdbcSqlTransaction implements ISqlTransaction {

protected final static Log log = LogFactory.getLog(JdbcSqlTransaction.class);

protected boolean inBatchMode = false;

@@ -215,12 +220,15 @@ protected void removeMarkersThatWereSuccessful(BatchUpdateException ex) {
}

public void prepare(String sql) {
try {
try {
if (this.markers.size() > 0) {
throw new IllegalStateException(
"Cannot prepare a new batch before the last batch has been flushed.");
}
JdbcSqlTemplate.close(pstmt);
if (log.isDebugEnabled()) {
log.debug("Preparing: %s", sql);
}
pstmt = connection.prepareStatement(sql);
psql = sql;
} catch (SQLException ex) {
@@ -231,6 +239,9 @@ public void prepare(String sql) {
public int addRow(Object marker, Object[] args, int[] argTypes) {
int rowsUpdated = 0;
try {
if (log.isDebugEnabled()) {
log.debug("Adding %s %s", ArrayUtils.toString(args), inBatchMode ? " in batch mode" : "");
}
if (args != null) {
StatementCreatorUtil.setValues(pstmt, args, argTypes,
jdbcSqlTemplate.getLobHandler());
5 changes: 5 additions & 0 deletions symmetric/symmetric-io/pom.xml
@@ -42,6 +42,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>provided</scope>
</dependency>
<!-- Databases -->
<dependency>
<groupId>org.apache.derby</groupId>
@@ -20,6 +20,8 @@


package org.jumpmind.symmetric.io.data;

import org.jumpmind.db.sql.DmlStatement.DmlType;

/**
*
@@ -80,6 +82,19 @@ public boolean isDml() {

public String getCode() {
return this.code;
}

public DmlType getDmlType() {
switch (this) {
case INSERT:
return DmlType.INSERT;
case UPDATE:
return DmlType.UPDATE;
case DELETE:
return DmlType.DELETE;
default:
return DmlType.UNKNOWN;
}
}

public static DataEventType getEventType(String s) {
@@ -150,12 +150,15 @@ public void write(CsvData data) {
boolean success = false;
switch (data.getDataEventType()) {
case UPDATE:
statistics.get(batch).increment(DatabaseWriterStatistics.STATEMENTCOUNT);
success = update(data);
break;
case INSERT:
statistics.get(batch).increment(DatabaseWriterStatistics.STATEMENTCOUNT);
success = insert(data);
break;
case DELETE:
statistics.get(batch).increment(DatabaseWriterStatistics.STATEMENTCOUNT);
success = delete(data);
break;
case BSH:
@@ -180,7 +183,6 @@ public void write(CsvData data) {
}
} else {
uncommittedCount++;
statistics.get(batch).increment(DatabaseWriterStatistics.STATEMENTCOUNT);
}

lastData = data;
@@ -224,8 +226,9 @@ protected void rollback() {
uncommittedCount = 0;
}

protected boolean requireNewStatement(CsvData data) {
protected boolean requireNewStatement(DmlType currentType, CsvData data) {
boolean requiresNew = currentDmlStatement == null || lastData == null
|| currentDmlStatement.getDmlType() != currentType
|| lastData.getDataEventType() != data.getDataEventType();
if (!requiresNew && data.getDataEventType() == DataEventType.UPDATE) {
String currentChanges = Arrays.toString(data.getChangedDataIndicators());
@@ -241,8 +244,8 @@ protected boolean filterBefore(CsvData data) {
try {
statistics.get(batch).startTimer(DatabaseWriterStatistics.FILTERMILLIS);
for (IDatabaseWriterFilter filter : filters) {
process &= filter.beforeWrite(this.context, this.targetTable != null ? this.targetTable : this.sourceTable,
data);
process &= filter.beforeWrite(this.context,
this.targetTable != null ? this.targetTable : this.sourceTable, data);
}
} finally {
statistics.get(batch).stopTimer(DatabaseWriterStatistics.FILTERMILLIS);
@@ -308,8 +311,8 @@ protected void filterAfter(CsvData data) {
try {
statistics.get(batch).startTimer(DatabaseWriterStatistics.FILTERMILLIS);
for (IDatabaseWriterFilter filter : filters) {
filter.afterWrite(this.context, this.targetTable != null ? this.targetTable : this.sourceTable,
data);
filter.afterWrite(this.context, this.targetTable != null ? this.targetTable
: this.sourceTable, data);
}
} finally {
statistics.get(batch).stopTimer(DatabaseWriterStatistics.FILTERMILLIS);
@@ -320,7 +323,7 @@ protected void filterAfter(CsvData data) {
protected boolean insert(CsvData data) {
try {
statistics.get(batch).startTimer(DatabaseWriterStatistics.DATABASEMILLIS);
if (requireNewStatement(data)) {
if (requireNewStatement(DmlType.INSERT, data)) {
this.currentDmlStatement = platform.createDmlStatement(DmlType.INSERT, targetTable);
transaction.prepare(this.currentDmlStatement.getSql());
}
@@ -346,7 +349,7 @@ protected boolean insert(CsvData data) {
protected boolean delete(CsvData data) {
try {
statistics.get(batch).startTimer(DatabaseWriterStatistics.DATABASEMILLIS);
if (requireNewStatement(data)) {
if (requireNewStatement(DmlType.DELETE, data)) {
this.currentDmlStatement = platform.createDmlStatement(DmlType.DELETE, targetTable);
transaction.prepare(this.currentDmlStatement.getSql());
}
@@ -377,7 +380,7 @@ protected boolean update(CsvData data) {
}
}
if (changedColumnNameList.size() > 0) {
if (requireNewStatement(data)) {
if (requireNewStatement(DmlType.UPDATE, data)) {
this.currentDmlStatement = platform
.createDmlStatement(
DmlType.UPDATE,
@@ -598,7 +601,7 @@ public Batch getBatch() {
public IDatabaseWriterConflictResolver getConflictResolver() {
return conflictResolver;
}

public void setConflictResolver(IDatabaseWriterConflictResolver conflictResolver) {
this.conflictResolver = conflictResolver;
}
@@ -622,5 +625,5 @@ public Map<Batch, Statistics> getStatistics() {
public ISqlTransaction getTransaction() {
return transaction;
}

}
@@ -216,7 +216,7 @@ public void testBinaryColumnTypesForPostgres() throws Exception {
Assert.assertEquals("test 1 2 3", result);
}
}

@Test
public void testBenchmark() throws Exception {
Table table = buildSourceTable(TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
10 changes: 5 additions & 5 deletions symmetric/symmetric-server/pom.xml
@@ -93,11 +93,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Framework -->
<dependency>
<groupId>org.springframework</groupId>
@@ -134,9 +134,9 @@ public void testStatistics() throws Exception {
CsvWriter writer = getWriter(out);
writer.writeRecord(new String[] { CsvConstants.NODEID,
TestConstants.TEST_CLIENT_EXTERNAL_ID });
writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
String nextBatchId = getNextBatchId();
writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);

// Update becomes fallback insert
writer.write(CsvConstants.UPDATE);
@@ -175,7 +175,7 @@ public void testStatistics() throws Exception {
assertNotNull(batch);
assertEquals(batch.getStatus(), IncomingBatch.Status.ER, "Wrong status. " + printDatabase());
assertEquals(batch.getFailedRowNumber(), 8l, "Wrong failed row number. " + printDatabase());
assertEquals(batch.getByteCount(), 322l, "Wrong byte count. " + printDatabase());
assertEquals(batch.getByteCount(), 509l, "Wrong byte count. " + printDatabase());
assertEquals(batch.getStatementCount(), 8l, "Wrong statement count. " + printDatabase());
assertEquals(batch.getFallbackInsertCount(), 1l, "Wrong fallback insert count. "
+ printDatabase());
@@ -202,11 +202,9 @@ public void testUpdateCollision() throws Exception {
CsvWriter writer = getWriter(out);
writer.writeRecord(new String[] { CsvConstants.NODEID,
TestConstants.TEST_CLIENT_EXTERNAL_ID });
writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);

String nextBatchId = getNextBatchId();

writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);

// This insert will be OK
writer.write(CsvConstants.INSERT);
@@ -302,10 +300,6 @@ public void testSkippingResentBatch() throws Exception {
for (long i = 0; i < 7; i++) {
batchId--;
testSimple(CsvConstants.INSERT, values, values);
long expectedCount = 1;
if (i > 0) {
expectedCount = 0;
}
assertEquals(findIncomingBatchStatus(batchId, TestConstants.TEST_CLIENT_EXTERNAL_ID),
IncomingBatch.Status.OK, "Wrong status");
IncomingBatch batch = getIncomingBatchService().findIncomingBatch(batchId,
@@ -314,7 +308,7 @@
assertEquals(batch.getStatus(), IncomingBatch.Status.OK, "Wrong status");
assertEquals(batch.getSkipCount(), i);
assertEquals(batch.getFailedRowNumber(), 0l, "Wrong failed row number");
assertEquals(batch.getStatementCount(), expectedCount, "Wrong statement count");
assertEquals(batch.getStatementCount(), 1l, "Wrong statement count");
assertEquals(batch.getFallbackInsertCount(), 0l, "Wrong fallback insert count");
assertEquals(batch.getFallbackUpdateCount(), 0l, "Wrong fallback update count");
// pause to make sure we get a different start time on the incoming
@@ -487,16 +481,16 @@ public void testMultipleBatch() throws Exception {
CsvWriter writer = getWriter(out);
writer.writeRecord(new String[] { CsvConstants.NODEID,
TestConstants.TEST_CLIENT_EXTERNAL_ID });
writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);

String nextBatchId = getNextBatchId();
writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId });
writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
writer.write(CsvConstants.INSERT);
writer.writeRecord(values, true);
writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId });

String nextBatchId2 = getNextBatchId();
writer.writeRecord(new String[] { CsvConstants.BATCH, nextBatchId2 });
writeTable(writer, TEST_TABLE, TEST_KEYS, TEST_COLUMNS);
writer.write(CsvConstants.INSERT);
writer.writeRecord(values2, true);
writer.writeRecord(new String[] { CsvConstants.COMMIT, nextBatchId2 });
@@ -25,6 +25,7 @@

import javax.sql.DataSource;

import org.jumpmind.log.Log4jLog;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.logging.ILog;
@@ -53,6 +54,10 @@ public class AbstractDatabaseTest extends AbstractTest {
private String database = TestSetupUtil.getRootDbTypes(DatabaseTestSuite.DEFAULT_TEST_PREFIX)[0];

static boolean standalone = true;

static {
org.jumpmind.log.LogFactory.setLogClass(Log4jLog.class);
}

public void init(String database) {
this.database = database;
