Skip to content

Commit

Permalink
dbcompare fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Mar 4, 2016
1 parent 996f21a commit 6856c3b
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 61 deletions.
Expand Up @@ -101,8 +101,10 @@ protected boolean executeWithOptions(CommandLine line) throws Exception {
}

DbCompareReport report = dbCompare.compare();
for (TableReport tableReport : report.getTableReports()) {
System.out.println(tableReport);
if (report.getTableReports() != null) {
for (TableReport tableReport : report.getTableReports()) {
System.out.println(tableReport);
}
}

return false;
Expand Down
150 changes: 93 additions & 57 deletions symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompare.java
Expand Up @@ -28,6 +28,7 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseInfo;
Expand Down Expand Up @@ -79,18 +80,26 @@ public DbCompare(ISymmetricEngine sourceEngine, ISymmetricEngine targetEngine) {

public DbCompareReport compare() {
DbCompareReport report = new DbCompareReport();
long start = System.currentTimeMillis();
List<DbCompareTables> tablesToCompare = getTablesToCompare();
for (DbCompareTables tables : tablesToCompare) {
TableReport tableReport = compareTables(tables);
report.addTableReport(tableReport);
long elapsed = System.currentTimeMillis() - start;
log.info("Completed table {}. Elapsed time: {}", tableReport,
DurationFormatUtils.formatDurationWords((elapsed), true, true));
}

long totalTime = System.currentTimeMillis() - start;
log.info("dbcompare complete. Total Time: {}",
DurationFormatUtils.formatDurationWords((totalTime), true, true));

return report;
}

protected TableReport compareTables(DbCompareTables tables) {
String sourceSelect = getComparisonSQL(tables.getSourceTable(), sourceEngine.getDatabasePlatform());
String targetSelect = getComparisonSQL(tables.getTargetTable(), targetEngine.getDatabasePlatform());
String sourceSelect = getSourceComparisonSQL(tables, sourceEngine.getDatabasePlatform());
String targetSelect = getTargetComparisonSQL(tables, targetEngine.getDatabasePlatform());

CountingSqlReadCursor sourceCursor = new CountingSqlReadCursor(sourceEngine.getDatabasePlatform().
getSqlTemplate().queryForCursor(sourceSelect, defaultRowMapper));
Expand All @@ -113,39 +122,42 @@ protected TableReport compareTables(DbCompareTables tables) {
}

counter++;
if ((counter % 10000) == 0) {
log.info("{} rows processed for table {}. Elapsed time {}.",
counter, tables.getSourceTable().getName(), (System.currentTimeMillis()-startTime));
if ((counter % 50000) == 0) {
long elapsed = System.currentTimeMillis() - startTime;
log.info("{} rows processed for table {}. Elapsed time {}. ({} ms.) Current report status {}",
counter, tables.getSourceTable().getName(),
DurationFormatUtils.formatDurationWords((elapsed), true, true), elapsed,
tableReport);
}

DbCompareRow sourceCompareRow = sourceRow != null ?
new DbCompareRow(sourceEngine, dbValueComparator, tables.getSourceTable(), sourceRow) : null;
DbCompareRow targetCompareRow = targetRow != null ?
new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null;

// System.out.println("Source: " + sourceCompareRow.getRowPkValues() + " -> " + targetCompareRow.getRowPkValues());
int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow);
if (comparePk == 0) {
Map<Column, String> deltas = sourceCompareRow.compareTo(tables, targetCompareRow);
if (deltas.isEmpty()) {
tableReport.countMatchedRow();
} else {
writeUpdate(targetCompareRow, deltas);
tableReport.countDifferentRow();
}

sourceRow = sourceCursor.next();
targetRow = targetCursor.next();
} else if (comparePk < 0) {
writeInsert(sourceCompareRow, tables);
tableReport.countMissingRow();
sourceRow = sourceCursor.next();
} else {
writeDelete(targetCompareRow);
tableReport.countExtraRow();
targetRow = targetCursor.next();
}
DbCompareRow targetCompareRow = targetRow != null ?
new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null;

// System.out.println("Source: " + sourceCompareRow.getRowPkValues() + " -> " + targetCompareRow.getRowPkValues());

int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow);
if (comparePk == 0) {
Map<Column, String> deltas = sourceCompareRow.compareTo(tables, targetCompareRow);
if (deltas.isEmpty()) {
tableReport.countMatchedRow();
} else {
writeUpdate(targetCompareRow, deltas);
tableReport.countDifferentRow();
}

sourceRow = sourceCursor.next();
targetRow = targetCursor.next();
} else if (comparePk < 0) {
writeInsert(sourceCompareRow, tables);
tableReport.countMissingRow();
sourceRow = sourceCursor.next();
} else {
writeDelete(targetCompareRow);
tableReport.countExtraRow();
targetRow = targetCursor.next();
}
}

tableReport.setSourceRows(sourceCursor.count);
Expand Down Expand Up @@ -181,7 +193,7 @@ protected void writeInsert(DbCompareRow sourceCompareRow, DbCompareTables tables
if (sqlDiffStream == null) {
return;
}

Table targetTable = tables.getTargetTable();

DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.INSERT,
Expand All @@ -196,11 +208,11 @@ protected void writeInsert(DbCompareRow sourceCompareRow, DbCompareTables tables
if (targetColumn == null) {
continue;
}

row.put(targetColumn.getName(), sourceCompareRow.getRowValues().
get(sourceColumn.getName()));
}

String sql = statement.buildDynamicSql(BinaryEncoding.HEX, row, false, false);

writeStatement(sql);
Expand Down Expand Up @@ -254,7 +266,27 @@ protected int comparePk(DbCompareTables tables, DbCompareRow sourceCompareRow, D
return sourceCompareRow.comparePks(tables, targetCompareRow);
}

protected String getComparisonSQL(Table table, IDatabasePlatform platform) {
protected String getSourceComparisonSQL(DbCompareTables tables, IDatabasePlatform platform) {
return getComparisonSQL(tables.getSourceTable(),
tables.getSourceTable().getPrimaryKeyColumns(), platform);
}

protected String getTargetComparisonSQL(DbCompareTables tables, IDatabasePlatform platform) {
List<Column> mappedPkColumns = new ArrayList<Column>();

for (Column sourcePkColumn : tables.getSourceTable().getPrimaryKeyColumns()) {
Column targetColumn = tables.getColumnMapping().get(sourcePkColumn);
if (targetColumn == null) {
log.warn("No target column mapped to source PK column {}. Dbcompare may be inaccurate for this table.", sourcePkColumn);
} else {
mappedPkColumns.add(targetColumn);
}
}

return getComparisonSQL(tables.getTargetTable(), mappedPkColumns.toArray(new Column[0]), platform);
}

protected String getComparisonSQL(Table table, Column[] sortByColumns, IDatabasePlatform platform) {
DmlStatement statement = platform.createDmlStatement(DmlType.SELECT,
table.getCatalog(), table.getSchema(), table.getName(),
null, table.getColumns(),
Expand All @@ -263,24 +295,18 @@ protected String getComparisonSQL(Table table, IDatabasePlatform platform) {
StringBuilder sql = new StringBuilder(statement.getSql());
sql.append("1=1 ");

sql.append(buildOrderBy(table, platform));
sql.append(buildOrderBy(table, sortByColumns, platform));
log.info("Comparison SQL: {}", sql);
return sql.toString();
}

protected String buildOrderBy(Table table, IDatabasePlatform platform) {
protected String buildOrderBy(Table table, Column[] sortByColumns, IDatabasePlatform platform) {
DatabaseInfo databaseInfo = platform.getDatabaseInfo();
String quote = databaseInfo.getDelimiterToken() == null ? "" : databaseInfo.getDelimiterToken();
StringBuilder orderByClause = new StringBuilder("ORDER BY ");
for (Column pkColumn : table.getPrimaryKeyColumns()) {
String columnName = new StringBuilder(quote).append(pkColumn.getName()).append(quote).toString();
if (platform.getName().startsWith("db2") && pkColumn.isOfTextType() ) {
orderByClause.append("TRANSLATE ")
.append("(").append(columnName).append(", 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789',")
.append("'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz')");
} else {
orderByClause.append(columnName);
}
for (Column sortByColumn : sortByColumns) {
String columnName = new StringBuilder(quote).append(sortByColumn.getName()).append(quote).toString();
orderByClause.append(columnName);
orderByClause.append(",");
}
orderByClause.setLength(orderByClause.length()-1);
Expand Down Expand Up @@ -315,12 +341,21 @@ protected List<DbCompareTables> loadTablesFromConfig() {
}

protected List<DbCompareTables> loadTables(List<String> tableNames) {
List<DbCompareTables> tablesFromConfig = new ArrayList<DbCompareTables>();


List<DbCompareTables> tablesFromConfig = new ArrayList<DbCompareTables>(1);

List<String> filteredTablesNames = filterTables(tableNames);

for (String tableName : filteredTablesNames) {
Table sourceTable = sourceEngine.getDatabasePlatform().getTableFromCache(tableName, true);
Table sourceTable = null;
Map<String, String> tableNameParts = sourceEngine.getDatabasePlatform().parseQualifiedTableName(tableName);
if (tableNameParts.size() == 1) {
sourceTable = sourceEngine.getDatabasePlatform().getTableFromCache(tableName, true);
} else {
sourceTable = sourceEngine.getDatabasePlatform().
getTableFromCache(tableNameParts.get("catalog"), tableNameParts.get("schema"), tableNameParts.get("table"), true);
}

if (sourceTable == null) {
log.warn("No source table found for table name {}", tableName);
continue;
Expand All @@ -336,10 +371,11 @@ protected List<DbCompareTables> loadTables(List<String> tableNames) {
if (targetTable == null) {
log.warn("No target table found for table {}", tableName);
continue;
} else if (targetTable.getPrimaryKeyColumnCount() == 0) {
log.warn("Target table {} doesn't have any primary key columns and will not be considered in the comparison.", targetTable);
continue;
}
}
// else if (targetTable.getPrimaryKeyColumnCount() == 0) {
// log.warn("Target table {} doesn't have any primary key columns and will not be considered in the comparison.", targetTable);
// continue;
// }

tables.applyColumnMappings();
tablesFromConfig.add(tables);
Expand Down Expand Up @@ -425,10 +461,10 @@ protected List<String> filterTables(List<String> tables) {
List<String> excludedTables = new ArrayList<String>(filteredTables);

for (String excludedTableName : excludedTableNames) {
for (String tableName : filteredTables) {
if (StringUtils.equalsIgnoreCase(tableName.trim(), excludedTableName.trim())) {
excludedTables.remove(tableName);
}
for (String tableName : filteredTables) {
if (StringUtils.equalsIgnoreCase(tableName.trim(), excludedTableName.trim())) {
excludedTables.remove(tableName);
}
}
}
return excludedTables;
Expand Down
Expand Up @@ -50,6 +50,9 @@ public int comparePks(DbCompareTables tables, DbCompareRow targetRow) {
for (Column sourcePkColumn : table.getPrimaryKeyColumns()) {
Column targetPkColumn = tables.getColumnMapping().get(sourcePkColumn);

if (targetPkColumn == null) {
return 0;
}
int result = dbValueComparator.compareValues(sourcePkColumn, targetPkColumn,
rowValues.get(sourcePkColumn.getName()), targetRow.getRowValues().get(targetPkColumn.getName()));

Expand Down
Expand Up @@ -21,10 +21,13 @@
package org.jumpmind.symmetric.io;

import java.math.BigDecimal;
import java.sql.Types;
import java.util.Calendar;
import java.util.Date;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang.time.DateUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.TypeMap;
import org.jumpmind.symmetric.ISymmetricEngine;
Expand Down Expand Up @@ -110,10 +113,17 @@ public int compareDateTime(Column sourceColumn, Column targetColumn, String sour
if (sourceValue == null || targetValue == null) {
return compareDefault(sourceColumn, targetColumn, sourceValue, targetValue);
}

Date sourceDate = sourceEngine.getDatabasePlatform().parseDate(sourceColumn.getJdbcTypeCode(), sourceValue, false);
Date targetDate = targetEngine.getDatabasePlatform().parseDate(targetColumn.getJdbcTypeCode(), targetValue, false);

// if either column is a simple date, clear the time for comparison purposes.
if (sourceColumn.getJdbcTypeCode() == Types.DATE
|| targetColumn.getJdbcTypeCode() == Types.DATE) {
sourceDate = DateUtils.truncate(sourceDate, Calendar.DATE);
targetDate = DateUtils.truncate(targetDate, Calendar.DATE);
}

return compareDefault(sourceColumn, targetColumn, sourceDate, targetDate);
}

Expand Down
Expand Up @@ -34,10 +34,12 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;

import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Base64;
Expand Down Expand Up @@ -585,6 +587,44 @@ public java.util.Date parseDate(int type, String value, boolean useVariableDates
}
}

@Override
public Map<String, String> parseQualifiedTableName(String tableName) {

Map<String, String> tableNameParts = new LinkedHashMap<String, String>();
if (StringUtils.isEmpty(tableName)) {
return tableNameParts;
}

String[] initialSplit = tableName.split(Pattern.quote(getDatabaseInfo().getCatalogSeparator()));
if (initialSplit.length == 0) {
initialSplit = new String[] {tableName};
}
List<String> nameComponents = new ArrayList<String>();
for (String part : initialSplit) {
String[] subParts = part.split(Pattern.quote(getDatabaseInfo().getSchemaSeparator()));
if (subParts.length == 0) {
subParts = new String[] {part};
}
for (String subPart : subParts) {
if (!StringUtils.isEmpty(subPart)) {
nameComponents.add(subPart);
}
}
}

if (nameComponents.size() >= 3) {
tableNameParts.put("catalog", nameComponents.get(0));
tableNameParts.put("schema", nameComponents.get(1));
tableNameParts.put("table", nameComponents.get(2));
} else if (nameComponents.size() == 2) {
tableNameParts.put("schema", nameComponents.get(0));
tableNameParts.put("table", nameComponents.get(1));
} else {
tableNameParts.put("table", nameComponents.get(0));
}

return tableNameParts;
}

public Table makeAllColumnsPrimaryKeys(Table table) {
Table result = table.copy();
Expand Down
Expand Up @@ -170,6 +170,8 @@ public Object[] getObjectValues(BinaryEncoding encoding, String[] values,
public boolean isMetadataIgnoreCase();

public java.util.Date parseDate(int type, String value, boolean useVariableDates);

public Map<String, String> parseQualifiedTableName(String tableName);

public Table makeAllColumnsPrimaryKeys(Table table);

Expand Down
Expand Up @@ -129,6 +129,21 @@ public String buildDynamicSql(BinaryEncoding encoding, Row row,

newSql = newSql.replace(QUESTION_MARK, "?");
return newSql + databaseInfo.getSqlCommandDelimiter();
}
}

protected void appendColumnNameForSql(StringBuilder sql, Column column, boolean select) {
String columnName = column.getName();

if (select && column.isPrimaryKey() && column.isOfTextType()) {
// CAST to ASCII to support standard ORDER BY ordering.
// CAST("VALD_VAL" AS VARCHAR(4096)CCSID ASCII) AS "VALD_VAL"
String quotedColumn = quote+columnName+quote;
sql.append("CAST(").append(quotedColumn).append(" AS VARCHAR(4096)CCSID ASCII) AS ").append(quotedColumn);
} else {
sql.append(quote).append(columnName).append(quote);
}


}

}

0 comments on commit 6856c3b

Please sign in to comment.