Skip to content

Commit

Permalink
Added column policy
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 5, 2012
1 parent 76a1e5d commit b1f54b0
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 15 deletions.
Expand Up @@ -11,6 +11,7 @@
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.transform.ColumnPolicy;
import org.jumpmind.symmetric.io.data.transform.DeleteAction;
import org.jumpmind.symmetric.io.data.transform.TransformColumn;
import org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType;
Expand Down Expand Up @@ -148,7 +149,8 @@ public void saveTransformTable(TransformTableNodeGroupLink transformTable) {
.getTargetCatalogName(), transformTable.getTargetSchemaName(), transformTable
.getTargetTableName(), transformTable.getTransformPoint().toString(),
transformTable.isUpdateFirst() ? 1 : 0, transformTable.getDeleteAction().toString(),
transformTable.getTransformOrder(), transformTable.getTransformId()) == 0) {
transformTable.getTransformOrder(), transformTable.getColumnPolicy().toString(),
transformTable.getTransformId()) == 0) {
transaction.prepareAndExecute(getSql("insertTransformTableSql"), transformTable
.getNodeGroupLink().getSourceNodeGroupId(), transformTable
.getNodeGroupLink().getTargetNodeGroupId(), transformTable
Expand All @@ -157,7 +159,8 @@ public void saveTransformTable(TransformTableNodeGroupLink transformTable) {
transformTable.getTargetSchemaName(), transformTable.getTargetTableName(),
transformTable.getTransformPoint().toString(), transformTable
.isUpdateFirst() ? 1 : 0, transformTable.getDeleteAction().toString(),
transformTable.getTransformOrder(), transformTable.getTransformId());
transformTable.getTransformOrder(), transformTable.getColumnPolicy().toString(),
transformTable.getTransformId());
}
deleteTransformColumns(transaction, transformTable.getTransformId());
List<TransformColumn> columns = transformTable.getTransformColumns();
Expand Down Expand Up @@ -229,6 +232,7 @@ public TransformTableNodeGroupLink mapRow(Row rs) {
}
table.setTransformOrder(rs.getInt("transform_order"));
table.setUpdateFirst(rs.getBoolean("update_first"));
table.setColumnPolicy(ColumnPolicy.valueOf(rs.getString("column_policy")));
table.setDeleteAction(DeleteAction.valueOf(rs.getString("delete_action")));
return table;
}
Expand Down
Expand Up @@ -18,7 +18,7 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map<String, String> re
" target_table_name, " +
" transform_point, " +
" transform_order, " +
" update_first, delete_action " +
" update_first, delete_action, column_policy " +
" from " +
" $(transform_table) order by transform_order " +
" asc " );
Expand Down Expand Up @@ -59,7 +59,8 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map<String, String> re
" transform_point=?, " +
" update_first=?, " +
" delete_action=?, " +
" transform_order=? " +
" transform_order=?, " +
" column_policy=? " +
" where " +
" transform_id=? " );

Expand All @@ -84,8 +85,8 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map<String, String> re
" source_schema_name, source_table_name, " +
" target_catalog_name, target_schema_name, target_table_name, " +
" transform_point, update_first, delete_action, transform_order, " +
" transform_id) " +
" values(?,?,?,?,?,?,?,?,?,?,?,?,?) " );
" column_policy, transform_id) " +
" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?) " );

