diff --git a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java index 5ec38c6399..05e59619ec 100644 --- a/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java +++ b/symmetric-client/src/test/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriterTest.java @@ -1,8 +1,11 @@ package org.jumpmind.symmetric.io.data.writer; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.ArrayUtils; import org.jumpmind.db.DbTestUtils; +import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.platform.mssql.MsSql2000DatabasePlatform; @@ -10,10 +13,12 @@ import org.jumpmind.db.platform.mssql.MsSql2008DatabasePlatform; import org.jumpmind.db.util.BasicDataSourcePropertyConstants; import org.jumpmind.symmetric.io.data.CsvData; +import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.io.stage.StagingManager; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Test; import org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor; public class MsSqlBulkDatabaseWriterTest extends AbstractBulkDatabaseWriterTest { @@ -47,4 +52,25 @@ protected long writeData(List data) { return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null), new TableCsvData(table, data)); } + @Test + public void testInsertReorderColumns() throws Exception { + if (shouldTestRun(platform)) { + String id = getNextId(); + String[] values = { "string with space in it", "string-with-no-space", "string with space in it", + "string-with-no-space", "2007-01-02 00:00:00.000", "2007-02-03 04:05:06.000", "0", "47", "67.89", "-0.0747663", + encode("string with space in it"), id }; + List data = new ArrayList(); + data.add(new CsvData(DataEventType.INSERT, (String[]) ArrayUtils.clone(values))); + Table table = (Table) platform.getTableFromCache(getTestTable(), false).clone(); + Column firstColumn = table.getColumn(0); + table.removeColumn(firstColumn); + table.addColumn(firstColumn); + writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null), + new TableCsvData(table, data)); + values = (String[]) ArrayUtils.remove(values, values.length - 1); + values = (String[]) ArrayUtils.add(values, 0, id); + assertTestTableEquals(id, values); + } + } + } \ No newline at end of file diff --git a/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java b/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java index 5add61d61a..d9e8423c10 100644 --- a/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java +++ b/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java @@ -4,6 +4,7 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.Map; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Hex; @@ -21,6 +22,7 @@ public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter { + protected static final byte[] DELIMITER = "||".getBytes(); protected NativeJdbcExtractor jdbcExtractor; protected int maxRowsBeforeFlush; protected IStagingManager stagingManager; @@ -29,7 +31,9 @@ public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter { protected boolean fireTriggers; protected String uncPath; protected boolean needsBinaryConversion; + protected boolean needsColumnsReordered; protected Table table = null; + protected Table databaseTable = null; public MsSqlBulkDatabaseWriter(IDatabasePlatform platform, IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor, @@ -53,7 +57,18 @@ public boolean start(Table table) { break; } } - } + } + databaseTable = platform.getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(), + sourceTable.getName(), false); + String[] csvNames = targetTable.getColumnNames(); + String[] columnNames = databaseTable.getColumnNames(); + needsColumnsReordered = false; + for (int i = 0; i < csvNames.length; i++) { + if (! csvNames[i].equals(columnNames[i])) { + needsColumnsReordered = true; + break; + } + } //TODO: Did this because start is getting called multiple times // for the same table in a single batch before end is being called if (this.stagedInputFile == null) { @@ -96,14 +111,27 @@ public void write(CsvData data) { } } } - OutputStream out = this.stagedInputFile.getOutputStream(); - for (int i = 0; i < parsedData.length; i++) { - if (parsedData[i] != null) { - out.write(parsedData[i].getBytes()); + if (needsColumnsReordered) { + Map mapData = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA); + String[] columnNames = databaseTable.getColumnNames(); + for (int i = 0; i < columnNames.length; i++) { + String columnData = mapData.get(columnNames[i]); + if (columnData != null) { + out.write(columnData.getBytes()); + } + if (i + 1 < columnNames.length) { + out.write(DELIMITER); + } } - if (i + 1 < parsedData.length) { - out.write("||".getBytes()); + } else { + for (int i = 0; i < parsedData.length; i++) { + if (parsedData[i] != null) { + out.write(parsedData[i].getBytes()); + } + if (i + 1 < parsedData.length) { + out.write(DELIMITER); + } } } out.write('\r');