diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms.ad b/symmetric-assemble/src/asciidoc/configuration/transforms.ad index b392e9f70b..bb1b0f922f 100644 --- a/symmetric-assemble/src/asciidoc/configuration/transforms.ad +++ b/symmetric-assemble/src/asciidoc/configuration/transforms.ad @@ -112,7 +112,7 @@ ifndef::pro[] ---- insert into SYM_TRANSFORM_TABLE ( transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, - target_table_name, delete_action, transform_order, column_policy, update_first, + target_table_name, update_action, delete_action, transform_order, column_policy, update_first, last_update_by, last_update_time, create_time ) values ( 'itemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE', diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad b/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad index b422c152f6..29a93878be 100644 --- a/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad +++ b/symmetric-assemble/src/asciidoc/configuration/transforms/operation-change.ad @@ -26,7 +26,7 @@ ifndef::pro[] ---- insert into SYM_TRANSFORM_TABLE ( transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, - target_table_name, delete_action, transform_order, column_policy, update_first, + target_table_name, update_action, delete_action, transform_order, column_policy, update_first, last_update_time, create_time ) values ( 'update-first', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE', @@ -48,7 +48,7 @@ ifndef::pro[] ---- insert into SYM_TRANSFORM_TABLE ( transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, - target_table_name, delete_action, transform_order, column_policy, update_first, + target_table_name, update_action, delete_action, transform_order, column_policy, update_first, last_update_time, create_time ) values ( 'delete-action-update-col', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE', diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad b/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad index 1e73c5fe7c..ab32993e41 100644 --- a/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad +++ b/symmetric-assemble/src/asciidoc/configuration/transforms/virtual-columns.ad @@ -22,7 +22,7 @@ ifndef::pro[] ---- insert into SYM_TRANSFORM_TABLE ( transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, - target_table_name, delete_action, transform_order, column_policy, update_first, + target_table_name, update_action, delete_action, transform_order, column_policy, update_first, last_update_by, last_update_time, create_time ) values ( 'extractStoreItemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE', @@ -35,7 +35,7 @@ insert into SYM_TRANSFORM_TABLE ( ---- insert into SYM_TRANSFORM_TABLE ( transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, - target_table_name, delete_action, transform_order, column_policy, update_first, + target_table_name, update_action, delete_action, transform_order, column_policy, update_first, last_update_by, last_update_time, create_time ) values ( 'loadCorpItemSellingPriceTransform', 'corp', 'store', 'LOAD', 'ITEM_SELLING_PRICE', diff --git a/symmetric-assemble/src/docbook/configuration.xml b/symmetric-assemble/src/docbook/configuration.xml index 2c3ad840ae..4bc3d4b1f3 100644 --- a/symmetric-assemble/src/docbook/configuration.xml +++ b/symmetric-assemble/src/docbook/configuration.xml @@ -1242,6 +1242,27 @@ column. + +update_action: When a source operation of Update takes place, there are +three possible ways to handle the transformation at the target. The +options include: + +NONE - The update results in no target changes. + +DEL_ROW - The update results in a delete of the row +as specified by the pk columns defined in the transformation +configuration. + +UPDATE_COL - The update results in an Update +operation on the target which updates the specific rows and columns +based on the defined transformation. + +BeanShell Script Transform ('bsh'): + script code which returns one of the above items. + you can use COLUMN variables inside the script. + + + delete_action: When a source operation of Delete takes place, there are three possible ways to handle the transformation at the target. The diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java index 24d4f9350a..daa476004c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java @@ -42,7 +42,7 @@ import org.jumpmind.symmetric.io.data.transform.ConstantColumnTransform; import org.jumpmind.symmetric.io.data.transform.CopyColumnTransform; import org.jumpmind.symmetric.io.data.transform.CopyIfChangedColumnTransform; -import org.jumpmind.symmetric.io.data.transform.DeleteAction; +import org.jumpmind.symmetric.io.data.transform.TargetDmlAction; import org.jumpmind.symmetric.io.data.transform.IColumnTransform; import org.jumpmind.symmetric.io.data.transform.IdentityColumnTransform; import org.jumpmind.symmetric.io.data.transform.JavaColumnTransform; @@ -381,8 +381,8 @@ class TransformTableMapper implements ISqlRowMapper public TransformTableNodeGroupLink mapRow(Row rs) { TransformTableNodeGroupLink table = new TransformTableNodeGroupLink(); table.setTransformId(rs.getString("transform_id")); - table.setNodeGroupLink(configurationService.getNodeGroupLinkFor( - rs.getString("source_node_group_id"), rs.getString("target_node_group_id"), false)); + table.setNodeGroupLink(configurationService + .getNodeGroupLinkFor(rs.getString("source_node_group_id"), rs.getString("target_node_group_id"), false)); table.setSourceCatalogName(rs.getString("source_catalog_name")); table.setSourceSchemaName(rs.getString("source_schema_name")); table.setSourceTableName(rs.getString("source_table_name")); @@ -401,7 +401,8 @@ public TransformTableNodeGroupLink mapRow(Row rs) { table.setTransformOrder(rs.getInt("transform_order")); table.setUpdateFirst(rs.getBoolean("update_first")); table.setColumnPolicy(ColumnPolicy.valueOf(rs.getString("column_policy"))); - table.setDeleteAction(DeleteAction.valueOf(rs.getString("delete_action"))); + table.setUpdateActionBeanScript(rs.getString("update_action")); + table.setDeleteAction(TargetDmlAction.valueOf(rs.getString("delete_action"))); table.setCreateTime(rs.getDateTime("create_time")); table.setLastUpdateBy(rs.getString("last_update_by")); table.setLastUpdateTime(rs.getDateTime("last_update_time")); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java index 47ae43ed35..91c374e714 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java @@ -38,7 +38,7 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re " target_table_name, " + " transform_point, " + " transform_order, " + -" update_first, delete_action, column_policy, " + +" update_first, update_action, delete_action, column_policy, " + " last_update_time, last_update_by, create_time " + " from " + " $(transform_table) order by transform_order " + @@ -81,8 +81,9 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re " target_table_name=?, " + " transform_point=?, " + " update_first=?, " + -" delete_action=?, " + -" transform_order=?, " + +" update_action=?, " + +" delete_action=?, " + +" transform_order=?, " + " column_policy=?, " + " last_update_time=?, " + " last_update_by=? " + @@ -111,7 +112,7 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re " (source_node_group_id, target_node_group_id, source_catalog_name, " + " source_schema_name, source_table_name, " + " target_catalog_name, target_schema_name, target_table_name, " + -" transform_point, update_first, delete_action, transform_order, " + +" transform_point, update_first, update_action, delete_action, transform_order, " + " column_policy, last_update_time, last_update_by, create_time, transform_id) " + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " ); diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index 203328b9ab..37fc093ef2 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -656,6 +656,7 @@ + diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/DeleteAction.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TargetDmlAction.java similarity index 92% rename from symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/DeleteAction.java rename to symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TargetDmlAction.java index 3e1813cc5d..9c940cf348 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/DeleteAction.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TargetDmlAction.java @@ -20,8 +20,8 @@ */ package org.jumpmind.symmetric.io.data.transform; -public enum DeleteAction { +public enum TargetDmlAction { - NONE, UPDATE_COL, DEL_ROW; + NONE, UPDATE_COL, INS_ROW, UPD_ROW, DEL_ROW; } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java index 40eb9f006d..2df3cd92a9 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java @@ -20,16 +20,27 @@ */ package org.jumpmind.symmetric.io.data.transform; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import java.util.*; +import bsh.Interpreter; import org.apache.commons.lang.StringUtils; import org.jumpmind.db.model.Table; +import org.jumpmind.symmetric.io.data.*; import org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType; +import org.jumpmind.util.Context; +import org.slf4j.*; public class TransformTable implements Cloneable { + final String INTERPRETER_KEY = String.format("%s.BshInterpreter", getClass().getName()); + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + /* + * Static context object used to maintain objects in memory for reference between BSH transforms. + */ + private static Map bshContext = new HashMap(); + protected String transformId; protected String sourceCatalogName; protected String sourceSchemaName; @@ -40,7 +51,9 @@ public class TransformTable implements Cloneable { protected TransformPoint transformPoint; protected List transformColumns; protected List primaryKeyColumns; - protected DeleteAction deleteAction = DeleteAction.DEL_ROW; + protected String updateActionBeanScript = null; + protected TargetDmlAction updateAction = TargetDmlAction.UPDATE_COL; + protected TargetDmlAction deleteAction = TargetDmlAction.DEL_ROW; protected ColumnPolicy columnPolicy = ColumnPolicy.IMPLIED; protected boolean updateFirst = false; protected int transformOrder = 0; @@ -198,11 +211,79 @@ public void addTransformColumn(TransformColumn column) { } } - public void setDeleteAction(DeleteAction deleteAction) { + public void setUpdateActionBeanScript(String updateAction) { + try { + this.updateActionBeanScript = null; + this.updateAction = TargetDmlAction.valueOf(updateAction); + } + catch (IllegalArgumentException e) { + //looks like a bean-shell-script + this.updateActionBeanScript = updateAction; + } + } + + public TargetDmlAction evaluateTargetDmlAction(DataContext dataContext, TransformedData transformedData) { + if (updateActionBeanScript != null) { + Interpreter interpreter = getInterpreter(dataContext); + Map sourceValues = transformedData.getSourceValues(); + + try { + interpreter.set("sourceDmlType", transformedData.getSourceDmlType()); + interpreter.set("sourceDmlTypeString", transformedData.getSourceDmlType().toString()); + interpreter.set("transformedData", transformedData); + CsvData csvData = dataContext.getData(); + if (csvData != null) { + interpreter.set("externalData", csvData.getAttribute("externalData")); + } + else { + interpreter.set("externalData", null); + } + for (String columnName : sourceValues.keySet()) { + interpreter.set(columnName.toUpperCase(), sourceValues.get(columnName)); + interpreter.set(columnName, sourceValues.get(columnName)); + } + if (transformedData.getOldSourceValues() != null) { + for (Map.Entry oldColumn : transformedData.getOldSourceValues().entrySet()) { + interpreter.set("OLD_" + oldColumn.getKey(), oldColumn.getValue()); + interpreter.set("OLD_" + oldColumn.getKey().toUpperCase(), oldColumn.getValue()); + } + } + String transformExpression = updateActionBeanScript; + String methodName = String.format("transform_%d()", Math.abs(transformExpression.hashCode())); + if (dataContext.get(methodName) == null) { + //create BSH-Method if not exists in Context + interpreter.set("context", dataContext); + interpreter.set("bshContext", bshContext); + interpreter.eval(String.format("%s {\n%s\n}", methodName, transformExpression)); + dataContext.put(methodName, Boolean.TRUE); + } + //call BSH-Method + Object result = interpreter.eval(methodName); + //evaluate Result of BSH-Script + updateAction = TargetDmlAction.valueOf((String) result); + } + catch (Exception e) { + log.error(e.getMessage(), e); + } + } + return updateAction; + } + + protected Interpreter getInterpreter(Context context) { + Interpreter interpreter = (Interpreter) context.get(INTERPRETER_KEY); + if (interpreter == null) { + interpreter = new Interpreter(); + context.put(INTERPRETER_KEY, interpreter); + } + return interpreter; + } + + + public void setDeleteAction(TargetDmlAction deleteAction) { this.deleteAction = deleteAction; } - public DeleteAction getDeleteAction() { + public TargetDmlAction getDeleteAction() { return deleteAction; } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java index bfd2b3775f..1d1743e739 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java @@ -35,7 +35,7 @@ import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.data.IDataWriter; -import org.jumpmind.symmetric.io.data.transform.DeleteAction; +import org.jumpmind.symmetric.io.data.transform.TargetDmlAction; import org.jumpmind.symmetric.io.data.transform.IColumnTransform; import org.jumpmind.symmetric.io.data.transform.IgnoreColumnException; import org.jumpmind.symmetric.io.data.transform.IgnoreRowException; @@ -314,27 +314,42 @@ protected boolean perform(DataContext context, TransformedData data, // perform a transformation if there are columns defined for // transformation if (data.getColumnNames().length > 0) { - if (data.getTargetDmlType() != DataEventType.DELETE) { - persistData = true; - } else { - // handle the delete action - DeleteAction deleteAction = transformation.getDeleteAction(); - switch (deleteAction) { + TargetDmlAction targetAction = null; + switch (data.getTargetDmlType()) { + case INSERT: + case UPDATE: + targetAction = transformation.evaluateTargetDmlAction(context, data); + break; + case DELETE: + targetAction = transformation.getDeleteAction(); + break; + default: + persistData = true; + } + if (targetAction != null) { + // how to handle the update/delete action on target.. + switch (targetAction) { case DEL_ROW: data.setTargetDmlType(DataEventType.DELETE); persistData = true; break; case UPDATE_COL: + case UPD_ROW: data.setTargetDmlType(DataEventType.UPDATE); persistData = true; break; + case INS_ROW: + data.setTargetDmlType(DataEventType.INSERT); + persistData = true; + break; case NONE: default: if (log.isDebugEnabled()) { log.debug( - "The {} transformation is not configured to delete row. Not sending the delete through.", - transformation.getTransformId()); + "The {} transformation is not configured to delete row. Not sending the delete through.", + transformation.getTransformId()); } + break; } } } diff --git a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java index 1af9c82d5a..2b06162485 100644 --- a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java +++ b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/TransformWriterTest.java @@ -28,28 +28,8 @@ import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.symmetric.io.AbstractWriterTest; -import org.jumpmind.symmetric.io.data.CsvData; -import org.jumpmind.symmetric.io.data.DataEventType; -import org.jumpmind.symmetric.io.data.transform.AdditiveColumnTransform; -import org.jumpmind.symmetric.io.data.transform.BinaryLeftColumnTransform; -import org.jumpmind.symmetric.io.data.transform.ClarionDateTimeColumnTransform; -import org.jumpmind.symmetric.io.data.transform.ColumnsToRowsKeyColumnTransform; -import org.jumpmind.symmetric.io.data.transform.ColumnsToRowsValueColumnTransform; -import org.jumpmind.symmetric.io.data.transform.ConstantColumnTransform; -import org.jumpmind.symmetric.io.data.transform.CopyColumnTransform; -import org.jumpmind.symmetric.io.data.transform.CopyIfChangedColumnTransform; -import org.jumpmind.symmetric.io.data.transform.IColumnTransform; -import org.jumpmind.symmetric.io.data.transform.IdentityColumnTransform; -import org.jumpmind.symmetric.io.data.transform.JavaColumnTransform; -import org.jumpmind.symmetric.io.data.transform.LeftColumnTransform; -import org.jumpmind.symmetric.io.data.transform.MathColumnTransform; -import org.jumpmind.symmetric.io.data.transform.MultiplierColumnTransform; -import org.jumpmind.symmetric.io.data.transform.RemoveColumnTransform; -import org.jumpmind.symmetric.io.data.transform.SubstrColumnTransform; -import org.jumpmind.symmetric.io.data.transform.TransformColumn; -import org.jumpmind.symmetric.io.data.transform.TransformPoint; -import org.jumpmind.symmetric.io.data.transform.TransformTable; -import org.jumpmind.symmetric.io.data.transform.ValueMapColumnTransform; +import org.jumpmind.symmetric.io.data.*; +import org.jumpmind.symmetric.io.data.transform.*; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -81,8 +61,8 @@ public void testNoTransform() { public void testTableNameChange() { mockWriter.reset(); Table table = new Table("s1", new Column("id")); - writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT, - new String[] { "66" }), new CsvData(DataEventType.INSERT, new String[] { "77" }))); + writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT, new String[]{"66"}), + new CsvData(DataEventType.INSERT, new String[]{"77"}))); List datas = mockWriter.writtenDatas.get(table.getFullyQualifiedTableName()); Assert.assertNull(datas); datas = mockWriter.writtenDatas.get("t1"); @@ -95,8 +75,8 @@ public void testTableNameChange() { public void testAddColumn() { mockWriter.reset(); Table table = new Table("s2", new Column("id")); - writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT, - new String[] { "2" }), new CsvData(DataEventType.INSERT, new String[] { "1" }))); + writeData(getTransformWriter(), new TableCsvData(table, new CsvData(DataEventType.INSERT, new String[]{"2"}), + new CsvData(DataEventType.INSERT, new String[]{"1"}))); List datas = mockWriter.writtenDatas.get(table.getFullyQualifiedTableName()); Assert.assertNull(datas); datas = mockWriter.writtenDatas.get("t2"); @@ -108,6 +88,25 @@ public void testAddColumn() { } + @Test + public void testUpdateActionBeanShellScript() throws Exception { + mockWriter.reset(); + Table table = new Table("s3", new Column("id")); + writeData(getTransformWriter(), new TableCsvData(table, + new CsvData(DataEventType.UPDATE, new String[]{"1"}), + new CsvData(DataEventType.UPDATE, new String[]{"2"}), + new CsvData(DataEventType.UPDATE, new String[]{"3"}), + new CsvData(DataEventType.UPDATE, new String[]{"4"}), + new CsvData(DataEventType.UPDATE, new String[]{"5"}))); + List datas = mockWriter.writtenDatas.get("t3"); + Assert.assertEquals(4, datas.size()); + Assert.assertEquals(DataEventType.INSERT, datas.get(0).getDataEventType()); + Assert.assertEquals(DataEventType.DELETE, datas.get(1).getDataEventType()); + Assert.assertEquals(DataEventType.UPDATE, datas.get(2).getDataEventType()); + Assert.assertEquals(DataEventType.UPDATE, datas.get(3).getDataEventType()); + } + + @Test public void testSimpleTableBeanShellMapping() throws Exception { } @@ -125,11 +124,17 @@ public void testIgnoreRowExceptionFromBshMapping() throws Exception { } protected TransformWriter getTransformWriter() { + TransformTable transformTable3 = + new TransformTable("s3", "t3", TransformPoint.LOAD, new TransformColumn("id", "id", true)); + transformTable3.setUpdateActionBeanScript("switch (id) { case \"1\": return \"INS_ROW\"; case \"2\": " + + "return \"DEL_ROW\"; case \"3\": return \"UPD_ROW\"; case \"4\": return \"NONE\"; case \"5\": " + + "return \"UPDATE_COL\"; }"); return new TransformWriter(platform, TransformPoint.LOAD, mockWriter, buildDefaultColumnTransforms(), new TransformTable[] { - new TransformTable("s1", "t1", TransformPoint.LOAD, new TransformColumn("id", "id", - true)), - new TransformTable("s2", "t2", TransformPoint.LOAD, new TransformColumn("id", "id", - true), new TransformColumn(null, "col2", false, "const", "added")) }); + new TransformTable("s1", "t1", TransformPoint.LOAD, new TransformColumn("id", "id", true)), + new TransformTable("s2", "t2", TransformPoint.LOAD, new TransformColumn("id", "id", true), + new TransformColumn(null, "col2", false, "const", "added")), + transformTable3 + }); } public static Map> buildDefaultColumnTransforms() {