putSql("insertTransformColumnSql" ,"" +
"insert into $(transform_column) " +
Expand Down
Expand Up @@ -412,10 +412,11 @@
<column name="source_table_name" type="VARCHAR" size="128" required="true" description="The name of the source table that will be transformed." />
<column name="target_catalog_name" type="VARCHAR" size="128" description="Optional name for the catalog a target table is in. Only use this if the target table is not in the default catalog." />
<column name="target_schema_name" type="VARCHAR" size="128" description="Optional name of the schema a target table is in. Only use this if the target table is not in the default schema." />
<column name="target_table_name" type="VARCHAR" size="128" description="Optional name for a target table. Use this if the target table name is different than the source." />
<column name="target_table_name" type="VARCHAR" size="128" description="Optional name for a target table. Use this if the target table name is different than the source." />
<column name="update_first" type="BOOLEANINT" size="1" default="0" description="If true, the target actions are attempted as updates first, regardless of whether the source operation was an insert or an update."/>
<column name="delete_action" type="VARCHAR" size="10" required="true" description="An action to take upon delete of a row." />
<column name="delete_action" type="VARCHAR" size="10" required="true" description="An action to take upon delete of a row. Possible values are: DEL_ROW, UPDATE_COL, or NONE." />
<column name="transform_order" type="INTEGER" required="true" default="1" description="Specifies the order in which to apply transforms if more than one target operation occurs."/>
<column name="column_policy" type="VARCHAR" size="10" default="SPECIFIED" required="true" description="Specifies whether all columns need to be specified or whether they are implied. Possible values are SPECIFIED or IMPLIED." />
</table>

<table name="transform_column" description="Defines the column mappings and optional data transformation for a data loader transformation ">
Expand Down
@@ -0,0 +1,5 @@
package org.jumpmind.symmetric.io.data.transform;

public enum ColumnPolicy {
SPECIFIED, IMPLIED
}
Expand Up @@ -2,10 +2,12 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;

public class TransformTable {
public class TransformTable implements Cloneable {

protected String transformId;
protected String sourceCatalogName;
Expand All @@ -18,6 +20,7 @@ public class TransformTable {
protected List<TransformColumn> transformColumns;
protected List<TransformColumn> primaryKeyColumns;
protected DeleteAction deleteAction = DeleteAction.NONE;
protected ColumnPolicy columnPolicy = ColumnPolicy.SPECIFIED;
protected boolean updateFirst = false;
protected int transformOrder = 0;

Expand Down Expand Up @@ -183,6 +186,14 @@ public boolean isUpdateFirst() {
return updateFirst;
}

public ColumnPolicy getColumnPolicy() {
return columnPolicy;
}

public void setColumnPolicy(ColumnPolicy columnPolicy) {
this.columnPolicy = columnPolicy;
}

@Override
public int hashCode() {
if (transformId != null) {
Expand Down Expand Up @@ -213,4 +224,78 @@ public String toString() {
return super.toString();
}
}

public TransformTable enhanceWithImpliedColumns(Map<String, String> sourceKeyValues,
Map<String, String> oldSourceValues, Map<String, String> sourceValues) {
TransformTable copiedVersion;
try {
copiedVersion = (TransformTable) this.clone();
if (transformColumns != null) {
copiedVersion.transformColumns = new ArrayList<TransformColumn>(transformColumns);
} else {
copiedVersion.transformColumns = new ArrayList<TransformColumn>();
}
if (primaryKeyColumns != null) {
copiedVersion.primaryKeyColumns = new ArrayList<TransformColumn>(primaryKeyColumns);
} else {
copiedVersion.primaryKeyColumns = new ArrayList<TransformColumn>();
}

if (columnPolicy == ColumnPolicy.IMPLIED) {
for (String column : sourceKeyValues.keySet()) {
boolean add = true;
if (primaryKeyColumns != null) {
for (TransformColumn xCol : primaryKeyColumns) {
if (StringUtils.isNotBlank(xCol.getSourceColumnName())
&& xCol.getSourceColumnName().equals(column)) {
add = false;
}
if (StringUtils.isNotBlank(xCol.getTargetColumnName())
&& xCol.getTargetColumnName().equals(column)) {
add = false;
}
}
}

if (add) {
TransformColumn newCol = new TransformColumn();
newCol.setTransformId(transformId);
newCol.setPk(true);
newCol.setTransformType(CopyColumnTransform.NAME);
newCol.setSourceColumnName(column);
newCol.setTargetColumnName(column);
copiedVersion.primaryKeyColumns.add(newCol);
copiedVersion.transformColumns.add(newCol);
}
}

for (String column : sourceValues.keySet()) {
boolean add = true;
for (TransformColumn xCol : copiedVersion.transformColumns) {
if (StringUtils.isNotBlank(xCol.getSourceColumnName())
&& xCol.getSourceColumnName().equals(column)) {
add = false;
}
if (StringUtils.isNotBlank(xCol.getTargetColumnName())
&& xCol.getTargetColumnName().equals(column)) {
add = false;
}
}

if (add) {
TransformColumn newCol = new TransformColumn();
newCol.setTransformId(transformId);
newCol.setTransformType(CopyColumnTransform.NAME);
newCol.setSourceColumnName(column);
newCol.setTargetColumnName(column);
copiedVersion.transformColumns.add(newCol);
}
}
}

return copiedVersion;
} catch (CloneNotSupportedException e) {
throw new IllegalStateException(e);
}
}
}
Expand Up @@ -61,7 +61,7 @@ public class TransformWriter implements IDataWriter {
public static void addColumnTransform(IColumnTransform<?> columnTransform) {
columnTransforms.put(columnTransform.getName(), columnTransform);
}

public static Map<String, IColumnTransform<?>> getColumnTransforms() {
return columnTransforms;
}
Expand All @@ -77,7 +77,7 @@ public TransformWriter(IDatabasePlatform platform, TransformPoint transformPoint
this.transformsBySourceTable = toMap(transforms);
this.targetWriter = targetWriter;
}

public void setTargetWriter(IDataWriter targetWriter) {
this.targetWriter = targetWriter;
}
Expand Down Expand Up @@ -144,7 +144,7 @@ public void write(CsvData data) {

if (eventType == DataEventType.DELETE) {
sourceValues = oldSourceValues;

if (sourceValues.size() == 0) {
sourceValues = sourceKeyValues;
}
Expand All @@ -158,7 +158,8 @@ public void write(CsvData data) {
}

List<TransformedData> dataThatHasBeenTransformed = new ArrayList<TransformedData>();
for (TransformTable transformation : activeTransforms) {
for (TransformTable transformation : activeTransforms) {
transformation = transformation.enhanceWithImpliedColumns(sourceKeyValues, oldSourceValues, sourceValues);
dataThatHasBeenTransformed.addAll(transform(eventType, context, transformation,
sourceKeyValues, oldSourceValues, sourceValues));
}
Expand Down Expand Up @@ -238,6 +239,11 @@ protected boolean perform(DataContext context, TransformedData data,
data.put(transformColumn, value, false);
} catch (IgnoreColumnException e) {
// Do nothing. We are ignoring the column
if (log.isDebugEnabled()) {
log.debug(
"A transform indicated we should ignore the target column {}",
transformColumn.getTargetColumnName());
}
}
}
} else {
Expand All @@ -246,7 +252,8 @@ protected boolean perform(DataContext context, TransformedData data,
}
}

// perform a transformation if there are columns defined for transformation
// perform a transformation if there are columns defined for
// transformation
if (data.getColumnNames().length > 0) {
if (data.getTargetDmlType() != DataEventType.DELETE) {
if (data.getTargetDmlType() == DataEventType.INSERT
Expand Down
@@ -1,4 +1,4 @@
test.root=h2
test.root=firebird
test.client=h2

mysql.db.driver=com.mysql.jdbc.Driver
Expand Down

0 comments on commit b1f54b0

Please sign in to comment.