Skip to content

Commit

Permalink
0003624: Some situations can cause a transformed value to be written to
Browse files Browse the repository at this point in the history
the old column value instead of the new column value.
  • Loading branch information
chenson42 committed Jul 11, 2018
1 parent 708867f commit d8638a5
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 75 deletions.
Expand Up @@ -152,7 +152,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
}

if (result instanceof String) {
if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, (String) result);
} else {
return new NewAndOldValue((String) result, null);
Expand Down
Expand Up @@ -108,7 +108,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
column.getTargetColumnName(), column.getTransformId());
}

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, lookupValue);
} else {
return new NewAndOldValue(lookupValue, null);
Expand Down
Expand Up @@ -60,7 +60,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
value = parameterService.getString(paramName);
}

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, value);
} else {
return new NewAndOldValue(value, null);
Expand Down
Expand Up @@ -57,7 +57,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
}
}

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, newValue);
} else {
return new NewAndOldValue(newValue, null);
Expand Down
Expand Up @@ -60,7 +60,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
}
String value = convertClarionDate(newValue, clarionTimeStr);

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, value);
} else {
return new NewAndOldValue(value, null);
Expand Down
Expand Up @@ -49,7 +49,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
TransformColumn column, TransformedData data, Map<String, String> sourceValues,
String newValue, String oldValue) throws IgnoreColumnException, IgnoreRowException {

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, column.getTransformExpression());
} else {
return new NewAndOldValue(column.getTransformExpression(), null);
Expand Down
Expand Up @@ -35,7 +35,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
}
}

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, value);
} else {
return new NewAndOldValue(value, null);
Expand Down
Expand Up @@ -62,7 +62,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
}
}

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, value);
} else {
return new NewAndOldValue(value, null);
Expand Down
Expand Up @@ -73,7 +73,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
result = result.substring(0, result.length()-2);
}

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, result);
} else {
return new NewAndOldValue(result, null);
Expand Down
Expand Up @@ -75,7 +75,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,
}
}

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, newValue);
} else {
return new NewAndOldValue(newValue, null);
Expand Down
Expand Up @@ -82,7 +82,7 @@ public NewAndOldValue transform(IDatabasePlatform platform,

String value = getValue(newValue,column.getTransformExpression());

if (data.getSourceDmlType().equals(DataEventType.DELETE)) {
if (data.getTargetDmlType().equals(DataEventType.DELETE)) {
return new NewAndOldValue(null, value);
} else {
return new NewAndOldValue(value, null);
Expand Down
Expand Up @@ -261,107 +261,102 @@ protected boolean perform(DataContext context, TransformedData data,
Map<String, String> oldSourceValues) throws IgnoreRowException {
boolean persistData = false;
try {

TargetDmlAction targetAction = null;
switch (data.getTargetDmlType()) {
case INSERT:
targetAction = TargetDmlAction.INS_ROW;
break;
case UPDATE:
targetAction = transformation.evaluateTargetDmlAction(context, data);
break;
case DELETE:
targetAction = transformation.getDeleteAction();
break;
default:
persistData = true;
}
if (targetAction != null) {
// how to handle the update/delete action on target..
switch (targetAction) {
case DEL_ROW:
data.setTargetDmlType(DataEventType.DELETE);
break;
case UPDATE_COL:
case UPD_ROW:
data.setTargetDmlType(DataEventType.UPDATE);
break;
case INS_ROW:
data.setTargetDmlType(DataEventType.INSERT);

break;
case NONE:
default:
break;
}
}

DataEventType eventType = data.getSourceDmlType();
for (TransformColumn transformColumn : transformation.getTransformColumns()) {
if (!transformColumn.isPk()) {
IncludeOnType includeOn = transformColumn.getIncludeOn();
if (includeOn == IncludeOnType.ALL
|| (includeOn == IncludeOnType.INSERT && eventType == DataEventType.INSERT)
if (includeOn == IncludeOnType.ALL || (includeOn == IncludeOnType.INSERT && eventType == DataEventType.INSERT)
|| (includeOn == IncludeOnType.UPDATE && eventType == DataEventType.UPDATE)
|| (includeOn == IncludeOnType.DELETE && eventType == DataEventType.DELETE)) {
if (StringUtils.isBlank(transformColumn.getSourceColumnName())
|| sourceValues.containsKey(transformColumn.getSourceColumnName())) {
try {
Object value = transformColumn(context, data, transformColumn,
sourceValues, oldSourceValues);
Object value = transformColumn(context, data, transformColumn, sourceValues, oldSourceValues);
if (value instanceof NewAndOldValue) {
data.put(transformColumn,
((NewAndOldValue) value).getNewValue(),
data.put(transformColumn, ((NewAndOldValue) value).getNewValue(),
oldSourceValues != null ? ((NewAndOldValue) value).getOldValue() : null, false);
} else if (value == null || value instanceof String) {
data.put(transformColumn, (String) value, null, false);
} else if (value instanceof List) {
throw new IllegalStateException(String.format("Column transform failed %s.%s. Transforms that multiply rows must be marked as part of the primary key",
transformColumn.getTransformId(), transformColumn.getTargetColumnName()));
} else {
throw new IllegalStateException(String.format("Column transform failed %s.%s. It returned an unexpected type of %s",
transformColumn.getTransformId(), transformColumn.getTargetColumnName(),
value.getClass().getSimpleName()));
throw new IllegalStateException(String.format(
"Column transform failed %s.%s. Transforms that multiply rows must be marked as part of the primary key",
transformColumn.getTransformId(), transformColumn.getTargetColumnName()));
} else {
throw new IllegalStateException(
String.format("Column transform failed %s.%s. It returned an unexpected type of %s",
transformColumn.getTransformId(), transformColumn.getTargetColumnName(),
value.getClass().getSimpleName()));
}
} catch (IgnoreColumnException e) {
// Do nothing. We are ignoring the column
if (log.isDebugEnabled()) {
log.debug(
"A transform indicated we should ignore the target column {}",
log.debug("A transform indicated we should ignore the target column {}",
transformColumn.getTargetColumnName());
}
}
} else {
if (eventType != DataEventType.DELETE) {
log.warn(
"Could not find a source column of {} for the transformation: {}",
transformColumn.getSourceColumnName(),
transformation.getTransformId());
log.warn("Could not find a source column of {} for the transformation: {}",
transformColumn.getSourceColumnName(), transformation.getTransformId());
} else {
log.debug(
"Could not find a source column of {} for the transformation: {}. This is probably because this was a DELETE event and no old data was captured.",
transformColumn.getSourceColumnName(),
transformation.getTransformId());
transformColumn.getSourceColumnName(), transformation.getTransformId());
}
}
}
}
}

// perform a transformation if there are columns defined for
// transformation
if (data.getColumnNames().length > 0) {
TargetDmlAction targetAction = null;
switch (data.getTargetDmlType()) {
case INSERT:
targetAction = TargetDmlAction.INS_ROW;
break;
case UPDATE:
targetAction = transformation.evaluateTargetDmlAction(context, data);
break;
case DELETE:
targetAction = transformation.getDeleteAction();
break;
default:
persistData = true;
}
if (targetAction != null) {
// how to handle the update/delete action on target..
switch (targetAction) {
case DEL_ROW:
data.setTargetDmlType(DataEventType.DELETE);
persistData = true;
break;
case UPDATE_COL:
case UPD_ROW:
data.setTargetDmlType(DataEventType.UPDATE);
persistData = true;
break;
case INS_ROW:
data.setTargetDmlType(DataEventType.INSERT);
persistData = true;
break;
case NONE:
default:
if (log.isDebugEnabled()) {
log.debug(
"The {} transformation is not configured to delete row. Not sending the delete through.",
transformation.getTransformId());
}
break;
}
if (targetAction != null && data.getColumnNames().length > 0 && targetAction != TargetDmlAction.NONE) {
persistData = true;
} else {
if (log.isDebugEnabled()) {
log.debug("The {} transformation is not configured to delete row. Not sending the delete through.",
transformation.getTransformId());
}

}

} catch (IgnoreRowException ex) {
// ignore this row
if (log.isDebugEnabled()) {
log.debug(
"Transform indicated that the target row should be ignored with a target key of: {}",
log.debug("Transform indicated that the target row should be ignored with a target key of: {}",
ArrayUtils.toString(data.getKeyValues()));
}
}
Expand Down

0 comments on commit d8638a5

Please sign in to comment.