From 070a6759b661c698427059f898c93742d9a80416 Mon Sep 17 00:00:00 2001 From: erilong Date: Mon, 10 Mar 2014 18:43:12 +0000 Subject: [PATCH] 0001639: MySQL and SQL-Server bulk loaders loading rows multiple times --- .../symmetric/io/MySqlBulkDatabaseWriter.java | 18 +++++++++++------- .../data/writer/MsSqlBulkDatabaseWriter.java | 12 ++++++++---- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java index b021aca16c..2f98d54545 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java @@ -27,6 +27,7 @@ public class MySqlBulkDatabaseWriter extends DefaultDatabaseWriter { protected IStagingManager stagingManager; protected IStagedResource stagedInputFile; protected int loadedRows = 0; + protected Table table = null; public MySqlBulkDatabaseWriter(IDatabasePlatform platform, IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor, @@ -40,11 +41,12 @@ public MySqlBulkDatabaseWriter(IDatabasePlatform platform, } public boolean start(Table table) { + this.table = table; if (super.start(table)) { //TODO: Did this because start is getting called multiple times // for the same table in a single batch before end is being called if (this.stagedInputFile == null) { - createStagingFile(table); + createStagingFile(); } return true; } else { @@ -55,9 +57,9 @@ public boolean start(Table table) { @Override public void end(Table table) { try { - this.stagedInputFile.close(); flush(); - this.stagedInputFile.delete(); + this.stagedInputFile.close(); + this.stagedInputFile.delete(); } finally { super.end(table); } @@ -98,13 +100,14 @@ public void write(CsvData data) { } protected void flush() { - statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS); if (loadedRows > 0) { + this.stagedInputFile.close(); + statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS); try { JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction; Connection c = jdbcTransaction.getConnection(); String sql = String.format("LOAD DATA " + (isLocal ? "LOCAL " : "") + - "INFILE '" + stagedInputFile.getFile().getAbsolutePath()).replace('\\', '/') + "' " + + "INFILE '" + stagedInputFile.getFile().getAbsolutePath()).replace('\\', '/') + "' " + (isReplace ? "REPLACE " : "IGNORE ") + "INTO TABLE " + this.getTargetTable().getFullyQualifiedTableName() + " FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' STARTING BY ''"; @@ -114,16 +117,17 @@ protected void flush() { log.debug(sql); stmt.execute(sql); stmt.close(); - } catch (SQLException ex) { throw platform.getSqlTemplate().translate(ex); } finally { statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS); } + this.stagedInputFile.delete(); + createStagingFile(); } } - protected void createStagingFile(Table table) { + protected void createStagingFile() { //TODO: We should use constants for dir structure path, // but we don't want to depend on symmetric core. this.stagedInputFile = stagingManager.create(0, "bulkloaddir", diff --git a/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java b/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java index 4a388ae0ca..b4b6ccc3eb 100644 --- a/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java +++ b/symmetric-mssql/src/main/java/org/jumpmind/symmetric/io/data/writer/MsSqlBulkDatabaseWriter.java @@ -27,6 +27,7 @@ public class MsSqlBulkDatabaseWriter extends DefaultDatabaseWriter { protected int loadedRows = 0; protected boolean fireTriggers; protected boolean needsBinaryConversion; + protected Table table = null; public MsSqlBulkDatabaseWriter(IDatabasePlatform platform, IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor, @@ -50,7 +51,7 @@ public boolean start(Table table) { //TODO: Did this because start is getting called multiple times // for the same table in a single batch before end is being called if (this.stagedInputFile == null) { - createStagingFile(table); + createStagingFile(); } return true; } else { @@ -61,8 +62,8 @@ public boolean start(Table table) { @Override public void end(Table table) { try { - this.stagedInputFile.close(); flush(); + this.stagedInputFile.close(); this.stagedInputFile.delete(); } finally { super.end(table); @@ -113,8 +114,9 @@ public void write(CsvData data) { } protected void flush() { - statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS); if (loadedRows > 0) { + this.stagedInputFile.close(); + statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS); try { JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction; Connection c = jdbcTransaction.getConnection(); @@ -133,10 +135,12 @@ protected void flush() { } finally { statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS); } + this.stagedInputFile.delete(); + createStagingFile(); } } - protected void createStagingFile(Table table) { + protected void createStagingFile() { //TODO: We should use constants for dir structure path, // but we don't want to depend on symmetric core. this.stagedInputFile = stagingManager.create(0, "bulkloaddir",