diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index ab17e9a2e3..1fd0a73004 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -651,10 +651,9 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, "Could not find the table, %s, to extract", Table.getFullyQualifiedTableName(catalogName, schemaName, tableName))); } - currentTable = currentTable.copy(); + currentTable = currentTable.copyAndFilterColumns(triggerHistory.getParsedColumnNames(), triggerHistory.getParsedPkColumnNames(), true); currentTable.setCatalog(catalogName); currentTable.setSchema(schemaName); - currentTable.orderColumns(triggerHistory.getParsedColumnNames()); Router router = triggerRouterService.getRouterById(routerId, false); if (router != null && setTargetTableName) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 72be9a641d..8eae44c503 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -40,6 +40,7 @@ import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; +import org.jumpmind.db.sql.SqlException; import org.jumpmind.db.sql.UniqueKeyException; import org.jumpmind.exception.IoException; import org.jumpmind.symmetric.ISymmetricEngine; @@ -366,7 +367,7 @@ protected void logAndRethrow(Exception ex) throws IOException { } throw (IOException) ex; } else { - if (!(ex instanceof ConflictException)) { + if (!(ex instanceof ConflictException || ex instanceof SqlException)) { log.error("Failed while parsing batch", ex); } } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index db27a8a374..378e8243b9 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -755,6 +755,7 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean genAlways) { // make sure channels are read from the database configurationService.reloadChannels(); + List triggersForCurrentNode = getTriggersForCurrentNode(); inactivateTriggers(triggersForCurrentNode, sqlBuffer); updateOrCreateDatabaseTriggers(triggersForCurrentNode, sqlBuffer, genAlways); @@ -899,7 +900,12 @@ protected void updateOrCreateDatabaseTriggers(List triggers, StringBuil Set tables = getTablesForTrigger(trigger, triggers); if (tables.size() > 0) { - for (Table table : tables) { + for (Table table : tables) { + if (table.getPrimaryKeyColumnCount() == 0) { + table = table.copy(); + table.makeAllColumnsPrimaryKeys(); + } + TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger( trigger.getTriggerId(), trigger.getSourceCatalogName(), diff --git a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java index 391842b206..78fb261d1f 100644 --- a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java +++ b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/model/Table.java @@ -98,7 +98,7 @@ public Table(String catalog, String schema, String tableName, String[] columnNam } } - + public void removeAllColumns() { columns.clear(); } @@ -962,23 +962,6 @@ public Column[] getDistributionKeyColumns() { } } - public void reOrderColumns(Column[] targetOrder, boolean copyPrimaryKeys) { - ArrayList orderedColumns = new ArrayList(targetOrder.length); - for (int i = 0; i < targetOrder.length; i++) { - String name = targetOrder[i].getName(); - for (Column column : columns) { - if (column.getName().equalsIgnoreCase(name)) { - orderedColumns.add(column); - if (copyPrimaryKeys) { - column.setPrimaryKey(targetOrder[i].isPrimaryKey()); - } - break; - } - } - } - columns = orderedColumns; - } - public void orderColumns(String[] columnNames) { Column[] orderedColumns = orderColumns(columnNames, this); this.columns.clear(); @@ -1019,6 +1002,36 @@ public Table copy() { } } + public Table copyAndFilterColumns(String[] orderedColumnNames, String[] pkColumnNames, + boolean setPrimaryKeys) { + Table table = copy(); + orderColumns(orderedColumnNames); + + if (setPrimaryKeys) { + for (Column column : columns) { + column.setPrimaryKey(false); + } + + if (pkColumnNames != null) { + for (Column column : columns) { + for (String pkColumnName : pkColumnNames) { + if (column.getName().equals(pkColumnName)) { + column.setPrimaryKey(true); + } + } + } + } + } + + return table; + } + + public void makeAllColumnsPrimaryKeys() { + for (Column column : columns) { + column.setPrimaryKey(true); + } + } + public String[] getColumnNames() { String[] columnNames = new String[columns.size()]; int i = 0; @@ -1037,7 +1050,7 @@ public String[] getPrimaryKeyColumnNames() { } return columnNames; } - + public int calculateTableHashcode() { final int PRIME = 31; int result = 1; @@ -1046,7 +1059,7 @@ public int calculateTableHashcode() { result = PRIME * result + calculateHashcodeForColumns(PRIME, getPrimaryKeyColumns()); return result; } - + private static int calculateHashcodeForColumns(final int PRIME, Column[] cols) { int result = 1; if (cols != null && cols.length > 0) { @@ -1059,7 +1072,6 @@ private static int calculateHashcodeForColumns(final int PRIME, Column[] cols) { return result; } - public static boolean areAllColumnsPrimaryKeys(Column[] columns) { boolean allPks = true; if (columns != null) { diff --git a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java index 628b4b6d78..cb20a4f29d 100644 --- a/symmetric/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java +++ b/symmetric/symmetric-db/src/main/java/org/jumpmind/db/platform/AbstractDatabasePlatform.java @@ -71,11 +71,12 @@ public abstract class AbstractDatabasePlatform implements IDatabasePlatform { public static final String[] TIME_PATTERNS = { "HH:mm:ss.S", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss.S", "yyyy-MM-dd HH:mm:ss" }; - - public static final FastDateFormat TIMESTAMP_FORMATTER = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS"); + + public static final FastDateFormat TIMESTAMP_FORMATTER = FastDateFormat + .getInstance("yyyy-MM-dd HH:mm:ss.SSS"); public static final FastDateFormat TIME_FORMATTER = FastDateFormat.getInstance("HH:mm:ss.SSS"); - + public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = " "; /* The default name for models read from the database, if no name as given. */ @@ -106,7 +107,7 @@ public abstract class AbstractDatabasePlatform implements IDatabasePlatform { public AbstractDatabasePlatform() { } - + public DatabaseInfo getDatabaseInfo() { return getDdlBuilder().getDatabaseInfo(); } @@ -246,7 +247,7 @@ public Table readTableFromDatabase(String catalogName, String schemaName, String } } } - + if (table != null && log.isDebugEnabled()) { log.debug("Just read table: \n{}", table.toVerboseString()); } @@ -303,74 +304,83 @@ public Object[] getObjectValues(BinaryEncoding encoding, String[] values, public Object[] getObjectValues(BinaryEncoding encoding, String[] values, Column[] orderedMetaData, boolean useVariableDates) { - List list = new ArrayList(values.length); - for (int i = 0; i < values.length; i++) { - String value = values[i]; - Object objectValue = value; - Column column = orderedMetaData.length > i ? orderedMetaData[i] : null; - try { - if (column != null) { - int type = column.getMappedTypeCode(); - if ((value == null || (getDdlBuilder().getDatabaseInfo().isEmptyStringNulled() && value - .equals(""))) && column.isRequired() && column.isOfTextType()) { - objectValue = REQUIRED_FIELD_NULL_SUBSTITUTE; - } - if (value != null) { - if (type == Types.DATE || type == Types.TIMESTAMP || type == Types.TIME) { - objectValue = parseDate(type, value, useVariableDates); - } else if (type == Types.CHAR) { - String charValue = value.toString(); - if ((StringUtils.isBlank(charValue) && getDdlBuilder() - .getDatabaseInfo().isBlankCharColumnSpacePadded()) - || (StringUtils.isNotBlank(charValue) && getDdlBuilder() - .getDatabaseInfo().isNonBlankCharColumnSpacePadded())) { - objectValue = StringUtils.rightPad(value.toString(), - column.getSizeAsInt(), ' '); - } - } else if (type == Types.INTEGER || type == Types.SMALLINT - || type == Types.BIT) { - objectValue = Integer.valueOf(value); - } else if (type == Types.NUMERIC || type == Types.DECIMAL - || type == Types.FLOAT || type == Types.DOUBLE) { - // The number will have either one period or one - // comma - // for the decimal point, but we need a period - objectValue = new BigDecimal(value.replace(',', '.')); - } else if (type == Types.BOOLEAN) { - objectValue = value.equals("1") ? Boolean.TRUE : Boolean.FALSE; - } else if (type == Types.BLOB || type == Types.LONGVARBINARY - || type == Types.BINARY || type == Types.VARBINARY || - // SQLServer ntext type - type == -10) { - if (encoding == BinaryEncoding.NONE) { - objectValue = value.getBytes(); - } else if (encoding == BinaryEncoding.BASE64) { - objectValue = Base64.decodeBase64(value.getBytes()); - } else if (encoding == BinaryEncoding.HEX) { - objectValue = Hex.decodeHex(value.toCharArray()); + if (values != null) { + List list = new ArrayList(values.length); + for (int i = 0; i < values.length; i++) { + String value = values[i]; + Object objectValue = value; + Column column = orderedMetaData.length > i ? orderedMetaData[i] : null; + try { + if (column != null) { + int type = column.getMappedTypeCode(); + if ((value == null || (getDdlBuilder().getDatabaseInfo() + .isEmptyStringNulled() && value.equals(""))) + && column.isRequired() + && column.isOfTextType()) { + objectValue = REQUIRED_FIELD_NULL_SUBSTITUTE; + } + if (value != null) { + if (type == Types.DATE || type == Types.TIMESTAMP || type == Types.TIME) { + objectValue = parseDate(type, value, useVariableDates); + } else if (type == Types.CHAR) { + String charValue = value.toString(); + if ((StringUtils.isBlank(charValue) && getDdlBuilder() + .getDatabaseInfo().isBlankCharColumnSpacePadded()) + || (StringUtils.isNotBlank(charValue) && getDdlBuilder() + .getDatabaseInfo() + .isNonBlankCharColumnSpacePadded())) { + objectValue = StringUtils.rightPad(value.toString(), + column.getSizeAsInt(), ' '); + } + } else if (type == Types.INTEGER || type == Types.SMALLINT + || type == Types.BIT) { + objectValue = Integer.valueOf(value); + } else if (type == Types.NUMERIC || type == Types.DECIMAL + || type == Types.FLOAT || type == Types.DOUBLE) { + // The number will have either one period or one + // comma + // for the decimal point, but we need a period + objectValue = new BigDecimal(value.replace(',', '.')); + } else if (type == Types.BOOLEAN) { + objectValue = value.equals("1") ? Boolean.TRUE : Boolean.FALSE; + } else if (type == Types.BLOB || type == Types.LONGVARBINARY + || type == Types.BINARY || type == Types.VARBINARY || + // SQLServer ntext type + type == -10) { + if (encoding == BinaryEncoding.NONE) { + objectValue = value.getBytes(); + } else if (encoding == BinaryEncoding.BASE64) { + objectValue = Base64.decodeBase64(value.getBytes()); + } else if (encoding == BinaryEncoding.HEX) { + objectValue = Hex.decodeHex(value.toCharArray()); + } + } else if (type == Types.ARRAY) { + objectValue = createArray(column, value); } - } else if (type == Types.ARRAY) { - objectValue = createArray(column, value); } + if (objectValue instanceof String) { + objectValue = cleanTextForTextBasedColumns((String) objectValue); + } + list.add(objectValue); } - if (objectValue instanceof String) { - objectValue = cleanTextForTextBasedColumns((String) objectValue); - } - list.add(objectValue); + } catch (Exception ex) { + log.error("Could not convert a value of {} for column {} of type {}", + new Object[] { value, column.getName(), column.getMappedType() }); + log.error(ex.getMessage(), ex); + throw new RuntimeException(ex); } - } catch (Exception ex) { - log.error("Could not convert a value of {} for column {} of type {}", new Object[] { - value, column.getName(), column.getMappedType() }); - log.error(ex.getMessage(), ex); - throw new RuntimeException(ex); } - } - return list.toArray(); + return list.toArray(); + } else { + return null; + } } - // TODO: this should be AbstractDdlBuilder.getInsertSql(Table table, Map columnValues, boolean genPlaceholders) - public String[] getStringValues(BinaryEncoding encoding, Column[] metaData, Row row, boolean useVariableDates) { + // TODO: this should be AbstractDdlBuilder.getInsertSql(Table table, + // Map columnValues, boolean genPlaceholders) + public String[] getStringValues(BinaryEncoding encoding, Column[] metaData, Row row, + boolean useVariableDates) { String[] values = new String[metaData.length]; for (int i = 0; i < metaData.length; i++) { Column column = metaData[i]; @@ -407,7 +417,8 @@ public String[] getStringValues(BinaryEncoding encoding, Column[] metaData, Row return values; } - public String replaceSql(String sql, BinaryEncoding encoding, Column[] metaData, Row row, boolean useVariableDates) { + public String replaceSql(String sql, BinaryEncoding encoding, Column[] metaData, Row row, + boolean useVariableDates) { String newSql = sql; String quote = getDatabaseInfo().getValueQuoteToken(); String regex = "\\?"; @@ -419,13 +430,14 @@ public String replaceSql(String sql, BinaryEncoding encoding, Column[] metaData, if (row.get(name) != null) { if (column.isOfTextType()) { try { - String value = row.getString(name); - value = value.replace("\\", "\\\\"); - value = value.replace("$", "\\$"); - value = value.replace("'", "''"); - newSql = newSql.replaceFirst(regex, quote + value + quote); + String value = row.getString(name); + value = value.replace("\\", "\\\\"); + value = value.replace("$", "\\$"); + value = value.replace("'", "''"); + newSql = newSql.replaceFirst(regex, quote + value + quote); } catch (RuntimeException ex) { - log.error("Failed to replace ? in {" + sql + "} with " + name + "=" + row.getString(name)); + log.error("Failed to replace ? in {" + sql + "} with " + name + "=" + + row.getString(name)); throw ex; } } else if (column.isOfNumericType()) { @@ -434,20 +446,25 @@ public String replaceSql(String sql, BinaryEncoding encoding, Column[] metaData, Date date = row.getDateTime(name); if (useVariableDates) { long diff = date.getTime() - System.currentTimeMillis(); - newSql = newSql.replaceFirst(regex, "${curdate" + (diff > 0 ? "+" : "-") + "}"); + newSql = newSql.replaceFirst(regex, "${curdate" + (diff > 0 ? "+" : "-") + + "}"); } else if (type == Types.TIME) { - newSql = newSql.replaceFirst(regex, "ts {" + quote + TIME_FORMATTER.format(date) + quote + "}"); + newSql = newSql.replaceFirst(regex, + "ts {" + quote + TIME_FORMATTER.format(date) + quote + "}"); } else { - newSql = newSql.replaceFirst(regex, "ts {" + quote + TIME_FORMATTER.format(date) + quote + "}"); + newSql = newSql.replaceFirst(regex, + "ts {" + quote + TIME_FORMATTER.format(date) + quote + "}"); } } else if (column.isOfBinaryType()) { byte[] bytes = row.getBytes(name); if (encoding == BinaryEncoding.NONE) { newSql = newSql.replaceFirst(regex, quote + row.getString(name)); } else if (encoding == BinaryEncoding.BASE64) { - newSql = newSql.replaceFirst(regex, quote + new String(Base64.encodeBase64(bytes)) + quote); + newSql = newSql.replaceFirst(regex, + quote + new String(Base64.encodeBase64(bytes)) + quote); } else if (encoding == BinaryEncoding.HEX) { - newSql = newSql.replaceFirst(regex, quote + new String(Hex.encodeHex(bytes)) + quote); + newSql = newSql.replaceFirst(regex, quote + + new String(Hex.encodeHex(bytes)) + quote); } } } @@ -475,11 +492,12 @@ protected Array createArray(Column column, final String value) { protected String cleanTextForTextBasedColumns(String text) { return text; } - + protected java.util.Date parseDate(int type, String value, boolean useVariableDates) { try { - boolean useTimestamp = (type == Types.TIMESTAMP) || (type == Types.DATE && - getDdlBuilder().getDatabaseInfo().isDateOverridesToTimestamp()); + boolean useTimestamp = (type == Types.TIMESTAMP) + || (type == Types.DATE && getDdlBuilder().getDatabaseInfo() + .isDateOverridesToTimestamp()); if (useVariableDates && value.startsWith("${curdate")) { long time = Long.parseLong(value.substring(10, value.length() - 1)); diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java index 39fb72c23a..af43a07b40 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java @@ -97,13 +97,13 @@ public void process(DataContext context) { } } catch (Exception ex) { try { - if (listener != null) { - listener.batchInError(context, ex); - } - } finally { if (dataWriter != null && !endBatchCalled) { dataWriter.end(currentBatch, true); } + } finally { + if (listener != null) { + listener.batchInError(context, ex); + } } rethrow(ex); } finally { diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java index 779f14d568..b58317d6f2 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriter.java @@ -394,6 +394,10 @@ protected LoadStatus delete(CsvData data, boolean useConflictDetection) { break; } } + + if (lookupKeys == null || lookupKeys.length == 0) { + lookupKeys = targetTable.getColumns(); + } this.currentDmlStatement = platform.createDmlStatement(DmlType.DELETE, targetTable.getCatalog(), targetTable.getSchema(), targetTable.getName(), @@ -441,7 +445,8 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC } } if (changedColumnNameList.size() > 0) { - if (requireNewStatement(DmlType.UPDATE, data, applyChangesOnly, useConflictDetection)) { + if (requireNewStatement(DmlType.UPDATE, data, applyChangesOnly, + useConflictDetection)) { lastApplyChangesOnly = applyChangesOnly; lastUseConflictDetection = useConflictDetection; Column[] lookupKeys = null; @@ -496,6 +501,11 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC break; } } + + if (lookupKeys == null || lookupKeys.length == 0) { + lookupKeys = targetTable.getColumns(); + } + this.currentDmlStatement = platform .createDmlStatement(DmlType.UPDATE, targetTable.getCatalog(), targetTable.getSchema(), targetTable.getName(), lookupKeys, @@ -773,12 +783,10 @@ protected Table lookupTableAtTarget(Table sourceTable) { table = platform.getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(), sourceTable.getName(), false); if (table != null) { - table = table.copy(); - table.reOrderColumns(sourceTable.getColumns(), + table = table.copyAndFilterColumns(sourceTable.getColumnNames(), + sourceTable.getPrimaryKeyColumnNames(), this.writerSettings.isUsePrimaryKeysFromSource()); - boolean setAllColumnsAsPrimaryKey = table.getPrimaryKeyColumnCount() == 0; - if (StringUtils.isBlank(sourceTable.getCatalog())) { table.setCatalog(null); } @@ -794,10 +802,6 @@ protected Table lookupTableAtTarget(Table sourceTable) { && (typeCode == Types.DATE || typeCode == Types.TIME || typeCode == Types.TIMESTAMP)) { column.setMappedTypeCode(Types.VARCHAR); } - - if (setAllColumnsAsPrimaryKey) { - column.setPrimaryKey(true); - } } } } diff --git a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java index 258fa0775b..b9b0f536ce 100644 --- a/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java +++ b/symmetric/symmetric-jdbc/src/main/java/org/jumpmind/db/sql/JdbcSqlTransaction.java @@ -34,19 +34,27 @@ public class JdbcSqlTransaction implements ISqlTransaction { protected boolean oldAutoCommitValue; - protected List markers = new ArrayList(); - + protected List markers = new ArrayList(); + public JdbcSqlTransaction(JdbcSqlTemplate jdbcSqlTemplate) { + this.jdbcSqlTemplate = jdbcSqlTemplate; + this.init(); + } + + protected void init() { + if (this.connection != null) { + close(); + } try { - this.jdbcSqlTemplate = jdbcSqlTemplate; this.connection = jdbcSqlTemplate.getDataSource().getConnection(); this.oldAutoCommitValue = this.connection.getAutoCommit(); this.connection.setAutoCommit(false); SqlUtils.addSqlTransaction(this); } catch (SQLException ex) { - JdbcSqlTemplate.close(connection); + close(); throw jdbcSqlTemplate.translate(ex); } + } public void setInBatchMode(boolean useBatching) { @@ -74,6 +82,7 @@ public void commit() { public void rollback() { rollback(true); + init(); } protected void rollback(boolean clearMarkers) { @@ -97,7 +106,7 @@ public void close() { } catch (SQLException ex) { // do nothing } - JdbcSqlTemplate.close(connection); + JdbcSqlTemplate.close(connection); connection = null; SqlUtils.removeSqlTransaction(this); } @@ -147,7 +156,7 @@ public T execute(Connection con) throws SQLException { } }); } - + public int execute(final String sql) { return executeCallback(new IConnectionCallback() { public Integer execute(Connection con) throws SQLException { @@ -167,7 +176,7 @@ public Integer execute(Connection con) throws SQLException { } }); - } + } public int prepareAndExecute(final String sql, final Object[] args, final int[] types) { return executeCallback(new IConnectionCallback() {