Skip to content
Permalink
Browse files

0001367 - SQL Server Bulk Loader

  • Loading branch information...
gwilmer committed Aug 5, 2013
1 parent 836e7bc commit e16483a7e91798f1fa6ed4603c84038e6bf75d19
@@ -0,0 +1,60 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.jumpmind.symmetric</groupId>
<artifactId>symmetric-mssql</artifactId>
<packaging>jar</packaging>
<version>3.5.3-SNAPSHOT</version>
<name>mssql</name>
<url>http://symmetricds.org</url>

<parent>
<groupId>org.jumpmind.symmetric</groupId>
<artifactId>symmetric-parent</artifactId>
<version>3.5.2-SNAPSHOT</version>
<relativePath>../symmetric-parent/pom.xml</relativePath>
</parent>

<dependencies>
<dependency>
<groupId>org.jumpmind.symmetric</groupId>
<artifactId>symmetric-io</artifactId>
</dependency>
<dependency>
<groupId>org.jumpmind.symmetric</groupId>
<artifactId>symmetric-io</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jumpmind.symmetric</groupId>
<artifactId>symmetric-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.jumpmind.symmetric</groupId>
<artifactId>symmetric-jdbc</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>jdbc</groupId>
<artifactId>ojdbc</artifactId>
<version>11.2.0.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,131 @@
package org.jumpmind.symmetric.io.data.writer;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import oracle.jdbc.internal.OracleCallableStatement;
import oracle.sql.ARRAY;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.BulkSqlException;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.csv.CsvWriter;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class MsSqlBulkDatabaseWriter extends DatabaseWriter {

protected NativeJdbcExtractor jdbcExtractor;
protected int maxRowsBeforeFlush;
protected IStagingManager stagingManager;
protected IStagedResource stagedInputFile;
protected int loadedRows = 0;

public MsSqlBulkDatabaseWriter(IDatabasePlatform platform,
IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
int maxRowsBeforeFlush) {
super(platform);
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
this.stagingManager = stagingManager;
}

public boolean start(Table table) {
if (super.start(table)) {
//TODO: Did this because start is getting called multiple times
// for the same table in a single batch before end is being called
if (this.stagedInputFile == null) {
createStagingFile(table);
}
return true;
} else {
return false;
}
}

@Override
public void end(Table table) {
try {
this.stagedInputFile.close();
flush();
this.stagedInputFile.delete();
} finally {
super.end(table);
}
}

public void write(CsvData data) {
DataEventType dataEventType = data.getDataEventType();

switch (dataEventType) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
try {
String formattedData = CsvUtils.escapeCsvData(
data.getParsedData(CsvData.ROW_DATA), ' ', ' ',
CsvWriter.ESCAPE_MODE_DOUBLED);
byte[] dataToLoad = formattedData.getBytes();
this.stagedInputFile.getOutputStream().write(dataToLoad);
this.stagedInputFile.getOutputStream().write('\r');
this.stagedInputFile.getOutputStream().write('\n');
loadedRows++;
} catch (Exception ex) {
throw getPlatform().getSqlTemplate().translate(ex);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}
break;
case UPDATE:
case DELETE:
default:
flush();
super.write(data);
break;
}

if (loadedRows >= maxRowsBeforeFlush) {
flush();
loadedRows = 0;
}
}

protected void flush() {
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
if (loadedRows > 0) {
try {
JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
Connection c = jdbcTransaction.getConnection();
String sql = String.format("BULK INSERT " +
this.getTargetTable().getFullyQualifiedTableName() +
" FROM '" + stagedInputFile.getFile().getAbsolutePath()) + "'" +
" WITH ( FIELDTERMINATOR=',');";
Statement stmt = c.createStatement();

//TODO: clean this up, deal with errors, etc.?
stmt.execute(sql);
stmt.close();

} catch (SQLException ex) {
throw platform.getSqlTemplate().translate(ex);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}
}
}

protected void createStagingFile(Table table) {
//TODO: We should use constants for dir structure path,
// but we don't want to depend on symmetric core.
this.stagedInputFile = stagingManager.create(0, "bulkloaddir",
table.getName() + this.getBatch().getBatchId() + ".csv");
}

}

0 comments on commit e16483a

Please sign in to comment.
You can’t perform that action at this time.