Skip to content

Commit

Permalink
0002367: TableTransformation operation change support for INSERT/UPDA…
Browse files Browse the repository at this point in the history
…TE-DML (in addition to DELETE)

allow access to COLUMN/OLD_COLUMN values in bsh-script
add the data from EXTERNAL_SELECT of a trigger to the bsh-interpreter as "externalData" variable.
allow evaluation of bsh-script for INSERT *and* UPDATE dml action
TODO: triggerId would be nice too...
TODO: merge deleteAction with updateAction to one single column called "targetDmlAction" where you can use static values or a bsh-script like in updateAction currently.
  • Loading branch information
NiasSt90 committed Aug 25, 2015
1 parent 1282982 commit f5d0525
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 60 deletions.
Expand Up @@ -112,7 +112,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'itemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand Down
Expand Up @@ -26,7 +26,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_time, create_time
) values (
'update-first', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand All @@ -48,7 +48,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_time, create_time
) values (
'delete-action-update-col', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand Down
Expand Up @@ -22,7 +22,7 @@ ifndef::pro[]
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'extractStoreItemSellingPriceTransform', 'store', 'corp', 'EXTRACT', 'ITEM_SELLING_PRICE',
Expand All @@ -35,7 +35,7 @@ insert into SYM_TRANSFORM_TABLE (
----
insert into SYM_TRANSFORM_TABLE (
transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name,
target_table_name, delete_action, transform_order, column_policy, update_first,
target_table_name, update_action, delete_action, transform_order, column_policy, update_first,
last_update_by, last_update_time, create_time
) values (
'loadCorpItemSellingPriceTransform', 'corp', 'store', 'LOAD', 'ITEM_SELLING_PRICE',
Expand Down
21 changes: 21 additions & 0 deletions symmetric-assemble/src/docbook/configuration.xml
Expand Up @@ -1242,6 +1242,27 @@ column.</listitem>
</itemizedlist>
</listitem>

<listitem>
update_action: When a source operation of Update takes place, there are
three possible ways to handle the transformation at the target. The
options include:
<itemizedlist>
<listitem>NONE - The update results in no target changes.</listitem>

<listitem>DEL_ROW - The update results in a delete of the row
as specified by the pk columns defined in the transformation
configuration.</listitem>

<listitem>UPDATE_COL - The update results in an Update
operation on the target which updates the specific rows and columns
based on the defined transformation.</listitem>

<listitem>BeanShell Script Transform ('bsh'):
script code which returns one of the above items.
you can use COLUMN variables inside the script.</listitem>
</itemizedlist>
</listitem>

<listitem>
delete_action: When a source operation of Delete takes place, there are
three possible ways to handle the transformation at the target. The
Expand Down
Expand Up @@ -42,7 +42,7 @@
import org.jumpmind.symmetric.io.data.transform.ConstantColumnTransform;
import org.jumpmind.symmetric.io.data.transform.CopyColumnTransform;
import org.jumpmind.symmetric.io.data.transform.CopyIfChangedColumnTransform;
import org.jumpmind.symmetric.io.data.transform.DeleteAction;
import org.jumpmind.symmetric.io.data.transform.TargetDmlAction;
import org.jumpmind.symmetric.io.data.transform.IColumnTransform;
import org.jumpmind.symmetric.io.data.transform.IdentityColumnTransform;
import org.jumpmind.symmetric.io.data.transform.JavaColumnTransform;
Expand Down Expand Up @@ -381,8 +381,8 @@ class TransformTableMapper implements ISqlRowMapper<TransformTableNodeGroupLink>
public TransformTableNodeGroupLink mapRow(Row rs) {
TransformTableNodeGroupLink table = new TransformTableNodeGroupLink();
table.setTransformId(rs.getString("transform_id"));
table.setNodeGroupLink(configurationService.getNodeGroupLinkFor(
rs.getString("source_node_group_id"), rs.getString("target_node_group_id"), false));
table.setNodeGroupLink(configurationService
.getNodeGroupLinkFor(rs.getString("source_node_group_id"), rs.getString("target_node_group_id"), false));
table.setSourceCatalogName(rs.getString("source_catalog_name"));
table.setSourceSchemaName(rs.getString("source_schema_name"));
table.setSourceTableName(rs.getString("source_table_name"));
Expand All @@ -401,7 +401,8 @@ public TransformTableNodeGroupLink mapRow(Row rs) {
table.setTransformOrder(rs.getInt("transform_order"));
table.setUpdateFirst(rs.getBoolean("update_first"));
table.setColumnPolicy(ColumnPolicy.valueOf(rs.getString("column_policy")));
table.setDeleteAction(DeleteAction.valueOf(rs.getString("delete_action")));
table.setUpdateActionBeanScript(rs.getString("update_action"));
table.setDeleteAction(TargetDmlAction.valueOf(rs.getString("delete_action")));
table.setCreateTime(rs.getDateTime("create_time"));
table.setLastUpdateBy(rs.getString("last_update_by"));
table.setLastUpdateTime(rs.getDateTime("last_update_time"));
Expand Down
Expand Up @@ -38,7 +38,7 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map<String, String> re
" target_table_name, " +
" transform_point, " +
" transform_order, " +
" update_first, delete_action, column_policy, " +
" update_first, update_action, delete_action, column_policy, " +
" last_update_time, last_update_by, create_time " +
" from " +
" $(transform_table) order by transform_order " +
Expand Down Expand Up @@ -81,8 +81,9 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map<String, String> re
" target_table_name=?, " +
" transform_point=?, " +
" update_first=?, " +
" delete_action=?, " +
" transform_order=?, " +
" update_action=?, " +
" delete_action=?, " +
" transform_order=?, " +
" column_policy=?, " +
" last_update_time=?, " +
" last_update_by=? " +
Expand Down Expand Up @@ -111,7 +112,7 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map<String, String> re
" (source_node_group_id, target_node_group_id, source_catalog_name, " +
" source_schema_name, source_table_name, " +
" target_catalog_name, target_schema_name, target_table_name, " +
" transform_point, update_first, delete_action, transform_order, " +
" transform_point, update_first, update_action, delete_action, transform_order, " +
" column_policy, last_update_time, last_update_by, create_time, transform_id) " +
" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) " );

Expand Down
1 change: 1 addition & 0 deletions symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -656,6 +656,7 @@
<column name="target_schema_name" type="VARCHAR" size="255" description="Optional name of the schema a target table is in. Only use this if the target table is not in the default schema." />
<column name="target_table_name" type="VARCHAR" size="255" description="The name of the target table." />
<column name="update_first" type="BOOLEANINT" size="1" default="0" description="If true, the target actions are attempted as updates first, regardless of whether the source operation was an insert or an update."/>
<column name="update_action" type="VARCHAR" size="255" required="false" default="UPDATE_COL" description="An action to take upon update of a row. Possible values are: DEL_ROW, UPDATE_COL, or NONE." />
<column name="delete_action" type="VARCHAR" size="10" required="true" description="An action to take upon delete of a row. Possible values are: DEL_ROW, UPDATE_COL, or NONE." />
<column name="transform_order" type="INTEGER" required="true" default="1" description="Specifies the order in which to apply transforms if more than one target operation occurs."/>
<column name="column_policy" type="VARCHAR" size="10" default="SPECIFIED" required="true" description="Specifies whether all columns need to be specified or whether they are implied. Possible values are SPECIFIED or IMPLIED." />
Expand Down
Expand Up @@ -20,8 +20,8 @@
*/
package org.jumpmind.symmetric.io.data.transform;

public enum DeleteAction {
public enum TargetDmlAction {

NONE, UPDATE_COL, DEL_ROW;
NONE, UPDATE_COL, INS_ROW, UPD_ROW, DEL_ROW;

}
Expand Up @@ -20,16 +20,27 @@
*/
package org.jumpmind.symmetric.io.data.transform;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.*;

import bsh.Interpreter;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.*;
import org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType;
import org.jumpmind.util.Context;
import org.slf4j.*;

public class TransformTable implements Cloneable {

final String INTERPRETER_KEY = String.format("%s.BshInterpreter", getClass().getName());

protected final Logger log = LoggerFactory.getLogger(getClass());

/*
* Static context object used to maintain objects in memory for reference between BSH transforms.
*/
private static Map<String, Object> bshContext = new HashMap<String, Object>();

protected String transformId;
protected String sourceCatalogName;
protected String sourceSchemaName;
Expand All @@ -40,7 +51,9 @@ public class TransformTable implements Cloneable {
protected TransformPoint transformPoint;
protected List<TransformColumn> transformColumns;
protected List<TransformColumn> primaryKeyColumns;
protected DeleteAction deleteAction = DeleteAction.DEL_ROW;
protected String updateActionBeanScript = null;
protected TargetDmlAction updateAction = TargetDmlAction.UPDATE_COL;
protected TargetDmlAction deleteAction = TargetDmlAction.DEL_ROW;
protected ColumnPolicy columnPolicy = ColumnPolicy.IMPLIED;
protected boolean updateFirst = false;
protected int transformOrder = 0;
Expand Down Expand Up @@ -198,11 +211,79 @@ public void addTransformColumn(TransformColumn column) {
}
}

public void setDeleteAction(DeleteAction deleteAction) {
public void setUpdateActionBeanScript(String updateAction) {
try {
this.updateActionBeanScript = null;
this.updateAction = TargetDmlAction.valueOf(updateAction);
}
catch (IllegalArgumentException e) {
//looks like a bean-shell-script
this.updateActionBeanScript = updateAction;
}
}

public TargetDmlAction evaluateTargetDmlAction(DataContext dataContext, TransformedData transformedData) {
if (updateActionBeanScript != null) {
Interpreter interpreter = getInterpreter(dataContext);
Map<String, String> sourceValues = transformedData.getSourceValues();

try {
interpreter.set("sourceDmlType", transformedData.getSourceDmlType());
interpreter.set("sourceDmlTypeString", transformedData.getSourceDmlType().toString());
interpreter.set("transformedData", transformedData);
CsvData csvData = dataContext.getData();
if (csvData != null) {
interpreter.set("externalData", csvData.getAttribute("externalData"));
}
else {
interpreter.set("externalData", null);
}
for (String columnName : sourceValues.keySet()) {
interpreter.set(columnName.toUpperCase(), sourceValues.get(columnName));
interpreter.set(columnName, sourceValues.get(columnName));
}
if (transformedData.getOldSourceValues() != null) {
for (Map.Entry<String, String> oldColumn : transformedData.getOldSourceValues().entrySet()) {
interpreter.set("OLD_" + oldColumn.getKey(), oldColumn.getValue());
interpreter.set("OLD_" + oldColumn.getKey().toUpperCase(), oldColumn.getValue());
}
}
String transformExpression = updateActionBeanScript;
String methodName = String.format("transform_%d()", Math.abs(transformExpression.hashCode()));
if (dataContext.get(methodName) == null) {
//create BSH-Method if not exists in Context
interpreter.set("context", dataContext);
interpreter.set("bshContext", bshContext);
interpreter.eval(String.format("%s {\n%s\n}", methodName, transformExpression));
dataContext.put(methodName, Boolean.TRUE);
}
//call BSH-Method
Object result = interpreter.eval(methodName);
//evaluate Result of BSH-Script
updateAction = TargetDmlAction.valueOf((String) result);
}
catch (Exception e) {
log.error(e.getMessage(), e);
}
}
return updateAction;
}

protected Interpreter getInterpreter(Context context) {
Interpreter interpreter = (Interpreter) context.get(INTERPRETER_KEY);
if (interpreter == null) {
interpreter = new Interpreter();
context.put(INTERPRETER_KEY, interpreter);
}
return interpreter;
}


public void setDeleteAction(TargetDmlAction deleteAction) {
this.deleteAction = deleteAction;
}

public DeleteAction getDeleteAction() {
public TargetDmlAction getDeleteAction() {
return deleteAction;
}

Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.transform.DeleteAction;
import org.jumpmind.symmetric.io.data.transform.TargetDmlAction;
import org.jumpmind.symmetric.io.data.transform.IColumnTransform;
import org.jumpmind.symmetric.io.data.transform.IgnoreColumnException;
import org.jumpmind.symmetric.io.data.transform.IgnoreRowException;
Expand Down Expand Up @@ -314,27 +314,42 @@ protected boolean perform(DataContext context, TransformedData data,
// perform a transformation if there are columns defined for
// transformation
if (data.getColumnNames().length > 0) {
if (data.getTargetDmlType() != DataEventType.DELETE) {
persistData = true;
} else {
// handle the delete action
DeleteAction deleteAction = transformation.getDeleteAction();
switch (deleteAction) {
TargetDmlAction targetAction = null;
switch (data.getTargetDmlType()) {
case INSERT:
case UPDATE:
targetAction = transformation.evaluateTargetDmlAction(context, data);
break;
case DELETE:
targetAction = transformation.getDeleteAction();
break;
default:
persistData = true;
}
if (targetAction != null) {
// how to handle the update/delete action on target..
switch (targetAction) {
case DEL_ROW:
data.setTargetDmlType(DataEventType.DELETE);
persistData = true;
break;
case UPDATE_COL:
case UPD_ROW:
data.setTargetDmlType(DataEventType.UPDATE);
persistData = true;
break;
case INS_ROW:
data.setTargetDmlType(DataEventType.INSERT);
persistData = true;
break;
case NONE:
default:
if (log.isDebugEnabled()) {
log.debug(
"The {} transformation is not configured to delete row. Not sending the delete through.",
transformation.getTransformId());
"The {} transformation is not configured to delete row. Not sending the delete through.",
transformation.getTransformId());
}
break;
}
}
}
Expand Down

0 comments on commit f5d0525

Please sign in to comment.