From b1f54b0cbd694091750876644b4efce274f72aee Mon Sep 17 00:00:00 2001 From: chenson42 Date: Sat, 5 May 2012 16:55:47 +0000 Subject: [PATCH] Added column policy --- .../service/impl/TransformService.java | 8 +- .../service/impl/TransformServiceSqlMap.java | 9 +- .../src/main/resources/symmetric-schema.xml | 5 +- .../io/data/transform/ColumnPolicy.java | 5 ++ .../io/data/transform/TransformTable.java | 87 ++++++++++++++++++- .../io/data/writer/TransformWriter.java | 17 ++-- .../src/test/resources/db-test.properties | 2 +- 7 files changed, 118 insertions(+), 15 deletions(-) create mode 100644 symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/ColumnPolicy.java diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java index cb6eaeccc3..ce45bf7d5f 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformService.java @@ -11,6 +11,7 @@ import org.jumpmind.db.sql.Row; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; +import org.jumpmind.symmetric.io.data.transform.ColumnPolicy; import org.jumpmind.symmetric.io.data.transform.DeleteAction; import org.jumpmind.symmetric.io.data.transform.TransformColumn; import org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType; @@ -148,7 +149,8 @@ public void saveTransformTable(TransformTableNodeGroupLink transformTable) { .getTargetCatalogName(), transformTable.getTargetSchemaName(), transformTable .getTargetTableName(), transformTable.getTransformPoint().toString(), transformTable.isUpdateFirst() ? 1 : 0, transformTable.getDeleteAction().toString(), - transformTable.getTransformOrder(), transformTable.getTransformId()) == 0) { + transformTable.getTransformOrder(), transformTable.getColumnPolicy().toString(), + transformTable.getTransformId()) == 0) { transaction.prepareAndExecute(getSql("insertTransformTableSql"), transformTable .getNodeGroupLink().getSourceNodeGroupId(), transformTable .getNodeGroupLink().getTargetNodeGroupId(), transformTable @@ -157,7 +159,8 @@ public void saveTransformTable(TransformTableNodeGroupLink transformTable) { transformTable.getTargetSchemaName(), transformTable.getTargetTableName(), transformTable.getTransformPoint().toString(), transformTable .isUpdateFirst() ? 1 : 0, transformTable.getDeleteAction().toString(), - transformTable.getTransformOrder(), transformTable.getTransformId()); + transformTable.getTransformOrder(), transformTable.getColumnPolicy().toString(), + transformTable.getTransformId()); } deleteTransformColumns(transaction, transformTable.getTransformId()); List columns = transformTable.getTransformColumns(); @@ -229,6 +232,7 @@ public TransformTableNodeGroupLink mapRow(Row rs) { } table.setTransformOrder(rs.getInt("transform_order")); table.setUpdateFirst(rs.getBoolean("update_first")); + table.setColumnPolicy(ColumnPolicy.valueOf(rs.getString("column_policy"))); table.setDeleteAction(DeleteAction.valueOf(rs.getString("delete_action"))); return table; } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java index 7acd9eea2d..93639193c0 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TransformServiceSqlMap.java @@ -18,7 +18,7 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re " target_table_name, " + " transform_point, " + " transform_order, " + -" update_first, delete_action " + +" update_first, delete_action, column_policy " + " from " + " $(transform_table) order by transform_order " + " asc " ); @@ -59,7 +59,8 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re " transform_point=?, " + " update_first=?, " + " delete_action=?, " + -" transform_order=? " + +" transform_order=?, " + +" column_policy=? " + " where " + " transform_id=? " ); @@ -84,8 +85,8 @@ public TransformServiceSqlMap(IDatabasePlatform platform, Map re " source_schema_name, source_table_name, " + " target_catalog_name, target_schema_name, target_table_name, " + " transform_point, update_first, delete_action, transform_order, " + -" transform_id) " + -" values(?,?,?,?,?,?,?,?,?,?,?,?,?) " ); +" column_policy, transform_id) " + +" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?) " ); putSql("insertTransformColumnSql" ,"" + "insert into $(transform_column) " + diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml index 0d722ef598..8e18c302d3 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml @@ -412,10 +412,11 @@ - + - + + diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/ColumnPolicy.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/ColumnPolicy.java new file mode 100644 index 0000000000..b32ff37558 --- /dev/null +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/ColumnPolicy.java @@ -0,0 +1,5 @@ +package org.jumpmind.symmetric.io.data.transform; + +public enum ColumnPolicy { + SPECIFIED, IMPLIED +} diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java index 78bc867435..b128ab7752 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/transform/TransformTable.java @@ -2,10 +2,12 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.jumpmind.db.model.Table; -public class TransformTable { +public class TransformTable implements Cloneable { protected String transformId; protected String sourceCatalogName; @@ -18,6 +20,7 @@ public class TransformTable { protected List transformColumns; protected List primaryKeyColumns; protected DeleteAction deleteAction = DeleteAction.NONE; + protected ColumnPolicy columnPolicy = ColumnPolicy.SPECIFIED; protected boolean updateFirst = false; protected int transformOrder = 0; @@ -183,6 +186,14 @@ public boolean isUpdateFirst() { return updateFirst; } + public ColumnPolicy getColumnPolicy() { + return columnPolicy; + } + + public void setColumnPolicy(ColumnPolicy columnPolicy) { + this.columnPolicy = columnPolicy; + } + @Override public int hashCode() { if (transformId != null) { @@ -213,4 +224,78 @@ public String toString() { return super.toString(); } } + + public TransformTable enhanceWithImpliedColumns(Map sourceKeyValues, + Map oldSourceValues, Map sourceValues) { + TransformTable copiedVersion; + try { + copiedVersion = (TransformTable) this.clone(); + if (transformColumns != null) { + copiedVersion.transformColumns = new ArrayList(transformColumns); + } else { + copiedVersion.transformColumns = new ArrayList(); + } + if (primaryKeyColumns != null) { + copiedVersion.primaryKeyColumns = new ArrayList(primaryKeyColumns); + } else { + copiedVersion.primaryKeyColumns = new ArrayList(); + } + + if (columnPolicy == ColumnPolicy.IMPLIED) { + for (String column : sourceKeyValues.keySet()) { + boolean add = true; + if (primaryKeyColumns != null) { + for (TransformColumn xCol : primaryKeyColumns) { + if (StringUtils.isNotBlank(xCol.getSourceColumnName()) + && xCol.getSourceColumnName().equals(column)) { + add = false; + } + if (StringUtils.isNotBlank(xCol.getTargetColumnName()) + && xCol.getTargetColumnName().equals(column)) { + add = false; + } + } + } + + if (add) { + TransformColumn newCol = new TransformColumn(); + newCol.setTransformId(transformId); + newCol.setPk(true); + newCol.setTransformType(CopyColumnTransform.NAME); + newCol.setSourceColumnName(column); + newCol.setTargetColumnName(column); + copiedVersion.primaryKeyColumns.add(newCol); + copiedVersion.transformColumns.add(newCol); + } + } + + for (String column : sourceValues.keySet()) { + boolean add = true; + for (TransformColumn xCol : copiedVersion.transformColumns) { + if (StringUtils.isNotBlank(xCol.getSourceColumnName()) + && xCol.getSourceColumnName().equals(column)) { + add = false; + } + if (StringUtils.isNotBlank(xCol.getTargetColumnName()) + && xCol.getTargetColumnName().equals(column)) { + add = false; + } + } + + if (add) { + TransformColumn newCol = new TransformColumn(); + newCol.setTransformId(transformId); + newCol.setTransformType(CopyColumnTransform.NAME); + newCol.setSourceColumnName(column); + newCol.setTargetColumnName(column); + copiedVersion.transformColumns.add(newCol); + } + } + } + + return copiedVersion; + } catch (CloneNotSupportedException e) { + throw new IllegalStateException(e); + } + } } diff --git a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java index c5cf63cb05..a09d6e98a1 100644 --- a/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java +++ b/symmetric/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/TransformWriter.java @@ -61,7 +61,7 @@ public class TransformWriter implements IDataWriter { public static void addColumnTransform(IColumnTransform columnTransform) { columnTransforms.put(columnTransform.getName(), columnTransform); } - + public static Map> getColumnTransforms() { return columnTransforms; } @@ -77,7 +77,7 @@ public TransformWriter(IDatabasePlatform platform, TransformPoint transformPoint this.transformsBySourceTable = toMap(transforms); this.targetWriter = targetWriter; } - + public void setTargetWriter(IDataWriter targetWriter) { this.targetWriter = targetWriter; } @@ -144,7 +144,7 @@ public void write(CsvData data) { if (eventType == DataEventType.DELETE) { sourceValues = oldSourceValues; - + if (sourceValues.size() == 0) { sourceValues = sourceKeyValues; } @@ -158,7 +158,8 @@ public void write(CsvData data) { } List dataThatHasBeenTransformed = new ArrayList(); - for (TransformTable transformation : activeTransforms) { + for (TransformTable transformation : activeTransforms) { + transformation = transformation.enhanceWithImpliedColumns(sourceKeyValues, oldSourceValues, sourceValues); dataThatHasBeenTransformed.addAll(transform(eventType, context, transformation, sourceKeyValues, oldSourceValues, sourceValues)); } @@ -238,6 +239,11 @@ protected boolean perform(DataContext context, TransformedData data, data.put(transformColumn, value, false); } catch (IgnoreColumnException e) { // Do nothing. We are ignoring the column + if (log.isDebugEnabled()) { + log.debug( + "A transform indicated we should ignore the target column {}", + transformColumn.getTargetColumnName()); + } } } } else { @@ -246,7 +252,8 @@ protected boolean perform(DataContext context, TransformedData data, } } - // perform a transformation if there are columns defined for transformation + // perform a transformation if there are columns defined for + // transformation if (data.getColumnNames().length > 0) { if (data.getTargetDmlType() != DataEventType.DELETE) { if (data.getTargetDmlType() == DataEventType.INSERT diff --git a/symmetric/symmetric-jdbc/src/test/resources/db-test.properties b/symmetric/symmetric-jdbc/src/test/resources/db-test.properties index 0990022fe4..05c6f41f06 100644 --- a/symmetric/symmetric-jdbc/src/test/resources/db-test.properties +++ b/symmetric/symmetric-jdbc/src/test/resources/db-test.properties @@ -1,4 +1,4 @@ -test.root=h2 +test.root=firebird test.client=h2 mysql.db.driver=com.mysql.jdbc.Driver