diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java index 12d8936892..f1e8c66345 100644 --- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java +++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/AbstractCsvData.java @@ -16,7 +16,8 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. */ + * under the License. + */ package org.jumpmind.symmetric.core.model; @@ -26,10 +27,10 @@ import java.util.Map; import org.jumpmind.symmetric.core.common.StringUtils; +import org.jumpmind.symmetric.core.io.IoException; import org.jumpmind.symmetric.csv.CsvReader; import org.jumpmind.symmetric.csv.CsvUtils; - /** * */ @@ -37,30 +38,37 @@ abstract class AbstractCsvData { private Map parsedCsvData = null; + protected void removeData(String key) { + parsedCsvData.remove(key); + } + + protected void putData(String key, String[] data) { + if (parsedCsvData == null) { + parsedCsvData = new HashMap(2); + } + parsedCsvData.put(key, data); + } + protected String[] getData(String key, String data) { - if (!StringUtils.isBlank(data)) { + String[] values = null; + if (parsedCsvData != null && parsedCsvData.containsKey(key)) { + values = parsedCsvData.get(key); + } else if (!StringUtils.isBlank(data)) { try { - if (parsedCsvData == null) { - parsedCsvData = new HashMap(2); - } - if (parsedCsvData.containsKey(key)) { - return parsedCsvData.get(key); + CsvReader csvReader = CsvUtils.getCsvReader(new StringReader(data)); + if (csvReader.readRecord()) { + values = csvReader.getValues(); + putData(key, values); } else { - CsvReader csvReader = CsvUtils.getCsvReader(new StringReader(data)); - if (csvReader.readRecord()) { - String[] values = csvReader.getValues(); - parsedCsvData.put(key, values); - return values; - } else { - throw new IllegalStateException(String.format("Could not parse the data passed in: %s", data)); - } + throw new IllegalStateException(String.format( + "Could not parse the data passed in: %s", data)); } } catch (IOException e) { - throw new RuntimeException(e); + throw new IoException(e); } - } else { - return null; } + return values; + } } \ No newline at end of file diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java index cf9f2ce450..6ded5f012c 100644 --- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java +++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Batch.java @@ -20,6 +20,7 @@ public class Batch { protected long otherCount; protected long fallbackInsertCount; protected long fallbackUpdateCount; + protected long fallbackUpdateWithNewKeysCount; protected long missingDeleteCount; protected long timerMillis; @@ -34,6 +35,10 @@ public long incrementLineCount() { public long incrementFallbackInsertCount() { return ++fallbackInsertCount; } + + public long incrementFallbackUpdateWithNewKeysCount() { + return ++fallbackUpdateWithNewKeysCount; + } public long incrementFallbackUpdateCount() { return ++fallbackUpdateCount; diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java index ab59158d21..879a344787 100644 --- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java +++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/model/Data.java @@ -74,7 +74,7 @@ public Data(String tableName, DataEventType eventType, String rowData) { this(-1, null, rowData, eventType, tableName, null, null, null, null); } - public Data(String pkData, String rowData, DataEventType eventType, String tableName) { + public Data(String tableName, DataEventType eventType, String pkData, String rowData) { this(-1, pkData, rowData, eventType, tableName, null, null, null, null); } @@ -118,6 +118,11 @@ public String[] toParsedOldData() { public String[] toParsedPkData() { return getData("pkData", pkData); } + + public void clearPkData() { + this.pkData = null; + this.removeData("pkData"); + } public long getDataId() { return dataId; @@ -156,7 +161,7 @@ public String getPkData() { } public void setPkData(String pkData) { - this.pkData = pkData; + this.pkData = pkData; } public String getOldData() { diff --git a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java index 815695047a..16beb1d008 100644 --- a/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java +++ b/future/symmetric3-core/src/main/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriter.java @@ -21,6 +21,7 @@ import org.jumpmind.symmetric.core.process.IDataWriter; import org.jumpmind.symmetric.core.sql.DataIntegrityViolationException; import org.jumpmind.symmetric.core.sql.ISqlTransaction; +import org.jumpmind.symmetric.core.sql.SqlException; import org.jumpmind.symmetric.core.sql.StatementBuilder; import org.jumpmind.symmetric.core.sql.StatementBuilder.DmlType; @@ -31,8 +32,6 @@ public class SqlDataWriter implements IDataWriter { protected IDbPlatform platform; - protected Parameters parameters; - protected List> columnFilters; protected List> dataFilters; @@ -41,30 +40,22 @@ public class SqlDataWriter implements IDataWriter { protected ISqlTransaction transaction; - protected DataEventType lastDataEvent; - - protected int uncommittedRows = 0; - - protected int maxRowsBeforeBatchFlush; - - protected boolean enableFallbackForInsert; - - protected boolean enableFallbackForUpdate; - - protected boolean allowMissingDeletes; + protected StatementBuilder statementBuilder; - protected boolean useBatching; + protected Data lastData; - protected int maxRowsBeforeCommit; - - protected boolean usePrimaryKeysFromSource; + protected int uncommittedRows = 0; - protected boolean dontIncludeKeysInUpdateStatement; + protected Settings settings; protected Batch batch; protected DataContext ctx; + public SqlDataWriter(IDbPlatform platform) { + this(platform, new Settings()); + } + public SqlDataWriter(IDbPlatform platform, Parameters parameters) { this(platform, parameters, null, null); } @@ -72,26 +63,36 @@ public SqlDataWriter(IDbPlatform platform, Parameters parameters) { public SqlDataWriter(IDbPlatform platform, Parameters parameters, List> columnFilters, List> dataFilters) { - this.platform = platform; - this.parameters = parameters != null ? parameters : new Parameters(); - this.columnFilters = columnFilters; - this.dataFilters = dataFilters; + this(platform, new Settings(), columnFilters, dataFilters); - this.maxRowsBeforeBatchFlush = parameters.getInt( + settings.maxRowsBeforeBatchFlush = parameters.getInt( Parameters.LOADER_MAX_ROWS_BEFORE_BATCH_FLUSH, 10); - this.enableFallbackForInsert = parameters - .is(Parameters.LOADER_ENABLE_FALLBACK_INSERT, true); - this.enableFallbackForUpdate = parameters - .is(Parameters.LOADER_ENABLE_FALLBACK_UPDATE, true); - this.allowMissingDeletes = parameters.is(Parameters.LOADER_ALLOW_MISSING_DELETES, true); - this.useBatching = parameters.is(Parameters.LOADER_USE_BATCHING, true); - this.maxRowsBeforeCommit = parameters - .getInt(Parameters.LOADER_MAX_ROWS_BEFORE_COMMIT, 1000); - this.usePrimaryKeysFromSource = parameters.is(Parameters.DB_USE_PKS_FROM_SOURCE, true); - this.dontIncludeKeysInUpdateStatement = parameters.is( + settings.enableFallbackForInsert = parameters.is(Parameters.LOADER_ENABLE_FALLBACK_INSERT, + true); + settings.enableFallbackForUpdate = parameters.is(Parameters.LOADER_ENABLE_FALLBACK_UPDATE, + true); + settings.allowMissingDeletes = parameters.is(Parameters.LOADER_ALLOW_MISSING_DELETES, true); + settings.useBatching = parameters.is(Parameters.LOADER_USE_BATCHING, true); + settings.maxRowsBeforeCommit = parameters.getInt(Parameters.LOADER_MAX_ROWS_BEFORE_COMMIT, + 1000); + settings.usePrimaryKeysFromSource = parameters.is(Parameters.DB_USE_PKS_FROM_SOURCE, true); + settings.dontIncludeKeysInUpdateStatement = parameters.is( Parameters.LOADER_DONT_INCLUDE_PKS_IN_UPDATE, false); } + public SqlDataWriter(IDbPlatform platform, Settings settings) { + this(platform, settings, null, null); + } + + public SqlDataWriter(IDbPlatform platform, Settings settings, + List> columnFilters, + List> dataFilters) { + this.platform = platform; + this.columnFilters = columnFilters; + this.dataFilters = dataFilters; + this.settings = settings; + } + public DataContext createDataContext() { return new DataContext(); } @@ -102,13 +103,12 @@ public void open(DataContext context) { } public boolean switchTables(Table sourceTable) { - this.lastDataEvent = null; if (sourceTable != null) { this.targetTable = platform.findTable(sourceTable.getCatalogName(), sourceTable.getSchemaName(), sourceTable.getTableName(), true).copy(); if (this.targetTable != null) { this.targetTable.reOrderColumns(sourceTable.getColumns(), - this.usePrimaryKeysFromSource); + settings.usePrimaryKeysFromSource); return true; } else { log.log(LogLevel.WARN, "Did not find the %s table in the target database", @@ -123,17 +123,21 @@ public boolean switchTables(Table sourceTable) { } public void startBatch(Batch batch) { - this.lastDataEvent = null; + this.statementBuilder = null; this.batch = batch; } public void writeData(Data data) { - writeData(data, this.useBatching); - lastDataEvent = data.getEventType(); + writeData(data, settings.useBatching); + this.lastData = data; } protected void writeData(Data data, boolean batchMode) { + if (lastData != null && lastData.getEventType() != data.getEventType()) { + transaction.flush(); + } + try { switch (data.getEventType()) { case INSERT: @@ -156,7 +160,7 @@ protected void writeData(Data data, boolean batchMode) { uncommittedRows++; // check if an early commit needs to happen - if (uncommittedRows > this.maxRowsBeforeCommit) { + if (uncommittedRows > settings.maxRowsBeforeCommit) { commit(); } @@ -199,7 +203,7 @@ protected boolean doesColumnNeedUpdated(int columnIndex, Column column, Data dat needsUpdated = !StringUtils.equals(rowData[columnIndex], oldData[columnIndex]) || (platform.isLob(column.getTypeCode()) && (platform.getPlatformInfo() .isNeedsToSelectLobData() || StringUtils.isBlank(oldData[columnIndex]))); - } else if (dontIncludeKeysInUpdateStatement) { + } else if (settings.dontIncludeKeysInUpdateStatement) { // This is in support of creating update statements that don't use // the keys in the set portion of the update statement.

In // oracle (and maybe not only in oracle) if there is no index on @@ -234,7 +238,6 @@ protected String getPkDataFor(Data data, Column column) { } protected int executeUpdateSql(Data data) { - StatementBuilder st = null; String[] columnValues = data.toParsedRowData(); ArrayList changedColumnNameList = new ArrayList(); ArrayList changedColumnValueList = new ArrayList(); @@ -250,13 +253,14 @@ protected int executeUpdateSql(Data data) { } } if (changedColumnNameList.size() > 0) { - st = getStatementBuilder(DmlType.UPDATE, targetTable.getPrimaryKeyColumnsArray(), + this.statementBuilder = getStatementBuilder(DmlType.UPDATE, + targetTable.getPrimaryKeyColumnsArray(), changedColumnMetaList.toArray(new Column[changedColumnMetaList.size()])); columnValues = (String[]) changedColumnValueList .toArray(new String[changedColumnValueList.size()]); String[] values = (String[]) ArrayUtils.addAll(columnValues, getPkData(data)); - transaction.prepare(st.getSql(), -1, false); - return execute(st, data, values); + transaction.prepare(this.statementBuilder.getSql(), -1, false); + return execute(data, values); } else { // There was no change to apply return 1; @@ -278,11 +282,25 @@ protected String[] getPkData(Data data) { } protected void executeInsertSql(Data data, boolean batchMode) { - StatementBuilder st = getStatementBuilder(DmlType.INSERT, null, targetTable.getColumns()); - if (this.lastDataEvent != DataEventType.INSERT) { - transaction.prepare(st.getSql(), this.maxRowsBeforeBatchFlush, batchMode); + if (this.statementBuilder == null || this.lastData == null + || this.lastData.getEventType() != DataEventType.INSERT) { + this.statementBuilder = getStatementBuilder(DmlType.INSERT, null, + targetTable.getColumns()); + transaction.prepare(this.statementBuilder.getSql(), settings.maxRowsBeforeBatchFlush, + batchMode); } - execute(st, data, data.toParsedRowData()); + execute(data, data.toParsedRowData()); + } + + protected int executeDeleteSql(Data data, boolean batchMode) { + if (this.statementBuilder == null || this.lastData == null + || this.lastData.getEventType() != DataEventType.DELETE) { + this.statementBuilder = getStatementBuilder(DmlType.DELETE, + targetTable.getPrimaryKeyColumnsArray(), targetTable.getColumns()); + transaction.prepare(this.statementBuilder.getSql(), settings.maxRowsBeforeBatchFlush, + batchMode); + } + return execute(data, data.toParsedPkData()); } protected void processInsert(Data data, boolean batchMode) { @@ -294,8 +312,8 @@ protected void processInsert(Data data, boolean batchMode) { executeInsertSql(data, batchMode); } catch (DataIntegrityViolationException e) { // TODO log insert failed - if (enableFallbackForInsert && !batchMode) { - this.lastDataEvent = null; + if (settings.enableFallbackForInsert && !batchMode) { + this.statementBuilder = null; // TODO rollback to save point batch.incrementFallbackUpdateCount(); executeUpdateSql(data); @@ -313,12 +331,30 @@ protected void processUpdate(Data data) { batch.incrementUpdateCount(); try { batch.startTimer(); - executeUpdateSql(data); + int updateCount = executeUpdateSql(data); + if (updateCount == 0) { + if (settings.enableFallbackForUpdate) { + // The row was missing, fallback to an insert + batch.incrementFallbackInsertCount(); + executeInsertSql(data, false); + } else { + throw new SqlException("There were no rows to update"); + } + } } catch (DataIntegrityViolationException e) { - // TODO log update failed - if (enableFallbackForUpdate) { - batch.incrementFallbackInsertCount(); - executeInsertSql(data, false); + // If we got here, most likely scenario is that the update + // has already run and updated the primary key. + // Let's attempt to run the update using the new + // key values. + if (settings.enableFallbackForUpdate) { + // remove the old pk values so that the new ones will be + // used + data.clearPkData(); + batch.incrementFallbackUpdateWithNewKeysCount(); + int updateCount = executeUpdateSql(data); + if (updateCount == 0) { + throw new SqlException("There were no rows to update using"); + } } else { throw e; } @@ -329,7 +365,21 @@ protected void processUpdate(Data data) { } protected void processDelete(Data data, boolean batchMode) { - // TODO + if (filterData(data, ctx)) { + batch.incrementDeleteCount(); + batch.startTimer(); + try { + int updateCount = executeDeleteSql(data, batchMode); + if (!batchMode && updateCount == 0) { + batch.incrementMissingDeleteCount(); + if (!settings.allowMissingDeletes) { + throw new SqlException("No rows were deleted"); + } + } + } finally { + batch.incrementDatabaseMillis(batch.endTimer()); + } + } } protected void processSql(Data data) { @@ -337,26 +387,26 @@ protected void processSql(Data data) { } protected boolean isCorrectForIntegrityViolation(Data data) { - if (data.getEventType() == DataEventType.INSERT && enableFallbackForInsert) { + if (data.getEventType() == DataEventType.INSERT && settings.enableFallbackForInsert) { return true; - } else if (data.getEventType() == DataEventType.UPDATE && enableFallbackForUpdate) { + } else if (data.getEventType() == DataEventType.UPDATE && settings.enableFallbackForUpdate) { return true; - } else if (data.getEventType() == DataEventType.DELETE && allowMissingDeletes) { + } else if (data.getEventType() == DataEventType.DELETE && settings.allowMissingDeletes) { return true; } else { return false; } } - protected int execute(StatementBuilder st, Data data, String[] values) { + protected int execute(Data data, String[] values) { Object[] objectValues = platform.getObjectValues(ctx.getBinaryEncoding(), values, - st.getMetaData(true)); + statementBuilder.getMetaData(true)); if (columnFilters != null) { for (IColumnFilter columnFilter : columnFilters) { objectValues = columnFilter.filterColumnsValues(ctx, targetTable, objectValues); } } - return transaction.update(data, objectValues, st.getTypes()); + return transaction.update(data, objectValues, this.statementBuilder.getTypes()); } final private StatementBuilder getStatementBuilder(DmlType dmlType, Column[] lookupColumns, @@ -391,4 +441,24 @@ public void finishBatch(Batch batch) { commit(); } + public static class Settings { + + protected int maxRowsBeforeBatchFlush; + + protected boolean enableFallbackForInsert; + + protected boolean enableFallbackForUpdate; + + protected boolean allowMissingDeletes; + + protected boolean useBatching; + + protected int maxRowsBeforeCommit; + + protected boolean usePrimaryKeysFromSource; + + protected boolean dontIncludeKeysInUpdateStatement; + + } + } diff --git a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java index b80d615105..5fc69bea29 100644 --- a/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java +++ b/future/symmetric3-jdbc/src/test/java/org/jumpmind/symmetric/core/process/sql/SqlDataWriterTest.java @@ -23,14 +23,39 @@ public void cleanupTestTable() { @Test public void testOneRowInsert() { + writeToTestTable(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\"")); + Assert.assertEquals(1, count(testTable.getTableName())); + } + + @Test + public void testOneRowInsertOneRowUpdate() { + writeToTestTable(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\""), + new Data(testTable.getTableName(), DataEventType.UPDATE, "1", "1,\"updated\"")); + Assert.assertEquals(1, count(testTable.getTableName())); + Assert.assertEquals( + "updated", + getPlatform().getSqlConnection().queryForObject( + String.format("select TEST_TEXT from %s where TEST_ID=?", + testTable.getTableName()), String.class, 1)); + } + + @Test + public void testOneRowInsertOneRowDelete() { + writeToTestTable(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\""), + new Data(testTable.getTableName(), DataEventType.DELETE, "1", null)); + Assert.assertEquals(0, count(testTable.getTableName())); + } + + protected void writeToTestTable(Data... datas) { SqlDataWriter writer = new SqlDataWriter(getPlatform(), new Parameters()); writer.open(writer.createDataContext()); Batch batch = new Batch(); writer.startBatch(batch); writer.switchTables(testTable); - writer.writeData(new Data(testTable.getTableName(), DataEventType.INSERT, "1,\"test\"")); + for (Data data : datas) { + writer.writeData(data); + } writer.finishBatch(batch); - writer.close(); - Assert.assertEquals(1, count(testTable.getTableName())); + writer.close(); } }