Skip to content

Commit

Permalink
0001529: Columns in old data are not transformed causing conflict det…
Browse files Browse the repository at this point in the history
…ection
  • Loading branch information
chenson42 committed Jan 30, 2014
1 parent c0efba2 commit a97e174
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 19 deletions.
20 changes: 20 additions & 0 deletions symmetric-io/pom.xml
Expand Up @@ -114,5 +114,25 @@
<artifactId>jeval</artifactId>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -53,8 +53,9 @@ public class TransformedData implements Cloneable {
protected Map<String, String> sourceValues;

public TransformedData(TransformTable transformation, DataEventType sourceDmlType,
Map<String, String> sourceKeyValues, Map<String, String> oldSourceValues,
Map<String, String> sourceValues) {
Map<String, String> sourceKeyValues, Map<String, String> oldSourceValues,
Map<String, String> sourceValues) {

this.transformation = transformation;
this.targetDmlType = sourceDmlType;
this.sourceDmlType = sourceDmlType;
Expand All @@ -64,30 +65,37 @@ public TransformedData(TransformTable transformation, DataEventType sourceDmlTyp
}

public String getFullyQualifiedTableName() {

return transformation.getFullyQualifiedTargetTableName();
}

public DataEventType getTargetDmlType() {

return targetDmlType;
}

public void setTargetDmlType(DataEventType dmlType) {

this.targetDmlType = dmlType;
}

public String getTableName() {

return transformation.getTargetTableName();
}

public String getCatalogName() {

return transformation.getTargetCatalogName();
}

public String getSchemaName() {

return transformation.getTargetSchemaName();
}

public void put(TransformColumn column, String columnValue, boolean recordAsKey) {

if (recordAsKey) {
if (keysBy == null) {
keysBy = new HashMap<TransformColumn.IncludeOnType, LinkedHashMap<String, String>>(
Expand All @@ -114,6 +122,7 @@ public void put(TransformColumn column, String columnValue, boolean recordAsKey)
protected List<String> retrieve(
Map<TransformColumn.IncludeOnType, LinkedHashMap<String, String>> source,
boolean getColumnNames) {

List<String> list = new ArrayList<String>(source == null ? 0 : source.size());
if (source != null) {
LinkedHashMap<String, String> values = source.get(IncludeOnType.ALL);
Expand Down Expand Up @@ -145,30 +154,36 @@ protected List<String> retrieve(
}

public String[] getKeyNames() {

List<String> list = retrieve(keysBy, true);
return list.toArray(new String[list.size()]);
}

public String[] getKeyValues() {

List<String> list = retrieve(keysBy, false);
return list.toArray(new String[list.size()]);
}

public String[] getColumnNames() {

List<String> list = retrieve(columnsBy, true);
return list.toArray(new String[list.size()]);
}

public String[] getColumnValues() {

List<String> list = retrieve(columnsBy, false);
return list.toArray(new String[list.size()]);
}

public DataEventType getSourceDmlType() {

return sourceDmlType;
}

public TransformedData copy() {

try {
TransformedData clone = (TransformedData) this.clone();
clone.columnsBy = copy(columnsBy);
Expand All @@ -180,19 +195,23 @@ public TransformedData copy() {
}

public TransformTable getTransformation() {

return transformation;
}

public void setGeneratedIdentityNeeded(boolean generatedIdentityNeeded) {

this.generatedIdentityNeeded = generatedIdentityNeeded;
}

public boolean isGeneratedIdentityNeeded() {

return generatedIdentityNeeded;
}

protected Map<TransformColumn.IncludeOnType, LinkedHashMap<String, String>> copy(
Map<TransformColumn.IncludeOnType, LinkedHashMap<String, String>> toCopy) {

Map<TransformColumn.IncludeOnType, LinkedHashMap<String, String>> newMap = new HashMap<TransformColumn.IncludeOnType, LinkedHashMap<String, String>>(
toCopy.size());
for (TransformColumn.IncludeOnType key : toCopy.keySet()) {
Expand All @@ -203,18 +222,22 @@ protected Map<TransformColumn.IncludeOnType, LinkedHashMap<String, String>> copy
}

public Map<String, String> getSourceKeyValues() {

return sourceKeyValues;
}

public Map<String, String> getOldSourceValues() {

return oldSourceValues;
}

public Map<String, String> getSourceValues() {

return sourceValues;
}

public boolean hasSameKeyValues(String[] otherKeyValues) {

String[] keyValues = getKeyValues();
if (otherKeyValues != null) {
if (keyValues != null) {
Expand All @@ -236,6 +259,7 @@ public boolean hasSameKeyValues(String[] otherKeyValues) {
}

public Table buildTargetTable() {

Table table = null;
String[] columnNames = getColumnNames();
String[] keyNames = getKeyNames();
Expand All @@ -258,6 +282,7 @@ public Table buildTargetTable() {
}

public CsvData buildTargetCsvData() {

CsvData data = new CsvData(this.targetDmlType);
data.putParsedData(CsvData.OLD_DATA, getOldColumnValues());
data.putParsedData(CsvData.ROW_DATA, getColumnValues());
Expand All @@ -267,29 +292,100 @@ public CsvData buildTargetCsvData() {
}

public String[] getOldColumnValues() {

List<String> names = retrieve(columnsBy, true);
List<String> keyNames = retrieve(keysBy, true);
List<String> keyValues = retrieve(keysBy, false);
List<String> values = new ArrayList<String>();

for (String name : names) {
if (keyNames.contains(name)) {
/*
* Must always use the transformed value for the key else
* deletes and updates won't work.
*/
values.add(keyValues.get(keyNames.indexOf(name)));
} else {
if (oldSourceValues.containsKey(name)) {
values.add(oldSourceValues.get(name));
} else {
/*
* TODO copy transforms should probably copy the old value
* to the new value. Currently, it will be null.
*/
values.add(null);
}
}
addOldValue(keyNames, keyValues, values, name);
}

return values.toArray(new String[values.size()]);
}

private void addOldValue(List<String> keyNames, List<String> keyValues, List<String> values, String name) {

if (keyNames.contains(name)) {
/*
* Must always use the transformed value for the key else
* deletes and updates won't work.
*/
values.add(keyValues.get(keyNames.indexOf(name)));
return;
}

TransformColumn transformColumn = findTransformColumn(name);

if (null == transformColumn) {
if (oldSourceValues.containsKey(name)) {
values.add(oldSourceValues.get(name));
return;
}

values.add(null);
return;
}

String transformType = transformColumn.getTransformType();

if (CopyColumnTransform.NAME.equals(transformType)) {
values.add(oldSourceValues.get(transformColumn.getSourceColumnName()));
return;
} else if (RemoveColumnTransform.NAME.equalsIgnoreCase(transformType)) {
values.add(null);
return;
} else if (ConstantColumnTransform.NAME.equalsIgnoreCase(transformType)) {
values.add(transformColumn.getTransformExpression());
return;
} else if (SubstrColumnTransform.NAME.equalsIgnoreCase(transformType)) {

String value = oldSourceValues.get(transformColumn.getSourceColumnName());

try {
values.add(new SubstrColumnTransform().transform(
null, // IDatabasePlatform
null, // DataContext
transformColumn,
null, // TransformedData
null, // Map<String, String> sourceValue
value, // String newValue
null // String oldValue
));
} catch (IgnoreColumnException e) {
throw new RuntimeException(e);
} catch (IgnoreRowException e) {
throw new RuntimeException(e);
}

return;
} else if (doesSelectedConflictTypeRequireOldData()) {
throw new UnsupportedOperationException(
"'" + transformType + "' transformation of OLD values hasn't been implemented"
);
}

values.add(null);
}

private boolean doesSelectedConflictTypeRequireOldData() {

// TODO return true if USE_OLD_DATA or USE_CHANGED_DATA
// needed to inject DatabaseWriterSettings and invoke the method Conflict pickConflict(Table table, Batch batch)

// Conflict.DetectConflict type = DatabaseWriterSettings#pickConflict(table, batch).getDetectType();
// return type == Conflict.DetectConflict.USE_OLD_DATA || type == Conflict.DetectConflict.USE_CHANGED_DATA;

return false;
}

private TransformColumn findTransformColumn(String name) {

for (TransformColumn tc : transformation.getTransformColumns())
if (tc.getTargetColumnName().equals(name))
return tc;
return null;
}

}

0 comments on commit a97e174

Please sign in to comment.