Skip to content

Commit

Permalink
0001639: MySQL and SQL-Server bulk loaders loading rows multiple times
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Mar 10, 2014
1 parent af24172 commit 070a675
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
Expand Up @@ -27,6 +27,7 @@ public class MySqlBulkDatabaseWriter extends DefaultDatabaseWriter {
protected IStagingManager stagingManager;
protected IStagedResource stagedInputFile;
protected int loadedRows = 0;
protected Table table = null;

public MySqlBulkDatabaseWriter(IDatabasePlatform platform,
IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
Expand All @@ -40,11 +41,12 @@ public MySqlBulkDatabaseWriter(IDatabasePlatform platform,
}

public boolean start(Table table) {
this.table = table;
if (super.start(table)) {
//TODO: Did this because start is getting called multiple times
// for the same table in a single batch before end is being called
if (this.stagedInputFile == null) {
createStagingFile(table);
createStagingFile();
}
return true;
} else {
Expand All @@ -55,9 +57,9 @@ public boolean start(Table table) {
@Override
public void end(Table table) {
try {
this.stagedInputFile.close();
flush();
this.stagedInputFile.delete();
this.stagedInputFile.close();
this.stagedInputFile.delete();
} finally {
super.end(table);
}
Expand Down Expand Up @@ -98,13 +100,14 @@ public void write(CsvData data) {
}

protected void flush() {
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
if (loadedRows > 0) {
this.stagedInputFile.close();
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
try {
JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
Connection c = jdbcTransaction.getConnection();
String sql = String.format("LOAD DATA " + (isLocal ? "LOCAL " : "") +
"INFILE '" + stagedInputFile.getFile().getAbsolutePath()).replace('\\', '/') + "' " +
"INFILE '" + stagedInputFile.getFile().getAbsolutePath()).replace('\\', '/') + "' " +
(isReplace ? "REPLACE " : "IGNORE ") + "INTO TABLE " +
this.getTargetTable().getFullyQualifiedTableName() +
" FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' STARTING BY ''";
Expand All @@ -114,16 +117,17 @@ protected void flush() {
log.debug(sql);
stmt.execute(sql);
stmt.close();

} catch (SQLException ex) {
throw platform.getSqlTemplate().translate(ex);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}
this.stagedInputFile.delete();
createStagingFile();
}
}

protected void createStagingFile(Table table) {
protected void createStagingFile() {
//TODO: We should use constants for dir structure path,
// but we don't want to depend on symmetric core.
this.stagedInputFile = stagingManager.create(0, "bulkloaddir",
Expand Down
Expand Up @@ -27,6 +27,7 @@ public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter {
protected int loadedRows = 0;
protected boolean fireTriggers;
protected boolean needsBinaryConversion;
protected Table table = null;

public MsSqlBulkDatabaseWriter(IDatabasePlatform platform,
IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
Expand All @@ -50,7 +51,7 @@ public boolean start(Table table) {
//TODO: Did this because start is getting called multiple times
// for the same table in a single batch before end is being called
if (this.stagedInputFile == null) {
createStagingFile(table);
createStagingFile();
}
return true;
} else {
Expand All @@ -61,8 +62,8 @@ public boolean start(Table table) {
@Override
public void end(Table table) {
try {
this.stagedInputFile.close();
flush();
this.stagedInputFile.close();
this.stagedInputFile.delete();
} finally {
super.end(table);
Expand Down Expand Up @@ -113,8 +114,9 @@ public void write(CsvData data) {
}

protected void flush() {
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
if (loadedRows > 0) {
this.stagedInputFile.close();
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
try {
JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
Connection c = jdbcTransaction.getConnection();
Expand All @@ -133,10 +135,12 @@ protected void flush() {
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}
this.stagedInputFile.delete();
createStagingFile();
}
}

protected void createStagingFile(Table table) {
protected void createStagingFile() {
//TODO: We should use constants for dir structure path,
// but we don't want to depend on symmetric core.
this.stagedInputFile = stagingManager.create(0, "bulkloaddir",
Expand Down

0 comments on commit 070a675

Please sign in to comment.