Skip to content

Commit

Permalink
formatting, clean up resource handling
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 15, 2019
1 parent 62b5da9 commit 52df866
Showing 1 changed file with 64 additions and 58 deletions.
@@ -1,19 +1,14 @@
package org.jumpmind.symmetric.io;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

import javax.sql.DataSource;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.jumpmind.db.model.Column;
Expand Down Expand Up @@ -66,7 +61,7 @@ public boolean start(Table table) {
}

needsBinaryConversion = false;
if (! batch.getBinaryEncoding().equals(BinaryEncoding.HEX)) {
if (!batch.getBinaryEncoding().equals(BinaryEncoding.HEX)) {
for (Column column : targetTable.getColumns()) {
if (column.isOfBinaryType()) {
needsBinaryConversion = true;
Expand All @@ -75,10 +70,10 @@ public boolean start(Table table) {
}
}

columnHeaderWritten=false;
if (this.stagedInputFile == null) {
createStagingFile();
}
columnHeaderWritten = false;
if (this.stagedInputFile == null) {
createStagingFile();
}

return true;
} else {
Expand Down Expand Up @@ -169,8 +164,7 @@ protected void bulkWrite(CsvData data) {

protected void flush() {
if (loadedRows > 0) {

this.stagedInputFile.close();
this.stagedInputFile.close();
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
Connection c = null;
ResettableBasicDataSource ds = null;
Expand All @@ -183,9 +177,9 @@ protected void flush() {
String tableName = this.getTargetTable().getQualifiedTableName(quote, catalogSeparator, schemaSeparator);

try {
cleanUpFastLoadTables(tableName, false);
IDatabasePlatform platform = getPlatform();
cleanUpFastLoadTables(tableName, false);

IDatabasePlatform platform = getPlatform();
ds = ((ResettableBasicDataSource) platform.getDataSource());

boolean containsCommas = ds.getUrl().indexOf(",") > 0;
Expand All @@ -194,7 +188,7 @@ protected void flush() {
fastLoadConnectionString = (lastCharSlash && containsCommas ? "," : lastCharSlash ? "" : "/") + fastLoadConnectionString;

if (ds.getUrl().indexOf(fastLoadConnectionString) < 0) {
ds.setUrl(ds.getUrl() + fastLoadConnectionString);
ds.setUrl(ds.getUrl() + fastLoadConnectionString);
}

c = DriverManager.getConnection(ds.getUrl(), ds.getUsername(), ds.getPassword());
Expand All @@ -209,8 +203,7 @@ protected void flush() {

log.info("Fast load complete.");
} catch (SQLException e) {
while (e != null)
{
while (e != null) {
log.error("SQL State = "
+ e.getSQLState()
+ ", Error Code = "
Expand All @@ -231,20 +224,18 @@ protected void flush() {
cleanUpFastLoadTables(tableName, false);

if (c != null) {
try {
c.close();
}
catch (SQLException sqle) {
log.error("Unable to close teradata connection", sqle);
}
try {
c.close();
} catch (SQLException sqle) {
log.error("Unable to close teradata connection", sqle);
}
}
if (ps != null) {
try {
ps.close();
}
catch (SQLException sqle) {
log.error("Unable to close teradata prepared statement", sqle);
}
try {
ps.close();
} catch (SQLException sqle) {
log.error("Unable to close teradata prepared statement", sqle);
}
}
}
}
Expand All @@ -253,45 +244,60 @@ protected void flush() {
protected void cleanUpFastLoadTables(String tableName, boolean clearTargetTable) {
JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) getTargetTransaction();
Connection normalConnection = jdbcTransaction.getConnection();

Statement stmt = null;

if (clearTargetTable) {
try {
Statement stmt = normalConnection.createStatement();
stmt.execute("delete from " + tableName);
stmt.close();
}
catch (Exception e) {
log.info("Unable to delete from teradata table + " + tableName + " before fast load.");
}
try {
stmt = normalConnection.createStatement();
stmt.execute("delete from " + tableName);
} catch (Exception e) {
log.info("Unable to delete from teradata table + " + tableName + " before fast load.");
} finally {
closeStatement(stmt);
}
}

try {
normalConnection.setAutoCommit(true);
Statement stmt = normalConnection.createStatement();
stmt.execute("drop table " + tableName + "_ERR_1");
stmt.close();
normalConnection.setAutoCommit(true);
stmt = normalConnection.createStatement();
stmt.execute("drop table " + tableName + "_ERR_1");
} catch (Exception e) {
log.info("Unable to drop teradata table + " + tableName + "_ERR_1 before fast load.");
} finally {
closeStatement(stmt);
}
catch (Exception e) { }

try {
normalConnection.setAutoCommit(true);
Statement stmt = normalConnection.createStatement();
stmt.execute("drop table " + tableName + "_ERR_2");
stmt.close();
normalConnection.setAutoCommit(true);
stmt = normalConnection.createStatement();
stmt.execute("drop table " + tableName + "_ERR_2");
} catch (Exception e) {
log.info("Unable to drop teradata table + " + tableName + "_ERR_2 before fast load.");
} finally {
closeStatement(stmt);
}
}

protected void closeStatement(Statement stmt) {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException sqle) {
log.error("Unable to close statement", sqle);
}
}
catch (Exception e) { }
}

protected String buildSql() {
StringBuffer sql = new StringBuffer("(");
for (int i=0; i < this.getTargetTable().getColumnCount(); i++) {
if (i>0) {
sql.append(",");
}
sql.append("?");
}
sql.append(")");
return sql.toString();
StringBuffer sql = new StringBuffer("(");
for (int i = 0; i < this.getTargetTable().getColumnCount(); i++) {
if (i > 0) {
sql.append(",");
}
sql.append("?");
}
sql.append(")");
return sql.toString();
}

protected void createStagingFile() {
Expand Down

0 comments on commit 52df866

Please sign in to comment.