Skip to content

Commit

Permalink
0002883: DBCompare overrides SQL output when using a single SQL file
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Oct 28, 2016
1 parent 629f6df commit ba0c70f
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 92 deletions.
122 changes: 70 additions & 52 deletions symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompare.java
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.io;

import java.io.Closeable;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -88,13 +89,15 @@ public DbCompare(ISymmetricEngine sourceEngine, ISymmetricEngine targetEngine) {
public DbCompareReport compare() {
dbValueComparator.setNumericScale(numericScale);

OutputStream sqlDiffOutput = getSqlDiffOutputStream();

DbCompareReport report = new DbCompareReport();
long start = System.currentTimeMillis();
List<DbCompareTables> tablesToCompare = getTablesToCompare();
report.printReportHeader(System.out);
for (DbCompareTables tables : tablesToCompare) {
try {
TableReport tableReport = compareTables(tables);
TableReport tableReport = compareTables(tables, sqlDiffOutput);
report.addTableReport(tableReport);
long elapsed = System.currentTimeMillis() - start;
log.info("Completed table {}. Elapsed time: {}", tableReport,
Expand All @@ -115,33 +118,36 @@ public DbCompareReport compare() {
return report;
}

public List<TableReport> compareForList() {
dbValueComparator.setNumericScale(numericScale);
List<TableReport> list = new ArrayList<TableReport>();
long start = System.currentTimeMillis();
List<DbCompareTables> tablesToCompare = getTablesToCompare();

for (DbCompareTables tables : tablesToCompare) {
try {
TableReport tableReport = compareTables(tables);
list.add(tableReport);
long elapsed = System.currentTimeMillis() - start;
log.info("Completed table {}. Elapsed time: {}", tableReport,
DurationFormatUtils.formatDurationWords((elapsed), true, true));
protected OutputStream getSqlDiffOutputStream() {
if (!StringUtils.isEmpty(sqlDiffFileName) && !sqlDiffFileName.contains("%t")) {
try {
return new FirstUseFileOutputStream(sqlDiffFileName);
} catch (Exception e) {
log.error("Exception while comparing " + tables.getSourceTable() +
" to " + tables.getTargetTable(), e);
}
throw new RuntimeException("Failed to open stream to file '" + sqlDiffFileName + "'", e);
}
} else {
return null;
}
}

protected OutputStream getSqlDiffOutputStream(DbCompareTables tables) {
if (!StringUtils.isEmpty(sqlDiffFileName)) {
// allow file per table.
String fileNameFormatted = sqlDiffFileName.replace("%t", "%s");
fileNameFormatted = String.format(fileNameFormatted, tables.getSourceTable().getName());
fileNameFormatted = fileNameFormatted.replaceAll("\"", "").replaceAll("\\]", "").replaceAll("\\[", "");
try {
return new FirstUseFileOutputStream(fileNameFormatted);
} catch (Exception e) {
throw new RuntimeException("Failed to open stream to file '" + fileNameFormatted + "'", e);
}
} else {
return null;
}

long totalTime = System.currentTimeMillis() - start;
log.info("dbcompare complete. Total Time: {}",
DurationFormatUtils.formatDurationWords((totalTime), true, true));

return list;
}


protected TableReport compareTables(DbCompareTables tables) {
protected TableReport compareTables(DbCompareTables tables, OutputStream sqlDiffOutput) {
String sourceSelect = getSourceComparisonSQL(tables, sourceEngine.getDatabasePlatform());
String targetSelect = getTargetComparisonSQL(tables, targetEngine.getDatabasePlatform());

Expand All @@ -159,7 +165,17 @@ protected TableReport compareTables(DbCompareTables tables) {

int counter = 0;
long startTime = System.currentTimeMillis();
DbCompareDiffWriter diffWriter = new DbCompareDiffWriter(targetEngine, tables, sqlDiffFileName);

boolean localStreamCreated = false;

DbCompareDiffWriter diffWriter = null;
OutputStream stream = null;
if (sqlDiffOutput != null) {
diffWriter = new DbCompareDiffWriter(targetEngine, tables, sqlDiffOutput);
} else {
stream = getSqlDiffOutputStream(tables);
diffWriter = new DbCompareDiffWriter(targetEngine, tables, stream);
}

try {
while (true) {
Expand All @@ -178,35 +194,37 @@ protected TableReport compareTables(DbCompareTables tables) {

DbCompareRow sourceCompareRow = sourceRow != null ?
new DbCompareRow(sourceEngine, dbValueComparator, tables.getSourceTable(), sourceRow) : null;
DbCompareRow targetCompareRow = targetRow != null ?
new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null;

int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow);
if (comparePk == 0) {
Map<Column, String> deltas = sourceCompareRow.compareTo(tables, targetCompareRow);
if (deltas.isEmpty()) {
tableReport.countMatchedRow();
} else {
diffWriter.writeUpdate(targetCompareRow, deltas);
tableReport.countDifferentRow();
}
sourceRow = sourceCursor.next();
targetRow = targetCursor.next();
} else if (comparePk < 0) {
diffWriter.writeInsert(sourceCompareRow);
tableReport.countMissingRow();
sourceRow = sourceCursor.next();
} else {
diffWriter.writeDelete(targetCompareRow);
tableReport.countExtraRow();
targetRow = targetCursor.next();
}
tableReport.setSourceRows(sourceCursor.count);
tableReport.setTargetRows(targetCursor.count);
DbCompareRow targetCompareRow = targetRow != null ?
new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null;

int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow);
if (comparePk == 0) {
Map<Column, String> deltas = sourceCompareRow.compareTo(tables, targetCompareRow);
if (deltas.isEmpty()) {
tableReport.countMatchedRow();
} else {
diffWriter.writeUpdate(targetCompareRow, deltas);
tableReport.countDifferentRow();
}

sourceRow = sourceCursor.next();
targetRow = targetCursor.next();
} else if (comparePk < 0) {
diffWriter.writeInsert(sourceCompareRow);
tableReport.countMissingRow();
sourceRow = sourceCursor.next();
} else {
diffWriter.writeDelete(targetCompareRow);
tableReport.countExtraRow();
targetRow = targetCursor.next();
}
tableReport.setSourceRows(sourceCursor.count);
tableReport.setTargetRows(targetCursor.count);
}
} finally {
diffWriter.close();
if (stream != null) {
IOUtils.closeQuietly(stream);
}
IOUtils.closeQuietly(sourceCursor);
IOUtils.closeQuietly(targetCursor);
}
Expand Down
Expand Up @@ -20,11 +20,10 @@
*/
package org.jumpmind.symmetric.io;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.DmlStatement;
Expand All @@ -39,20 +38,19 @@ public class DbCompareDiffWriter {

final static Logger log = LoggerFactory.getLogger(DbCompareDiffWriter.class);

public DbCompareDiffWriter(ISymmetricEngine targetEngine, DbCompareTables tables, String fileName) {
public DbCompareDiffWriter(ISymmetricEngine targetEngine, DbCompareTables tables, OutputStream stream) {
super();
this.targetEngine = targetEngine;
this.tables = tables;
this.fileName = getFormattedFileName(fileName);
this.stream = stream;
}

private ISymmetricEngine targetEngine;
private DbCompareTables tables;
private String fileName;
private FileOutputStream stream;
private OutputStream stream;

public void writeDelete(DbCompareRow targetCompareRow) {
stream = initStreamIfNeeded(stream, fileName);
if (stream == null) {
return;
}
Expand All @@ -77,7 +75,6 @@ public void writeDelete(DbCompareRow targetCompareRow) {
}

public void writeInsert(DbCompareRow sourceCompareRow) {
stream = initStreamIfNeeded(stream, fileName);
if (stream == null) {
return;
}
Expand Down Expand Up @@ -107,7 +104,6 @@ public void writeInsert(DbCompareRow sourceCompareRow) {
}

public void writeUpdate(DbCompareRow targetCompareRow, Map<Column, String> deltas) {
stream = initStreamIfNeeded(stream, fileName);
if (stream == null) {
return;
}
Expand Down Expand Up @@ -154,36 +150,4 @@ protected void writeLine(String line) {
throw new RuntimeException("failed to write to stream '" + line + "'", ex);
}
}

protected String getFormattedFileName(String intputFileName) {
if (!StringUtils.isEmpty(intputFileName)) {
// allow file per table.
String fileNameFormatted = intputFileName.replace("%t", "%s");
fileNameFormatted = String.format(fileNameFormatted, tables.getSourceTable().getName());
fileNameFormatted = fileNameFormatted.replaceAll("\"", "").replaceAll("\\]", "").replaceAll("\\[", "");
return fileNameFormatted;
} else {
return null;
}
}

protected FileOutputStream initStreamIfNeeded(FileOutputStream diffStream, String fileName) {
if (diffStream != null) {
return diffStream;
} else if (fileName == null) {
return null;
} else {
if (fileName == null) {
return null;
}
log.info("Writing diffs to {}", fileName);
try {
return new FileOutputStream(fileName);
} catch (Exception e) {
throw new RuntimeException("Failed to open stream to file '" + fileName + "'", e);
}
}
}


}
@@ -0,0 +1,47 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
* A FileOutputStream that delays creating the physical file until the first write operation, to
* avoid empty files.
*/
public class FirstUseFileOutputStream extends OutputStream {

private String fileName;
private FileOutputStream fileOutputStream;

public FirstUseFileOutputStream(String fileName) {
this.fileName = fileName;
}

@Override
public void write(int b) throws IOException {
if (fileOutputStream == null) {
fileOutputStream = new FileOutputStream(fileName);
}
fileOutputStream.write(b);
}
}

0 comments on commit ba0c70f

Please sign in to comment.