Skip to content

Commit

Permalink
0002357: Need to be able to set the field terminator and record
Browse files Browse the repository at this point in the history
terminator for the mssql bulk loader
  • Loading branch information
chenson42 committed Jul 31, 2015
1 parent a576d11 commit d18a43d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 10 deletions.
Expand Up @@ -20,8 +20,6 @@
*/
package org.jumpmind.symmetric.io;

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.OutputStream;
import java.sql.Connection;
import java.sql.SQLException;
Expand Down Expand Up @@ -70,10 +68,10 @@ public MsSqlBulkDatabaseWriter(IDatabasePlatform platform,
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
this.stagingManager = stagingManager;
this.fireTriggers = fireTriggers;
if (isNotBlank(fieldTerminator)) {
if (fieldTerminator != null && fieldTerminator.length() > 0) {
this.fieldTerminator = fieldTerminator;
}
if (isNotBlank(rowTerminator)) {
if (rowTerminator != null && rowTerminator.length() > 0) {
this.rowTerminator = rowTerminator;
}
this.uncPath = uncPath;
Expand Down Expand Up @@ -205,11 +203,21 @@ protected void flush() {
String schemaSeparator = dbInfo.getSchemaSeparator();
JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
Connection c = jdbcTransaction.getConnection();
String rowTerminatorString = "";
/*
* There seems to be a bug with the SQL server bulk insert when
* you have one row with binary data at the end using \n as the
* row terminator. It works when you leave the row terminator
* out of the bulk insert statement.
*/
if (!(rowTerminator.equals("\n") || rowTerminator.equals("\r\n"))) {
rowTerminatorString = ", ROWTERMINATOR='" + StringEscapeUtils.escapeJava(rowTerminator) + "'";
}
String sql = String.format("BULK INSERT " +
this.getTargetTable().getQualifiedTableName(quote, catalogSeparator, schemaSeparator) +
" FROM '" + filename) + "'" +
" WITH ( FIELDTERMINATOR='"+StringEscapeUtils.escapeJava(fieldTerminator)+"', KEEPIDENTITY" +
(fireTriggers ? ", FIRE_TRIGGERS" : "") + ", ROWTERMINATOR='"+StringEscapeUtils.escapeJava(rowTerminator)+"');";
(fireTriggers ? ", FIRE_TRIGGERS" : "") + rowTerminatorString +");";
Statement stmt = c.createStatement();

//TODO: clean this up, deal with errors, etc.?
Expand Down
Expand Up @@ -26,15 +26,14 @@
import java.util.Map;
import java.util.Random;

import junit.framework.Assert;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.ArrayUtils;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.AbstractWriterTest;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.junit.Assert;
import org.junit.Test;

public abstract class AbstractBulkDatabaseWriterTest extends AbstractWriterTest {
Expand Down
Expand Up @@ -45,14 +45,16 @@
public class MsSqlBulkDatabaseWriterTest extends AbstractBulkDatabaseWriterTest {

protected static IStagingManager stagingManager;

protected static final String uncPath = "\\\\192.168.42.121\\bulkloaddir";

@BeforeClass
public static void setup() throws Exception {
if (DbTestUtils.getEnvironmentSpecificProperties(DbTestUtils.ROOT).get(BasicDataSourcePropertyConstants.DB_POOL_DRIVER)
.equals("net.sourceforge.jtds.jdbc.Driver")) {
platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT);
platform.createDatabase(platform.readDatabaseFromXml("/testBulkWriter.xml", true), true, false);
stagingManager = new StagingManager("tmp");
stagingManager = new StagingManager("target/tmp");
}
}

Expand All @@ -70,7 +72,7 @@ protected boolean shouldTestRun(IDatabasePlatform platform) {

protected long writeData(List<CsvData> data) {
Table table = platform.getTableFromCache(getTestTable(), false);
return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null, "||", "\r\n"), new TableCsvData(table, data));
return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, uncPath, null, null), new TableCsvData(table, data));
}

@Test
Expand All @@ -86,7 +88,7 @@ public void testInsertReorderColumns() throws Exception {
Column firstColumn = table.getColumn(0);
table.removeColumn(firstColumn);
table.addColumn(firstColumn);
writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null, "||", "\r\n"),
writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, uncPath, null, null),
new TableCsvData(table, data));
values = (String[]) ArrayUtils.remove(values, values.length - 1);
values = (String[]) ArrayUtils.add(values, 0, id);
Expand Down

0 comments on commit d18a43d

Please sign in to comment.