Skip to content

Commit

Permalink
0002357: Need to be able to set the field terminator and record
Browse files Browse the repository at this point in the history
terminator for the mssql bulk loader
  • Loading branch information
chenson42 committed Jul 31, 2015
1 parent 6d2f9dd commit 65e5cc0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 10 deletions.
Expand Up @@ -22,6 +22,7 @@

import java.util.List;

import org.apache.commons.lang.StringEscapeUtils;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcUtils;
Expand Down Expand Up @@ -66,8 +67,10 @@ public IDataWriter getDataWriter(String sourceNodeId,
100000);
boolean fireTriggers = parameterService.is("mssql.bulk.load.fire.triggers", false);
String uncPath = parameterService.getString("mssql.bulk.load.unc.path");
String rowTerminator = StringEscapeUtils.unescapeJava(parameterService.getString("mssql.bulk.load.row.terminator", "\\r\\n"));
String fieldTerminator = StringEscapeUtils.unescapeJava(parameterService.getString("mssql.bulk.load.field.terminator", "||"));
return new MsSqlBulkDatabaseWriter(symmetricDialect.getPlatform(),
stagingManager, jdbcExtractor, maxRowsBeforeFlush, fireTriggers, uncPath);
stagingManager, jdbcExtractor, maxRowsBeforeFlush, fireTriggers, uncPath, fieldTerminator, rowTerminator);
}

public void setSymmetricEngine(ISymmetricEngine engine) {
Expand Down
Expand Up @@ -20,6 +20,8 @@
*/
package org.jumpmind.symmetric.io;

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.OutputStream;
import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -28,6 +30,7 @@

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
Expand All @@ -45,11 +48,12 @@

public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter {

protected static final byte[] DELIMITER = "||".getBytes();
protected NativeJdbcExtractor jdbcExtractor;
protected int maxRowsBeforeFlush;
protected IStagingManager stagingManager;
protected IStagedResource stagedInputFile;
protected String rowTerminator = "\r\n";
protected String fieldTerminator = "||";
protected int loadedRows = 0;
protected boolean fireTriggers;
protected String uncPath;
Expand All @@ -60,12 +64,18 @@ public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter {

public MsSqlBulkDatabaseWriter(IDatabasePlatform platform,
IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
int maxRowsBeforeFlush, boolean fireTriggers, String uncPath) {
int maxRowsBeforeFlush, boolean fireTriggers, String uncPath, String fieldTerminator, String rowTerminator) {
super(platform);
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
this.stagingManager = stagingManager;
this.fireTriggers = fireTriggers;
if (isNotBlank(fieldTerminator)) {
this.fieldTerminator = fieldTerminator;
}
if (isNotBlank(rowTerminator)) {
this.rowTerminator = rowTerminator;
}
this.uncPath = uncPath;
}

Expand Down Expand Up @@ -144,7 +154,7 @@ public void write(CsvData data) {
out.write(columnData.getBytes());
}
if (i + 1 < columnNames.length) {
out.write(DELIMITER);
out.write(fieldTerminator.getBytes());
}
}
} else {
Expand All @@ -153,12 +163,11 @@ public void write(CsvData data) {
out.write(parsedData[i].getBytes());
}
if (i + 1 < parsedData.length) {
out.write(DELIMITER);
out.write(fieldTerminator.getBytes());
}
}
}
out.write('\r');
out.write('\n');
out.write(rowTerminator.getBytes());
loadedRows++;
} catch (Exception ex) {
throw getPlatform().getSqlTemplate().translate(ex);
Expand Down Expand Up @@ -199,7 +208,8 @@ protected void flush() {
String sql = String.format("BULK INSERT " +
this.getTargetTable().getQualifiedTableName(quote, catalogSeparator, schemaSeparator) +
" FROM '" + filename) + "'" +
" WITH ( FIELDTERMINATOR='||', KEEPIDENTITY " + (fireTriggers ? ", FIRE_TRIGGERS" : "") + ");";
" WITH ( FIELDTERMINATOR='"+StringEscapeUtils.escapeJava(fieldTerminator)+"', KEEPIDENTITY" +
(fireTriggers ? ", FIRE_TRIGGERS" : "") + ", ROWTERMINATOR='"+StringEscapeUtils.escapeJava(rowTerminator)+"');";
Statement stmt = c.createStatement();

//TODO: clean this up, deal with errors, etc.?
Expand Down
Expand Up @@ -70,7 +70,7 @@ protected boolean shouldTestRun(IDatabasePlatform platform) {

protected long writeData(List<CsvData> data) {
Table table = platform.getTableFromCache(getTestTable(), false);
return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null), new TableCsvData(table, data));
return writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null, "||", "\r\n"), new TableCsvData(table, data));
}

@Test
Expand All @@ -86,7 +86,7 @@ public void testInsertReorderColumns() throws Exception {
Column firstColumn = table.getColumn(0);
table.removeColumn(firstColumn);
table.addColumn(firstColumn);
writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null),
writeData(new MsSqlBulkDatabaseWriter(platform, stagingManager, new CommonsDbcpNativeJdbcExtractor(), 1000, false, null, "||", "\r\n"),
new TableCsvData(table, data));
values = (String[]) ArrayUtils.remove(values, values.length - 1);
values = (String[]) ArrayUtils.add(values, 0, id);
Expand Down
14 changes: 14 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -1391,6 +1391,20 @@ mssql.bulk.load.fire.triggers=false
# Tags: other, mssql
mssql.bulk.load.unc.path=

# Specify the line terminator used by the SQL Server bulk loader. Pick something that does not exist in the
# data in your database.
#
# DatabaseOverridable: true
# Tags: other, mssql
mssql.bulk.load.row.terminator=\\r\\n

# Specify the field terminator used by the SQL Server bulk loader. Pick something that does not exist in the
# data in your database.
#
# DatabaseOverridable: true
# Tags: other, mssql
mssql.bulk.load.field.terminator=||

# Automatically update data, data_event and outgoing_batch tables to allow only
# row level locking.
#
Expand Down

0 comments on commit 65e5cc0

Please sign in to comment.