Skip to content

Commit

Permalink
0001728: MSSQL Bulk Loader support for columns in different order
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed May 22, 2014
1 parent ba33fa2 commit 9929be0
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 7 deletions.
@@ -1,19 +1,24 @@
package org.jumpmind.symmetric.io.data.writer;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.jumpmind.db.DbTestUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.mssql.MsSql2000DatabasePlatform;
import org.jumpmind.db.platform.mssql.MsSql2005DatabasePlatform;
import org.jumpmind.db.platform.mssql.MsSql2008DatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingManager;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor;

public class MsSqlBulkDatabaseWriterTest extends AbstractBulkDatabaseWriterTest {
Expand Down Expand Up @@ -47,4 +52,25 @@ protected long writeData(List<CsvData> data) {
return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null), new TableCsvData(table, data));
}

@Test
public void testInsertReorderColumns() throws Exception {
    if (shouldTestRun(platform)) {
        // Build one insert row whose trailing element is the generated id.
        String id = getNextId();
        String[] rowValues = { "string with space in it", "string-with-no-space", "string with space in it",
                "string-with-no-space", "2007-01-02 00:00:00.000", "2007-02-03 04:05:06.000", "0", "47", "67.89", "-0.0747663",
                encode("string with space in it"), id };
        List<CsvData> rows = new ArrayList<CsvData>();
        rows.add(new CsvData(DataEventType.INSERT, (String[]) ArrayUtils.clone(rowValues)));

        // Clone the cached table metadata and rotate its first column to the
        // end, so the CSV column order no longer matches the database order.
        Table reordered = (Table) platform.getTableFromCache(getTestTable(), false).clone();
        Column leadColumn = reordered.getColumn(0);
        reordered.removeColumn(leadColumn);
        reordered.addColumn(leadColumn);

        writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null),
                new TableCsvData(reordered, rows));

        // Expected database row: id first (its natural position), then the
        // remaining values — i.e. undo the rotation applied above.
        String[] expected = (String[]) ArrayUtils.add(
                (String[]) ArrayUtils.remove(rowValues, rowValues.length - 1), 0, id);
        assertTestTableEquals(id, expected);
    }
}

}
Expand Up @@ -4,6 +4,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
Expand All @@ -21,6 +22,7 @@

public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter {

    // Field separator written between column values in the staged bulk-load file.
    // NOTE(review): getBytes() uses the platform default charset — for "||" this is
    // safe on ASCII-compatible charsets, but StandardCharsets.UTF_8 would be explicit.
    protected static final byte[] DELIMITER = "||".getBytes();
    // Used to unwrap pooled JDBC connections to the native driver connection.
    protected NativeJdbcExtractor jdbcExtractor;
    // Number of rows buffered in the staging file before a bulk flush is issued.
    protected int maxRowsBeforeFlush;
    // Provides the staging area where the bulk-load input file is written.
    protected IStagingManager stagingManager;
Expand All @@ -29,7 +31,9 @@ public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter {
    // Whether the bulk insert should fire database triggers on the target table.
    protected boolean fireTriggers;
    // Optional UNC path prefix for the staging file — presumably used when SQL
    // Server must read the file over the network; TODO confirm against usage.
    protected String uncPath;
    // True when binary column data must be converted before staging (set in start()).
    protected boolean needsBinaryConversion;
    // True when the CSV column order differs from the physical table's column
    // order, requiring per-row reordering in write() (set in start()).
    protected boolean needsColumnsReordered;
    protected Table table = null;
    // Physical table metadata as read from the database catalog cache; compared
    // against the incoming CSV column order to detect reordering.
    protected Table databaseTable = null;

public MsSqlBulkDatabaseWriter(IDatabasePlatform platform,
IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
Expand All @@ -53,7 +57,18 @@ public boolean start(Table table) {
break;
}
}
}
}
            // Look up the physical table definition from the catalog cache so the
            // database's real column order can be compared with the CSV's order.
            databaseTable = platform.getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(),
                    sourceTable.getName(), false);
            String[] csvNames = targetTable.getColumnNames();
            String[] columnNames = databaseTable.getColumnNames();
            // Flag positional mismatches; write() will reorder each row by column
            // name when this is set.
            // NOTE(review): comparison is case-sensitive — confirm column-name casing
            // is normalized upstream. Also assumes columnNames has at least
            // csvNames.length entries, else this throws AIOOBE — TODO confirm.
            needsColumnsReordered = false;
            for (int i = 0; i < csvNames.length; i++) {
                if (! csvNames[i].equals(columnNames[i])) {
                    needsColumnsReordered = true;
                    break;
                }
            }
//TODO: Did this because start is getting called multiple times
// for the same table in a single batch before end is being called
if (this.stagedInputFile == null) {
Expand Down Expand Up @@ -96,14 +111,27 @@ public void write(CsvData data) {
}
}
}

OutputStream out = this.stagedInputFile.getOutputStream();
for (int i = 0; i < parsedData.length; i++) {
if (parsedData[i] != null) {
out.write(parsedData[i].getBytes());
if (needsColumnsReordered) {
Map<String, String> mapData = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA);
String[] columnNames = databaseTable.getColumnNames();
for (int i = 0; i < columnNames.length; i++) {
String columnData = mapData.get(columnNames[i]);
if (columnData != null) {
out.write(columnData.getBytes());
}
if (i + 1 < columnNames.length) {
out.write(DELIMITER);
}
}
if (i + 1 < parsedData.length) {
out.write("||".getBytes());
} else {
for (int i = 0; i < parsedData.length; i++) {
if (parsedData[i] != null) {
out.write(parsedData[i].getBytes());
}
if (i + 1 < parsedData.length) {
out.write(DELIMITER);
}
}
}
out.write('\r');
Expand Down

0 comments on commit 9929be0

Please sign in to comment.