Navigation Menu

Skip to content

Commit

Permalink
dbcompare: support file per table output.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Mar 7, 2016
1 parent 0ed5394 commit 2695bcd
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 145 deletions.
Expand Up @@ -90,8 +90,7 @@ protected boolean executeWithOptions(CommandLine line) throws Exception {
DbCompare dbCompare = new DbCompare(sourceEngine, targetEngine);

if (line.hasOption(OPTION_OUTPUT_SQL)) {
FileOutputStream fos = new FileOutputStream(line.getOptionValue(OPTION_OUTPUT_SQL));
dbCompare.setSqlDiffStream(fos);
dbCompare.setSqlDiffFileName(line.getOptionValue(OPTION_OUTPUT_SQL));
}
if (line.hasOption(OPTION_EXCLUDE)) {
dbCompare.setExcludedTableNames(Arrays.asList(line.getOptionValue(OPTION_EXCLUDE).split(",")));
Expand Down
200 changes: 58 additions & 142 deletions symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompare.java
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.io;

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -29,6 +30,7 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.xalan.templates.FuncKey;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseInfo;
Expand Down Expand Up @@ -66,7 +68,7 @@ public Row mapRow(Row row) {
private ISymmetricEngine sourceEngine;
private ISymmetricEngine targetEngine;

private OutputStream sqlDiffStream;
private String sqlDiffFileName;
private List<String> includedTableNames;
private List<String> excludedTableNames;
private boolean useSymmetricConfig = true;
Expand Down Expand Up @@ -112,148 +114,62 @@ protected TableReport compareTables(DbCompareTables tables) {

Row sourceRow = sourceCursor.next();
Row targetRow = targetCursor.next();


int counter = 0;
long startTime = System.currentTimeMillis();
DbCompareDiffWriter diffWriter = new DbCompareDiffWriter(targetEngine, tables, sqlDiffFileName);

while (true) {
if (sourceRow == null && targetRow == null) {
break;
}

counter++;
if ((counter % 50000) == 0) {
long elapsed = System.currentTimeMillis() - startTime;
log.info("{} rows processed for table {}. Elapsed time {}. ({} ms.) Current report status {}",
counter, tables.getSourceTable().getName(),
DurationFormatUtils.formatDurationWords((elapsed), true, true), elapsed,
tableReport);
try {
while (true) {
if (sourceRow == null && targetRow == null) {
break;
}

counter++;
if ((counter % 50000) == 0) {
long elapsed = System.currentTimeMillis() - startTime;
log.info("{} rows processed for table {}. Elapsed time {}. ({} ms.) Current report status {}",
counter, tables.getSourceTable().getName(),
DurationFormatUtils.formatDurationWords((elapsed), true, true), elapsed,
tableReport);
}

DbCompareRow sourceCompareRow = sourceRow != null ?
new DbCompareRow(sourceEngine, dbValueComparator, tables.getSourceTable(), sourceRow) : null;
DbCompareRow targetCompareRow = targetRow != null ?
new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null;

int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow);
if (comparePk == 0) {
Map<Column, String> deltas = sourceCompareRow.compareTo(tables, targetCompareRow);
if (deltas.isEmpty()) {
tableReport.countMatchedRow();
} else {
diffWriter.writeUpdate(targetCompareRow, deltas);
tableReport.countDifferentRow();
}

sourceRow = sourceCursor.next();
targetRow = targetCursor.next();
} else if (comparePk < 0) {
diffWriter.writeInsert(sourceCompareRow);
tableReport.countMissingRow();
sourceRow = sourceCursor.next();
} else {
diffWriter.writeDelete(targetCompareRow);
tableReport.countExtraRow();
targetRow = targetCursor.next();
}
tableReport.setSourceRows(sourceCursor.count);
tableReport.setTargetRows(targetCursor.count);
}

DbCompareRow sourceCompareRow = sourceRow != null ?
new DbCompareRow(sourceEngine, dbValueComparator, tables.getSourceTable(), sourceRow) : null;
DbCompareRow targetCompareRow = targetRow != null ?
new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null;

// System.out.println("Source: " + sourceCompareRow.getRowPkValues() + " -> " + targetCompareRow.getRowPkValues());

int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow);
if (comparePk == 0) {
Map<Column, String> deltas = sourceCompareRow.compareTo(tables, targetCompareRow);
if (deltas.isEmpty()) {
tableReport.countMatchedRow();
} else {
writeUpdate(targetCompareRow, deltas);
tableReport.countDifferentRow();
}

sourceRow = sourceCursor.next();
targetRow = targetCursor.next();
} else if (comparePk < 0) {
writeInsert(sourceCompareRow, tables);
tableReport.countMissingRow();
sourceRow = sourceCursor.next();
} else {
writeDelete(targetCompareRow);
tableReport.countExtraRow();
targetRow = targetCursor.next();
}
} finally {
diffWriter.close();
}

tableReport.setSourceRows(sourceCursor.count);
tableReport.setTargetRows(targetCursor.count);

return tableReport;
}
protected void writeDelete(DbCompareRow targetCompareRow) {
if (sqlDiffStream == null) {
return;
}

Table table = targetCompareRow.getTable();

DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.DELETE,
table.getCatalog(), table.getSchema(), table.getName(),
table.getPrimaryKeyColumns(), null,
null, null);

Row row = new Row(targetCompareRow.getTable().getPrimaryKeyColumnCount());

for (int i = 0; i < targetCompareRow.getTable().getPrimaryKeyColumnCount(); i++) {
row.put(table.getColumn(i).getName(),
targetCompareRow.getRowValues().get(targetCompareRow.getTable().getColumn(i).getName()));
}

String sql = statement.buildDynamicDeleteSql(BinaryEncoding.HEX, row, false, true);

writeStatement(sql);
}

protected void writeInsert(DbCompareRow sourceCompareRow, DbCompareTables tables) {
if (sqlDiffStream == null) {
return;
}

Table targetTable = tables.getTargetTable();

DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.INSERT,
targetTable.getCatalog(), targetTable.getSchema(), targetTable.getName(),
targetTable.getPrimaryKeyColumns(), targetTable.getColumns(),
null, null);

Row row = new Row(targetTable.getColumnCount());

for (Column sourceColumn : tables.getSourceTable().getColumns()) {
Column targetColumn = tables.getColumnMapping().get(sourceColumn);
if (targetColumn == null) {
continue;
}

row.put(targetColumn.getName(), sourceCompareRow.getRowValues().
get(sourceColumn.getName()));
}

String sql = statement.buildDynamicSql(BinaryEncoding.HEX, row, false, false);

writeStatement(sql);
}

protected void writeUpdate(DbCompareRow targetCompareRow, Map<Column, String> deltas) {
if (sqlDiffStream == null) {
return;
}

Table table = targetCompareRow.getTable();

Column[] changedColumns = deltas.keySet().toArray(new Column[deltas.keySet().size()]);

DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.UPDATE,
table.getCatalog(), table.getSchema(), table.getName(),
table.getPrimaryKeyColumns(), changedColumns,
null, null);

Row row = new Row(changedColumns.length+table.getPrimaryKeyColumnCount());
for (Column changedColumn : deltas.keySet()) {
String value = deltas.get(changedColumn);
row.put(changedColumn.getName(), value);
}
for (String pkColumnName : table.getPrimaryKeyColumnNames()) {
String value = targetCompareRow.getRow().getString(pkColumnName);
row.put(pkColumnName, value);
}
String sql = statement.buildDynamicSql(BinaryEncoding.HEX, row, false, true);

writeStatement(sql);
}

protected void writeStatement(String statement) {
try {
sqlDiffStream.write(statement.getBytes());
sqlDiffStream.write("\r\n".getBytes());
} catch (Exception ex) {
throw new RuntimeException("failed to write to sqlDiffStream.", ex);
}
}

protected int comparePk(DbCompareTables tables, DbCompareRow sourceCompareRow, DbCompareRow targetCompareRow) {
if (sourceCompareRow != null && targetCompareRow == null) {
Expand Down Expand Up @@ -473,14 +389,6 @@ protected List<String> filterTables(List<String> tables) {
return filteredTables;
}

public OutputStream getSqlDiffStream() {
return sqlDiffStream;
}

public void setSqlDiffStream(OutputStream sqlDiffStream) {
this.sqlDiffStream = sqlDiffStream;
}

public List<String> getIncludedTableNames() {
return includedTableNames;
}
Expand Down Expand Up @@ -528,4 +436,12 @@ public void close() {
wrapped.close();
}
}

public String getSqlDiffFileName() {
return sqlDiffFileName;
}

public void setSqlDiffFileName(String sqlDiffFileName) {
this.sqlDiffFileName = sqlDiffFileName;
}
}

0 comments on commit 2695bcd

Please sign in to comment.