Skip to content

Commit

Permalink
transform fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 9, 2012
1 parent 660a685 commit 9ae0b75
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 18 deletions.
Expand Up @@ -205,7 +205,7 @@ public String[] getPkData(Table table) {
}

public String[] getPkData(Table table, String key) {
Map<String, String> data = toColumnNameValuePairs(table, key);
Map<String, String> data = toColumnNameValuePairs(table.getPrimaryKeyColumnNames(), key);
String[] keyNames = table.getPrimaryKeyColumnNames();
if (keyNames != null && data.size() > 0) {
String[] keyValues = new String[keyNames.length];
Expand All @@ -218,9 +218,8 @@ public String[] getPkData(Table table, String key) {
}
}

public Map<String, String> toColumnNameValuePairs(Table table, String key) {
public Map<String, String> toColumnNameValuePairs(String[] keyNames, String key) {
String[] values = getParsedData(key);
String[] keyNames = table.getColumnNames();
if (values != null && keyNames != null && values.length >= keyNames.length) {
Map<String, String> map = new LinkedCaseInsensitiveMap<String>(keyNames.length);
for (int i = 0; i < keyNames.length; i++) {
Expand Down
Expand Up @@ -90,13 +90,13 @@ public String transform(IDatabasePlatform platform, DataContext context,

String[] keyNames = data.getKeyNames();
List<Column> columns = new ArrayList<Column>();
List<String> keyNamesList = new ArrayList<String>();
List<String> keyValuesList = new ArrayList<String>();
boolean addedFirstKey = false;
for (int i = 0; i < keyNames.length; i++) {
Column targetCol = table.getColumnWithName(keyNames[i]);
if (targetCol != null) {
columns.add(targetCol);
keyNamesList.add(keyNames[i]);
keyValuesList.add(sourceValues.get(keyNames[i]));
if (addedFirstKey) {
sql.append("and ");
} else {
Expand All @@ -109,11 +109,13 @@ public String transform(IDatabasePlatform platform, DataContext context,
}
}

log.debug("SQL: "+sql);
if (log.isDebugEnabled()) {
log.debug("SQL: "+sql);
}
if (0 < platform.getSqlTemplate().update(
sql.toString(),
platform.getObjectValues(context.getBatch().getBinaryEncoding(),
keyNamesList.toArray(new String[keyNamesList.size()]),
keyValuesList.toArray(new String[keyValuesList.size()]),
columns.toArray(new Column[columns.size()])))) {
throw new IgnoreColumnException();
}
Expand Down
Expand Up @@ -662,9 +662,9 @@ protected String[] getLookupKeyData(CsvData data) {
}

Map<String, String> keyData = data
.toColumnNameValuePairs(targetTable, CsvData.OLD_DATA);
.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.OLD_DATA);
if (keyData == null || keyData.size() == 0) {
keyData = data.toColumnNameValuePairs(targetTable, CsvData.ROW_DATA);
keyData = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA);
}

if (keyData != null && keyData.size() > 0) {
Expand Down
Expand Up @@ -182,7 +182,7 @@ protected boolean isTimestampNewer(Conflict conflictSetting, DatabaseWriter writ
String sql = stmt.getColumnsSql(new Column[] { table.getColumnWithName(columnName) });
Timestamp existingTs = writer.getTransaction().queryForObject(sql, Timestamp.class,
objectValues);
Map<String, String> newData = data.toColumnNameValuePairs(table, CsvData.ROW_DATA);
Map<String, String> newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
Timestamp loadingTs = Timestamp.valueOf(newData.get(columnName));
return loadingTs.after(existingTs);
}
Expand All @@ -197,7 +197,7 @@ protected boolean isVersionNewer(Conflict conflictSetting, DatabaseWriter writer
String sql = stmt.getColumnsSql(new Column[] { table.getColumnWithName(columnName) });
Long existingVersion = writer.getTransaction()
.queryForObject(sql, Long.class, objectValues);
Map<String, String> newData = data.toColumnNameValuePairs(table, CsvData.ROW_DATA);
Map<String, String> newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
Long loadingVersion = Long.valueOf(newData.get(columnName));
return loadingVersion > existingVersion;
}
Expand Down
Expand Up @@ -131,15 +131,18 @@ protected boolean isTransformable(DataEventType eventType) {
public void write(CsvData data) {
DataEventType eventType = data.getDataEventType();
if (activeTransforms != null && activeTransforms.size() > 0 && isTransformable(eventType)) {
Map<String, String> sourceValues = data.toColumnNameValuePairs(this.sourceTable,
Map<String, String> sourceValues = data.toColumnNameValuePairs(this.sourceTable.getColumnNames(),
CsvData.ROW_DATA);
Map<String, String> oldSourceValues = data.toColumnNameValuePairs(this.sourceTable,
Map<String, String> oldSourceValues = data.toColumnNameValuePairs(this.sourceTable.getColumnNames(),
CsvData.OLD_DATA);
Map<String, String> sourceKeyValues = oldSourceValues.size() > 0 ? oldSourceValues
: sourceValues;

if (data.contains(CsvData.PK_DATA) && oldSourceValues.size() == 0) {
sourceKeyValues = data.toColumnNameValuePairs(this.sourceTable, CsvData.PK_DATA);
Map<String, String> sourceKeyValues = null;

if (data.contains(CsvData.PK_DATA)) {
sourceKeyValues = data.toColumnNameValuePairs(this.sourceTable.getPrimaryKeyColumnNames(), CsvData.PK_DATA);
} else if (oldSourceValues.size() > 0) {
sourceKeyValues = oldSourceValues;
} else {
sourceKeyValues = sourceValues;
}

if (eventType == DataEventType.DELETE) {
Expand Down

0 comments on commit 9ae0b75

Please sign in to comment.