From 2695bcd3918e623b490ca149a8feae52cb19d853 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Mon, 7 Mar 2016 17:15:29 -0500 Subject: [PATCH] dbcompare: support file per table output. --- .../jumpmind/symmetric/DbCompareCommand.java | 3 +- .../org/jumpmind/symmetric/io/DbCompare.java | 200 +++++------------- .../symmetric/io/DbCompareDiffWriter.java | 184 ++++++++++++++++ .../jumpmind/symmetric/io/DbCompareRow.java | 1 - 4 files changed, 243 insertions(+), 145 deletions(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/DbCompareCommand.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/DbCompareCommand.java index 9b80452963..3ae36e8d60 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/DbCompareCommand.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/DbCompareCommand.java @@ -90,8 +90,7 @@ protected boolean executeWithOptions(CommandLine line) throws Exception { DbCompare dbCompare = new DbCompare(sourceEngine, targetEngine); if (line.hasOption(OPTION_OUTPUT_SQL)) { - FileOutputStream fos = new FileOutputStream(line.getOptionValue(OPTION_OUTPUT_SQL)); - dbCompare.setSqlDiffStream(fos); + dbCompare.setSqlDiffFileName(line.getOptionValue(OPTION_OUTPUT_SQL)); } if (line.hasOption(OPTION_EXCLUDE)) { dbCompare.setExcludedTableNames(Arrays.asList(line.getOptionValue(OPTION_EXCLUDE).split(","))); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompare.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompare.java index 292cdc7696..aa56ad342b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompare.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompare.java @@ -20,6 +20,7 @@ */ package org.jumpmind.symmetric.io; +import java.io.FileOutputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -29,6 +30,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DurationFormatUtils; +import org.apache.xalan.templates.FuncKey; import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.DatabaseInfo; @@ -66,7 +68,7 @@ public Row mapRow(Row row) { private ISymmetricEngine sourceEngine; private ISymmetricEngine targetEngine; - private OutputStream sqlDiffStream; + private String sqlDiffFileName; private List includedTableNames; private List excludedTableNames; private boolean useSymmetricConfig = true; @@ -112,148 +114,62 @@ protected TableReport compareTables(DbCompareTables tables) { Row sourceRow = sourceCursor.next(); Row targetRow = targetCursor.next(); + int counter = 0; long startTime = System.currentTimeMillis(); + DbCompareDiffWriter diffWriter = new DbCompareDiffWriter(targetEngine, tables, sqlDiffFileName); - while (true) { - if (sourceRow == null && targetRow == null) { - break; - } - - counter++; - if ((counter % 50000) == 0) { - long elapsed = System.currentTimeMillis() - startTime; - log.info("{} rows processed for table {}. Elapsed time {}. ({} ms.) Current report status {}", - counter, tables.getSourceTable().getName(), - DurationFormatUtils.formatDurationWords((elapsed), true, true), elapsed, - tableReport); + try { + while (true) { + if (sourceRow == null && targetRow == null) { + break; + } + + counter++; + if ((counter % 50000) == 0) { + long elapsed = System.currentTimeMillis() - startTime; + log.info("{} rows processed for table {}. Elapsed time {}. ({} ms.) Current report status {}", + counter, tables.getSourceTable().getName(), + DurationFormatUtils.formatDurationWords((elapsed), true, true), elapsed, + tableReport); + } + + DbCompareRow sourceCompareRow = sourceRow != null ? + new DbCompareRow(sourceEngine, dbValueComparator, tables.getSourceTable(), sourceRow) : null; + DbCompareRow targetCompareRow = targetRow != null ? + new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null; + + int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow); + if (comparePk == 0) { + Map deltas = sourceCompareRow.compareTo(tables, targetCompareRow); + if (deltas.isEmpty()) { + tableReport.countMatchedRow(); + } else { + diffWriter.writeUpdate(targetCompareRow, deltas); + tableReport.countDifferentRow(); + } + + sourceRow = sourceCursor.next(); + targetRow = targetCursor.next(); + } else if (comparePk < 0) { + diffWriter.writeInsert(sourceCompareRow); + tableReport.countMissingRow(); + sourceRow = sourceCursor.next(); + } else { + diffWriter.writeDelete(targetCompareRow); + tableReport.countExtraRow(); + targetRow = targetCursor.next(); + } + tableReport.setSourceRows(sourceCursor.count); + tableReport.setTargetRows(targetCursor.count); } - - DbCompareRow sourceCompareRow = sourceRow != null ? - new DbCompareRow(sourceEngine, dbValueComparator, tables.getSourceTable(), sourceRow) : null; - DbCompareRow targetCompareRow = targetRow != null ? - new DbCompareRow(targetEngine, dbValueComparator, tables.getTargetTable(), targetRow) : null; - - // System.out.println("Source: " + sourceCompareRow.getRowPkValues() + " -> " + targetCompareRow.getRowPkValues()); - - int comparePk = comparePk(tables, sourceCompareRow, targetCompareRow); - if (comparePk == 0) { - Map deltas = sourceCompareRow.compareTo(tables, targetCompareRow); - if (deltas.isEmpty()) { - tableReport.countMatchedRow(); - } else { - writeUpdate(targetCompareRow, deltas); - tableReport.countDifferentRow(); - } - - sourceRow = sourceCursor.next(); - targetRow = targetCursor.next(); - } else if (comparePk < 0) { - writeInsert(sourceCompareRow, tables); - tableReport.countMissingRow(); - sourceRow = sourceCursor.next(); - } else { - writeDelete(targetCompareRow); - tableReport.countExtraRow(); - targetRow = targetCursor.next(); - } + } finally { + diffWriter.close(); } - tableReport.setSourceRows(sourceCursor.count); - tableReport.setTargetRows(targetCursor.count); - return tableReport; } - protected void writeDelete(DbCompareRow targetCompareRow) { - if (sqlDiffStream == null) { - return; - } - - Table table = targetCompareRow.getTable(); - - DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.DELETE, - table.getCatalog(), table.getSchema(), table.getName(), - table.getPrimaryKeyColumns(), null, - null, null); - - Row row = new Row(targetCompareRow.getTable().getPrimaryKeyColumnCount()); - - for (int i = 0; i < targetCompareRow.getTable().getPrimaryKeyColumnCount(); i++) { - row.put(table.getColumn(i).getName(), - targetCompareRow.getRowValues().get(targetCompareRow.getTable().getColumn(i).getName())); - } - - String sql = statement.buildDynamicDeleteSql(BinaryEncoding.HEX, row, false, true); - - writeStatement(sql); - } - - protected void writeInsert(DbCompareRow sourceCompareRow, DbCompareTables tables) { - if (sqlDiffStream == null) { - return; - } - - Table targetTable = tables.getTargetTable(); - - DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.INSERT, - targetTable.getCatalog(), targetTable.getSchema(), targetTable.getName(), - targetTable.getPrimaryKeyColumns(), targetTable.getColumns(), - null, null); - - Row row = new Row(targetTable.getColumnCount()); - - for (Column sourceColumn : tables.getSourceTable().getColumns()) { - Column targetColumn = tables.getColumnMapping().get(sourceColumn); - if (targetColumn == null) { - continue; - } - - row.put(targetColumn.getName(), sourceCompareRow.getRowValues(). - get(sourceColumn.getName())); - } - - String sql = statement.buildDynamicSql(BinaryEncoding.HEX, row, false, false); - - writeStatement(sql); - } - - protected void writeUpdate(DbCompareRow targetCompareRow, Map deltas) { - if (sqlDiffStream == null) { - return; - } - - Table table = targetCompareRow.getTable(); - - Column[] changedColumns = deltas.keySet().toArray(new Column[deltas.keySet().size()]); - - DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.UPDATE, - table.getCatalog(), table.getSchema(), table.getName(), - table.getPrimaryKeyColumns(), changedColumns, - null, null); - - Row row = new Row(changedColumns.length+table.getPrimaryKeyColumnCount()); - for (Column changedColumn : deltas.keySet()) { - String value = deltas.get(changedColumn); - row.put(changedColumn.getName(), value); - } - for (String pkColumnName : table.getPrimaryKeyColumnNames()) { - String value = targetCompareRow.getRow().getString(pkColumnName); - row.put(pkColumnName, value); - } - String sql = statement.buildDynamicSql(BinaryEncoding.HEX, row, false, true); - - writeStatement(sql); - } - - protected void writeStatement(String statement) { - try { - sqlDiffStream.write(statement.getBytes()); - sqlDiffStream.write("\r\n".getBytes()); - } catch (Exception ex) { - throw new RuntimeException("failed to write to sqlDiffStream.", ex); - } - } protected int comparePk(DbCompareTables tables, DbCompareRow sourceCompareRow, DbCompareRow targetCompareRow) { if (sourceCompareRow != null && targetCompareRow == null) { @@ -473,14 +389,6 @@ protected List filterTables(List tables) { return filteredTables; } - public OutputStream getSqlDiffStream() { - return sqlDiffStream; - } - - public void setSqlDiffStream(OutputStream sqlDiffStream) { - this.sqlDiffStream = sqlDiffStream; - } - public List getIncludedTableNames() { return includedTableNames; } @@ -528,4 +436,12 @@ public void close() { wrapped.close(); } } + + public String getSqlDiffFileName() { + return sqlDiffFileName; + } + + public void setSqlDiffFileName(String sqlDiffFileName) { + this.sqlDiffFileName = sqlDiffFileName; + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java new file mode 100644 index 0000000000..668ae81ef2 --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareDiffWriter.java @@ -0,0 +1,184 @@ +/** + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU General Public License, version 3.0 (GPLv3) + * (the "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU General Public License, + * version 3.0 (GPLv3) along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.io; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.jumpmind.db.model.Column; +import org.jumpmind.db.model.Table; +import org.jumpmind.db.sql.DmlStatement; +import org.jumpmind.db.sql.DmlStatement.DmlType; +import org.jumpmind.db.sql.Row; +import org.jumpmind.db.util.BinaryEncoding; +import org.jumpmind.symmetric.ISymmetricEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DbCompareDiffWriter { + + final static Logger log = LoggerFactory.getLogger(DbCompareDiffWriter.class); + + public DbCompareDiffWriter(ISymmetricEngine targetEngine, DbCompareTables tables, String fileName) { + super(); + this.targetEngine = targetEngine; + this.tables = tables; + this.fileName = getFormattedFileName(fileName); + } + + private ISymmetricEngine targetEngine; + private DbCompareTables tables; + private String fileName; + private FileOutputStream stream; + + public void writeDelete(DbCompareRow targetCompareRow) { + stream = initStreamIfNeeded(stream, fileName); + if (stream == null) { + return; + } + + Table table = targetCompareRow.getTable(); + + DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.DELETE, + table.getCatalog(), table.getSchema(), table.getName(), + table.getPrimaryKeyColumns(), null, + null, null); + + Row row = new Row(targetCompareRow.getTable().getPrimaryKeyColumnCount()); + + for (int i = 0; i < targetCompareRow.getTable().getPrimaryKeyColumnCount(); i++) { + row.put(table.getColumn(i).getName(), + targetCompareRow.getRowValues().get(targetCompareRow.getTable().getColumn(i).getName())); + } + + String sql = statement.buildDynamicDeleteSql(BinaryEncoding.HEX, row, false, true); + + writeLine(sql); + } + + public void writeInsert(DbCompareRow sourceCompareRow) { + stream = initStreamIfNeeded(stream, fileName); + if (stream == null) { + return; + } + + Table targetTable = tables.getTargetTable(); + + DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.INSERT, + targetTable.getCatalog(), targetTable.getSchema(), targetTable.getName(), + targetTable.getPrimaryKeyColumns(), targetTable.getColumns(), + null, null); + + Row row = new Row(targetTable.getColumnCount()); + + for (Column sourceColumn : tables.getSourceTable().getColumns()) { + Column targetColumn = tables.getColumnMapping().get(sourceColumn); + if (targetColumn == null) { + continue; + } + + row.put(targetColumn.getName(), sourceCompareRow.getRowValues(). + get(sourceColumn.getName())); + } + + String sql = statement.buildDynamicSql(BinaryEncoding.HEX, row, false, false); + + writeLine(sql); + } + + public void writeUpdate(DbCompareRow targetCompareRow, Map deltas) { + stream = initStreamIfNeeded(stream, fileName); + if (stream == null) { + return; + } + + Table table = targetCompareRow.getTable(); + + Column[] changedColumns = deltas.keySet().toArray(new Column[deltas.keySet().size()]); + + DmlStatement statement = targetEngine.getDatabasePlatform().createDmlStatement(DmlType.UPDATE, + table.getCatalog(), table.getSchema(), table.getName(), + table.getPrimaryKeyColumns(), changedColumns, + null, null); + + Row row = new Row(changedColumns.length+table.getPrimaryKeyColumnCount()); + for (Column changedColumn : deltas.keySet()) { + String value = deltas.get(changedColumn); + row.put(changedColumn.getName(), value); + } + for (String pkColumnName : table.getPrimaryKeyColumnNames()) { + String value = targetCompareRow.getRow().getString(pkColumnName); + row.put(pkColumnName, value); + } + String sql = statement.buildDynamicSql(BinaryEncoding.HEX, row, false, true); + + writeLine(sql); + } + + public void close() { + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + log.debug("CAUGHT EXCEPTION while closing stream", e); + } + stream = null; + } + } + + protected void writeLine(String line) { + try { + stream.write(line.getBytes()); + stream.write("\r\n".getBytes()); + } catch (Exception ex) { + throw new RuntimeException("failed to write to stream '" + line + "'", ex); + } + } + + protected String getFormattedFileName(String intputFileName) { + if (!StringUtils.isEmpty(intputFileName)) { + // allow file per table. + String fileNameFormatted = intputFileName.replace("%t", "%s"); + fileNameFormatted = String.format(fileNameFormatted, tables.getSourceTable().getName()); + fileNameFormatted = fileNameFormatted.replaceAll("\"", "").replaceAll("\\]", "").replaceAll("\\[", ""); + return fileNameFormatted; + } else { + return null; + } + } + + protected FileOutputStream initStreamIfNeeded(FileOutputStream diffStream, String fileName) { + if (diffStream != null) { + return diffStream; + } else { + log.info("Writing diffs to {}", fileName); + try { + return new FileOutputStream(fileName); + } catch (Exception e) { + throw new RuntimeException("Failed to open stream to file '" + fileName + "'", e); + } + } + } + + +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareRow.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareRow.java index 6414c22e63..7128d1de19 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareRow.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DbCompareRow.java @@ -67,7 +67,6 @@ public int comparePks(DbCompareTables tables, DbCompareRow targetRow) { public Map compareTo(DbCompareTables tables, DbCompareRow targetRow) { Map deltas = new LinkedHashMap(); - // TODO maybe should operate on non-pk columns here. for (Column sourceColumn : table.getColumns()) { Column targetColumn = tables.getColumnMapping().get(sourceColumn); if (targetColumn == null) {