From 716853ce8fb83d317112ba4c016a9c15d57b1f64 Mon Sep 17 00:00:00 2001 From: Markus Schulz Date: Tue, 25 Aug 2015 10:40:41 +0200 Subject: [PATCH] 0002375: LoadFilter with SQL-Script support currently only SQL-PreparedStatement with COLUMN/OLD_COLUMN variables support --- .../configuration/load-filters/scripts.ad | 18 +- .../load/DynamicDatabaseWriterFilter.java | 2 + .../load/SQLDatabaseWriterFilter.java | 163 ++++++++++++++++++ .../jumpmind/symmetric/model/LoadFilter.java | 2 +- 4 files changed, 175 insertions(+), 10 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java diff --git a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad index 84430fb218..b76b023826 100644 --- a/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad +++ b/symmetric-assemble/src/asciidoc/configuration/load-filters/scripts.ad @@ -29,15 +29,15 @@ Handle Error Script:: A script to execute if data cannot be processed. .Variables available within scripts [cols="3,^1,^1,5"] |=== -|Variable|BSH|JAVA|Description - -|engine|X||The Symmetric engine object. -|COLUMN_NAME|X||The source values for the row being inserted, updated or deleted. -|OLD_COLUMN_NAME|X||The old values for the row being inserted, updated or deleted. -|context|X|X|The data context object for the data being inserted, updated or deleted. . -|table|X|X|The table object for the table being inserted, updated or deleted. -|data|X|X|The `CsvData` object for the data change. -|error|X|X|`java.lang.Exception` +|Variable|BSH|SQL|JAVA|Description + +|engine|X|||The Symmetric engine object. +|COLUMN_NAME|X|X||The source values for the row being inserted, updated or deleted. +|OLD_COLUMN_NAME|X|X||The old values for the row being inserted, updated or deleted. +|context|X||X|The data context object for the data being inserted, updated or deleted. . +|table|X||X|The table object for the table being inserted, updated or deleted. +|data|X||X|The `CsvData` object for the data change. +|error|X||X|`java.lang.Exception` |=== diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java index 97684f9e37..bc7e269c0d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DynamicDatabaseWriterFilter.java @@ -77,6 +77,8 @@ public static List getDatabaseWriterFilters(ISymmet databaseWriterFilters.add(new BshDatabaseWriterFilter(engine, entry.getValue())); } else if (entry.getKey().equals(LoadFilterType.JAVA)) { databaseWriterFilters.add(new JavaDatabaseWriterFilter(engine, entry.getValue())); + } else if (entry.getKey().equals(LoadFilterType.SQL)) { + databaseWriterFilters.add(new SQLDatabaseWriterFilter(engine, entry.getValue())); } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java new file mode 100644 index 0000000000..33053c41da --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/SQLDatabaseWriterFilter.java @@ -0,0 +1,163 @@ +package org.jumpmind.symmetric.load; + +import bsh.TargetError; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.sql.*; +import org.jumpmind.symmetric.*; +import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.io.data.*; +import org.jumpmind.symmetric.model.*; +import org.jumpmind.util.*; + +import java.util.*; + +import static org.apache.commons.lang.StringUtils.isNotBlank; + +/** + * User: Markus Schulz + * Date: 24.08.15 + * Time: 10:53 + */ +public class SQLDatabaseWriterFilter extends DynamicDatabaseWriterFilter { + + protected static final ISqlRowMapper lookupColumnRowMapper = new ISqlRowMapper() { + @Override + public Boolean mapRow(Row row) { + return Boolean.TRUE.equals(row.values().iterator().next()); + } + }; + + private static final String OLD_ = "OLD_"; + + public SQLDatabaseWriterFilter(ISymmetricEngine engine, Map> loadFilters) { + super(engine, loadFilters); + } + + @Override + protected boolean processLoadFilters(DataContext context, Table table, CsvData data, Exception error, + WriteMethod writeMethod, List loadFiltersForTable) { + + boolean writeRow = true; + LoadFilter currentFilter = null; + List values = null; + try { + LinkedCaseInsensitiveMap namedParams = null; + for (LoadFilter filter : loadFiltersForTable) { + currentFilter = filter; + values = null; + if (filter.isFilterOnDelete() && data.getDataEventType().equals(DataEventType.DELETE) + || filter.isFilterOnInsert() && data.getDataEventType().equals(DataEventType.INSERT) + || filter.isFilterOnUpdate() && data.getDataEventType().equals(DataEventType.UPDATE)) { + String sql = null; + if (writeMethod.equals(WriteMethod.BEFORE_WRITE) && filter.getBeforeWriteScript() != null) { + sql = doTokenReplacementOnSql(context, filter.getBeforeWriteScript()); + } + else if (writeMethod.equals(WriteMethod.AFTER_WRITE) && filter.getAfterWriteScript() != null) { + sql = doTokenReplacementOnSql(context, filter.getAfterWriteScript()); + } + else if (writeMethod.equals(WriteMethod.HANDLE_ERROR) && filter.getHandleErrorScript() != null) { + sql = doTokenReplacementOnSql(context, filter.getHandleErrorScript()); + } + if (sql != null && !sql.trim().isEmpty()) { + if (namedParams == null) { + namedParams = getVariablesMap(table, data); + } + ISqlTransaction transaction = context.findTransaction(); + values = transaction.query(sql, lookupColumnRowMapper, namedParams); + } + + if (values != null && values.size() > 0) { + writeRow = values.get(0); + } + } + } + } + catch (Exception ex) { + processError(currentFilter, table, ex); + } + return writeRow; + } + + private LinkedCaseInsensitiveMap getVariablesMap(Table table, CsvData data) { + LinkedCaseInsensitiveMap namedParams = new LinkedCaseInsensitiveMap(); + if (data != null) { + Map sourceValues = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); + if (sourceValues.size() > 0) { + for (String columnName : sourceValues.keySet()) { + namedParams.put(columnName, sourceValues.get(columnName)); + namedParams.put(columnName.toUpperCase(), sourceValues.get(columnName)); + } + } + else { + Map pkValues = data.toColumnNameValuePairs( + table.getPrimaryKeyColumnNames(), CsvData.PK_DATA); + for (String columnName : pkValues.keySet()) { + namedParams.put(columnName, pkValues.get(columnName)); + namedParams.put(columnName.toUpperCase(), pkValues.get(columnName)); + } + } + + Map oldValues = data.toColumnNameValuePairs(table.getColumnNames(), + CsvData.OLD_DATA); + for (String columnName : oldValues.keySet()) { + namedParams.put(OLD_ + columnName, sourceValues.get(columnName)); + namedParams.put(OLD_ + columnName.toUpperCase(), sourceValues.get(columnName)); + } + } + return namedParams; + } + + @Override + protected void executeScripts(DataContext context, String key, Set scripts, boolean isFailOnError) { + if (scripts != null) { + try { + ISqlTransaction transaction = context.findTransaction(); + for (String script : scripts) { + String sql = doTokenReplacementOnSql(context, script); + transaction.query(sql, lookupColumnRowMapper, null); + } + } + catch (Exception e) { + if (isFailOnError) { + throw (RuntimeException) e; + } + else { + log.error("Failed while executing sql script", e); + } + } + } + } + + protected String doTokenReplacementOnSql(DataContext context, String sql) { + if (isNotBlank(sql)) { + Data csvData = (Data) context.get(Constants.DATA_CONTEXT_CURRENT_CSV_DATA); + + if (csvData != null && csvData.getTriggerHistory() != null) { + sql = FormatUtils + .replaceToken(sql, "sourceCatalogName", csvData.getTriggerHistory().getSourceCatalogName(), true); + } + + if (csvData != null && csvData.getTriggerHistory() != null) { + sql = FormatUtils + .replaceToken(sql, "sourceSchemaName", csvData.getTriggerHistory().getSourceSchemaName(), true); + } + } + return sql; + } + + + protected void processError(LoadFilter currentFilter, Table table, Throwable ex) { + if (ex instanceof TargetError) { + ex = ((TargetError) ex).getTarget(); + } + String formattedMessage = String + .format("Error executing sql script for load filter %s on table %s. The error was: %s", + new Object[]{currentFilter != null ? currentFilter.getLoadFilterId() : "N/A", table.getName(), + ex.getMessage()}); + log.error(formattedMessage); + if (currentFilter.isFailOnError()) { + throw new SymmetricException(formattedMessage, ex); + } + } + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java index 0647c24679..be9940bc41 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/LoadFilter.java @@ -32,7 +32,7 @@ public class LoadFilter implements Serializable { static final Logger logger = LoggerFactory.getLogger(LoadFilter.class); - public enum LoadFilterType { BSH, JAVA }; + public enum LoadFilterType { BSH, JAVA, SQL }; private String loadFilterId;