From db3e944070828f1f08e81e87bf273ffa37d766d1 Mon Sep 17 00:00:00 2001
From: chenson42
Date: Thu, 19 May 2011 21:11:48 +0000
Subject: [PATCH] very simple tests working
---
.../symmetric/core/model/AbstractCsvData.java | 46 +++--
.../jumpmind/symmetric/core/model/Batch.java | 5 +
.../jumpmind/symmetric/core/model/Data.java | 9 +-
.../core/process/sql/SqlDataWriter.java | 194 ++++++++++++------
.../core/process/sql/SqlDataWriterTest.java | 31 ++-
5 files changed, 199 insertions(+), 86 deletions(-)
diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java
index 12d8936892..f1e8c66345 100644
--- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java
+++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java
@@ -16,7 +16,8 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
*/
+ * under the License.
+ */
package org.jumpmind.symmetric.core.model;
@@ -26,10 +27,10 @@
import java.util.Map;
import org.jumpmind.symmetric.core.common.StringUtils;
+import org.jumpmind.symmetric.core.io.IoException;
import org.jumpmind.symmetric.csv.CsvReader;
import org.jumpmind.symmetric.csv.CsvUtils;
-
/**
*
*/
@@ -37,30 +38,37 @@ abstract class AbstractCsvData {
private Map parsedCsvData = null;
+ protected void removeData(String key) {
+ parsedCsvData.remove(key);
+ }
+
+ protected void putData(String key, String[] data) {
+ if (parsedCsvData == null) {
+ parsedCsvData = new HashMap(2);
+ }
+ parsedCsvData.put(key, data);
+ }
+
protected String[] getData(String key, String data) {
- if (!StringUtils.isBlank(data)) {
+ String[] values = null;
+ if (parsedCsvData != null && parsedCsvData.containsKey(key)) {
+ values = parsedCsvData.get(key);
+ } else if (!StringUtils.isBlank(data)) {
try {
- if (parsedCsvData == null) {
- parsedCsvData = new HashMap(2);
- }
- if (parsedCsvData.containsKey(key)) {
- return parsedCsvData.get(key);
+ CsvReader csvReader = CsvUtils.getCsvReader(new StringReader(data));
+ if (csvReader.readRecord()) {
+ values = csvReader.getValues();
+ putData(key, values);
} else {
- CsvReader csvReader = CsvUtils.getCsvReader(new StringReader(data));
- if (csvReader.readRecord()) {
- String[] values = csvReader.getValues();
- parsedCsvData.put(key, values);
- return values;
- } else {
- throw new IllegalStateException(String.format("Could not parse the data passed in: %s", data));
- }
+ throw new IllegalStateException(String.format(
+ "Could not parse the data passed in: %s", data));
}
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new IoException(e);
}
- } else {
- return null;
}
+ return values;
+
}
}
\ No newline at end of file
diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java
index cf9f2ce450..6ded5f012c 100644
--- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java
+++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java
@@ -20,6 +20,7 @@ public class Batch {
protected long otherCount;
protected long fallbackInsertCount;
protected long fallbackUpdateCount;
+ protected long fallbackUpdateWithNewKeysCount;
protected long missingDeleteCount;
protected long timerMillis;
@@ -34,6 +35,10 @@ public long incrementLineCount() {
public long incrementFallbackInsertCount() {
return ++fallbackInsertCount;
}
+
+ public long incrementFallbackUpdateWithNewKeysCount() {
+ return ++fallbackUpdateWithNewKeysCount;
+ }
public long incrementFallbackUpdateCount() {
return ++fallbackUpdateCount;
diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java
index ab59158d21..879a344787 100644
--- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java
+++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java
@@ -74,7 +74,7 @@ public Data(String tableName, DataEventType eventType, String rowData) {
this(-1, null, rowData, eventType, tableName, null, null, null, null);
}
- public Data(String pkData, String rowData, DataEventType eventType, String tableName) {
+ public Data(String tableName, DataEventType eventType, String pkData, String rowData) {
this(-1, pkData, rowData, eventType, tableName, null, null, null, null);
}
@@ -118,6 +118,11 @@ public String[] toParsedOldData() {
public String[] toParsedPkData() {
return getData("pkData", pkData);
}
+
+ public void clearPkData() {
+ this.pkData = null;
+ this.removeData("pkData");
+ }
public long getDataId() {
return dataId;
@@ -156,7 +161,7 @@ public String getPkData() {
}
public void setPkData(String pkData) {
- this.pkData = pkData;
+ this.pkData = pkData;
}
public String getOldData() {
diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java
index 815695047a..16beb1d008 100644
--- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java
+++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java
@@ -21,6 +21,7 @@
import org.jumpmind.symmetric.core.process.IDataWriter;
import org.jumpmind.symmetric.core.sql.DataIntegrityViolationException;
import org.jumpmind.symmetric.core.sql.ISqlTransaction;
+import org.jumpmind.symmetric.core.sql.SqlException;
import org.jumpmind.symmetric.core.sql.StatementBuilder;
import org.jumpmind.symmetric.core.sql.StatementBuilder.DmlType;
@@ -31,8 +32,6 @@ public class SqlDataWriter implements IDataWriter {
protected IDbPlatform platform;
- protected Parameters parameters;
-
protected List> columnFilters;
protected List> dataFilters;
@@ -41,30 +40,22 @@ public class SqlDataWriter implements IDataWriter {
protected ISqlTransaction transaction;
- protected DataEventType lastDataEvent;
-
- protected int uncommittedRows = 0;
-
- protected int maxRowsBeforeBatchFlush;
-
- protected boolean enableFallbackForInsert;
-
- protected boolean enableFallbackForUpdate;
-
- protected boolean allowMissingDeletes;
+ protected StatementBuilder statementBuilder;
- protected boolean useBatching;
+ protected Data lastData;
- protected int maxRowsBeforeCommit;
-
- protected boolean usePrimaryKeysFromSource;
+ protected int uncommittedRows = 0;
- protected boolean dontIncludeKeysInUpdateStatement;
+ protected Settings settings;
protected Batch batch;
protected DataContext ctx;
+ public SqlDataWriter(IDbPlatform platform) {
+ this(platform, new Settings());
+ }
+
public SqlDataWriter(IDbPlatform platform, Parameters parameters) {
this(platform, parameters, null, null);
}
@@ -72,26 +63,36 @@ public SqlDataWriter(IDbPlatform platform, Parameters parameters) {
public SqlDataWriter(IDbPlatform platform, Parameters parameters,
List> columnFilters,
List> dataFilters) {
- this.platform = platform;
- this.parameters = parameters != null ? parameters : new Parameters();
- this.columnFilters = columnFilters;
- this.dataFilters = dataFilters;
+ this(platform, new Settings(), columnFilters, dataFilters);
- this.maxRowsBeforeBatchFlush = parameters.getInt(
+ settings.maxRowsBeforeBatchFlush = parameters.getInt(
Parameters.LOADER_MAX_ROWS_BEFORE_BATCH_FLUSH, 10);
- this.enableFallbackForInsert = parameters
- .is(Parameters.LOADER_ENABLE_FALLBACK_INSERT, true);
- this.enableFallbackForUpdate = parameters
- .is(Parameters.LOADER_ENABLE_FALLBACK_UPDATE, true);
- this.allowMissingDeletes = parameters.is(Parameters.LOADER_ALLOW_MISSING_DELETES, true);
- this.useBatching = parameters.is(Parameters.LOADER_USE_BATCHING, true);
- this.maxRowsBeforeCommit = parameters
- .getInt(Parameters.LOADER_MAX_ROWS_BEFORE_COMMIT, 1000);
- this.usePrimaryKeysFromSource = parameters.is(Parameters.DB_USE_PKS_FROM_SOURCE, true);
- this.dontIncludeKeysInUpdateStatement = parameters.is(
+ settings.enableFallbackForInsert = parameters.is(Parameters.LOADER_ENABLE_FALLBACK_INSERT,
+ true);
+ settings.enableFallbackForUpdate = parameters.is(Parameters.LOADER_ENABLE_FALLBACK_UPDATE,
+ true);
+ settings.allowMissingDeletes = parameters.is(Parameters.LOADER_ALLOW_MISSING_DELETES, true);
+ settings.useBatching = parameters.is(Parameters.LOADER_USE_BATCHING, true);
+ settings.maxRowsBeforeCommit = parameters.getInt(Parameters.LOADER_MAX_ROWS_BEFORE_COMMIT,
+ 1000);
+ settings.usePrimaryKeysFromSource = parameters.is(Parameters.DB_USE_PKS_FROM_SOURCE, true);
+ settings.dontIncludeKeysInUpdateStatement = parameters.is(
Parameters.LOADER_DONT_INCLUDE_PKS_IN_UPDATE, false);
}
+ public SqlDataWriter(IDbPlatform platform, Settings settings) {
+ this(platform, settings, null, null);
+ }
+
+ public SqlDataWriter(IDbPlatform platform, Settings settings,
+ List> columnFilters,
+ List> dataFilters) {
+ this.platform = platform;
+ this.columnFilters = columnFilters;
+ this.dataFilters = dataFilters;
+ this.settings = settings;
+ }
+
public DataContext createDataContext() {
return new DataContext();
}
@@ -102,13 +103,12 @@ public void open(DataContext context) {
}
public boolean switchTables(Table sourceTable) {
- this.lastDataEvent = null;
if (sourceTable != null) {
this.targetTable = platform.findTable(sourceTable.getCatalogName(),
sourceTable.getSchemaName(), sourceTable.getTableName(), true).copy();
if (this.targetTable != null) {
this.targetTable.reOrderColumns(sourceTable.getColumns(),
- this.usePrimaryKeysFromSource);
+ settings.usePrimaryKeysFromSource);
return true;
} else {
log.log(LogLevel.WARN, "Did not find the %s table in the target database",
@@ -123,17 +123,21 @@ public boolean switchTables(Table sourceTable) {
}
public void startBatch(Batch batch) {
- this.lastDataEvent = null;
+ this.statementBuilder = null;
this.batch = batch;
}
public void writeData(Data data) {
- writeData(data, this.useBatching);
- lastDataEvent = data.getEventType();
+ writeData(data, settings.useBatching);
+ this.lastData = data;
}
protected void writeData(Data data, boolean batchMode) {
+ if (lastData != null && lastData.getEventType() != data.getEventType()) {
+ transaction.flush();
+ }
+
try {
switch (data.getEventType()) {
case INSERT:
@@ -156,7 +160,7 @@ protected void writeData(Data data, boolean batchMode) {
uncommittedRows++;
// check if an early commit needs to happen
- if (uncommittedRows > this.maxRowsBeforeCommit) {
+ if (uncommittedRows > settings.maxRowsBeforeCommit) {
commit();
}
@@ -199,7 +203,7 @@ protected boolean doesColumnNeedUpdated(int columnIndex, Column column, Data dat
needsUpdated = !StringUtils.equals(rowData[columnIndex], oldData[columnIndex])
|| (platform.isLob(column.getTypeCode()) && (platform.getPlatformInfo()
.isNeedsToSelectLobData() || StringUtils.isBlank(oldData[columnIndex])));
- } else if (dontIncludeKeysInUpdateStatement) {
+ } else if (settings.dontIncludeKeysInUpdateStatement) {
// This is in support of creating update statements that don't use
// the keys in the set portion of the update statement.
In
// oracle (and maybe not only in oracle) if there is no index on
@@ -234,7 +238,6 @@ protected String getPkDataFor(Data data, Column column) {
}
protected int executeUpdateSql(Data data) {
- StatementBuilder st = null;
String[] columnValues = data.toParsedRowData();
ArrayList changedColumnNameList = new ArrayList();
ArrayList changedColumnValueList = new ArrayList();
@@ -250,13 +253,14 @@ protected int executeUpdateSql(Data data) {
}
}
if (changedColumnNameList.size() > 0) {
- st = getStatementBuilder(DmlType.UPDATE, targetTable.getPrimaryKeyColumnsArray(),
+ this.statementBuilder = getStatementBuilder(DmlType.UPDATE,
+ targetTable.getPrimaryKeyColumnsArray(),
changedColumnMetaList.toArray(new Column[changedColumnMetaList.size()]));
columnValues = (String[]) changedColumnValueList
.toArray(new String[changedColumnValueList.size()]);
String[] values = (String[]) ArrayUtils.addAll(columnValues, getPkData(data));
- transaction.prepare(st.getSql(), -1, false);
- return execute(st, data, values);
+ transaction.prepare(this.statementBuilder.getSql(), -1, false);
+ return execute(data, values);
} else {
// There was no change to apply
return 1;
@@ -278,11 +282,25 @@ protected String[] getPkData(Data data) {
}
protected void executeInsertSql(Data data, boolean batchMode) {
- StatementBuilder st = getStatementBuilder(DmlType.INSERT, null, targetTable.getColumns());
- if (this.lastDataEvent != DataEventType.INSERT) {
- transaction.prepare(st.getSql(), this.maxRowsBeforeBatchFlush, batchMode);
+ if (this.statementBuilder == null || this.lastData == null
+ || this.lastData.getEventType() != DataEventType.INSERT) {
+ this.statementBuilder = getStatementBuilder(DmlType.INSERT, null,
+ targetTable.getColumns());
+ transaction.prepare(this.statementBuilder.getSql(), settings.maxRowsBeforeBatchFlush,
+ batchMode);
}
- execute(st, data, data.toParsedRowData());
+ execute(data, data.toParsedRowData());
+ }
+
+ protected int executeDeleteSql(Data data, boolean batchMode) {
+ if (this.statementBuilder == null || this.lastData == null
+ || this.lastData.getEventType() != DataEventType.DELETE) {
+ this.statementBuilder = getStatementBuilder(DmlType.DELETE,
+ targetTable.getPrimaryKeyColumnsArray(), targetTable.getColumns());
+ transaction.prepare(this.statementBuilder.getSql(), settings.maxRowsBeforeBatchFlush,
+ batchMode);
+ }
+ return execute(data, data.toParsedPkData());
}
protected void processInsert(Data data, boolean batchMode) {
@@ -294,8 +312,8 @@ protected void processInsert(Data data, boolean batchMode) {
executeInsertSql(data, batchMode);
} catch (DataIntegrityViolationException e) {
// TODO log insert failed
- if (enableFallbackForInsert && !batchMode) {
- this.lastDataEvent = null;
+ if (settings.enableFallbackForInsert && !batchMode) {
+ this.statementBuilder = null;
// TODO rollback to save point
batch.incrementFallbackUpdateCount();
executeUpdateSql(data);
@@ -313,12 +331,30 @@ protected void processUpdate(Data data) {
batch.incrementUpdateCount();
try {
batch.startTimer();
- executeUpdateSql(data);
+ int updateCount = executeUpdateSql(data);
+ if (updateCount == 0) {
+ if (settings.enableFallbackForUpdate) {
+ // The row was missing, fallback to an insert
+ batch.incrementFallbackInsertCount();
+ executeInsertSql(data, false);
+ } else {
+ throw new SqlException("There were no rows to update");
+ }
+ }
} catch (DataIntegrityViolationException e) {
- // TODO log update failed
- if (enableFallbackForUpdate) {
- batch.incrementFallbackInsertCount();
- executeInsertSql(data, false);
+ // If we got here, most likely scenario is that the update
+ // has already run and updated the primary key.
+ // Let's attempt to run the update using the new
+ // key values.
+ if (settings.enableFallbackForUpdate) {
+ // remove the old pk values so that the new ones will be
+ // used
+ data.clearPkData();
+ batch.incrementFallbackUpdateWithNewKeysCount();
+ int updateCount = executeUpdateSql(data);
+ if (updateCount == 0) {
+ throw new SqlException("There were no rows to update using");
+ }
} else {
throw e;
}
@@ -329,7 +365,21 @@ protected void processUpdate(Data data) {
}
protected void processDelete(Data data, boolean batchMode) {
- // TODO
+ if (filterData(data, ctx)) {
+ batch.incrementDeleteCount();
+ batch.startTimer();
+ try {
+ int updateCount = executeDeleteSql(data, batchMode);
+ if (!batchMode && updateCount == 0) {
+ batch.incrementMissingDeleteCount();
+ if (!settings.allowMissingDeletes) {
+ throw new SqlException("No rows were deleted");
+ }
+ }
+ } finally {
+ batch.incrementDatabaseMillis(batch.endTimer());
+ }
+ }
}
protected void processSql(Data data) {
@@ -337,26 +387,26 @@ protected void processSql(Data data) {
}
protected boolean isCorrectForIntegrityViolation(Data data) {
- if (data.getEventType() == DataEventType.INSERT && enableFallbackForInsert) {
+ if (data.getEventType() == DataEventType.INSERT && settings.enableFallbackForInsert) {
return true;
- } else if (data.getEventType() == DataEventType.UPDATE && enableFallbackForUpdate) {
+ } else if (data.getEventType() == DataEventType.UPDATE && settings.enableFallbackForUpdate) {
return true;
- } else if (data.getEventType() == DataEventType.DELETE && allowMissingDeletes) {
+ } else if (data.getEventType() == DataEventType.DELETE && settings.allowMissingDeletes) {
return true;
} else {
return false;
}
}
- protected int execute(StatementBuilder st, Data data, String[] values) {
+ protected int execute(Data data, String[] values) {
Object[] objectValues = platform.getObjectValues(ctx.getBinaryEncoding(), values,
- st.getMetaData(true));
+ statementBuilder.getMetaData(true));
if (columnFilters != null) {
for (IColumnFilter columnFilter : columnFilters) {
objectValues = columnFilter.filterColumnsValues(ctx, targetTable, objectValues);
}
}
- return transaction.update(data, objectValues, st.getTypes());
+ return transaction.update(data, objectValues, this.statementBuilder.getTypes());
}
final private StatementBuilder getStatementBuilder(DmlType dmlType, Column[] lookupColumns,
@@ -391,4 +441,24 @@ public void finishBatch(Batch batch) {
commit();
}
+ public static class Settings {
+
+ protected int maxRowsBeforeBatchFlush;
+
+ protected boolean enableFallbackForInsert;
+
+ protected boolean enableFallbackForUpdate;
+
+ protected boolean allowMissingDeletes;
+
+ protected boolean useBatching;
+
+ protected int maxRowsBeforeCommit;
+
+ protected boolean usePrimaryKeysFromSource;
+
+ protected boolean dontIncludeKeysInUpdateStatement;
+
+ }
+
}
diff --git a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java
index b80d615105..5fc69bea29 100644
--- a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java
+++ b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java
@@ -23,14 +23,39 @@ public void cleanupTestTable() {
@Test
public void testOneRowInsert() {
+ writeToTestTable(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\""));
+ Assert.assertEquals(1, count(testTable.getTableName()));
+ }
+
+ @Test
+ public void testOneRowInsertOneRowUpdate() {
+ writeToTestTable(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\""),
+ new Data(testTable.getTableName(), DataEventType.UPDATE, "1", "1,\"updated\""));
+ Assert.assertEquals(1, count(testTable.getTableName()));
+ Assert.assertEquals(
+ "updated",
+ getPlatform().getSqlConnection().queryForObject(
+ String.format("select TEST_TEXT from %s where TEST_ID=?",
+ testTable.getTableName()), String.class, 1));
+ }
+
+ @Test
+ public void testOneRowInsertOneRowDelete() {
+ writeToTestTable(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\""),
+ new Data(testTable.getTableName(), DataEventType.DELETE, "1", null));
+ Assert.assertEquals(0, count(testTable.getTableName()));
+ }
+
+ protected void writeToTestTable(Data... datas) {
SqlDataWriter writer = new SqlDataWriter(getPlatform(), new Parameters());
writer.open(writer.createDataContext());
Batch batch = new Batch();
writer.startBatch(batch);
writer.switchTables(testTable);
- writer.writeData(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\""));
+ for (Data data : datas) {
+ writer.writeData(data);
+ }
writer.finishBatch(batch);
- writer.close();
- Assert.assertEquals(1, count(testTable.getTableName()));
+ writer.close();
}
}