From d0c34811e3a30ebc97fd784ebf2fdd3f2085358b Mon Sep 17 00:00:00 2001 From: jrthe42 Date: Wed, 11 Jul 2018 09:50:29 +0800 Subject: [PATCH 1/4] Using a Timer to test the jdbc connection periodically and keep it alive If jdbc connection lies idle for a long time, the database will force close the connection. Keep this connection valid using a timer. --- .../io/jdbc/JDBCAppendTableSinkBuilder.java | 16 ++++++++++ .../api/java/io/jdbc/JDBCOutputFormat.java | 31 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java index da00d74cdced7..59f7911a0ae87 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java @@ -22,6 +22,8 @@ import org.apache.flink.util.Preconditions; import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL; +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; +import static org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; /** * A builder to configure and build the JDBCAppendTableSink. @@ -34,6 +36,8 @@ public class JDBCAppendTableSinkBuilder { private String query; private int batchSize = DEFAULT_BATCH_INTERVAL; private int[] parameterTypes; + private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; + private int idleConnectionCheckTimeout = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; /** * Specify the username of the JDBC connection. @@ -115,6 +119,16 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... 
types) { return this; } + public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) { + this.idleConnectionCheckInterval = idleConnectionCheckInterval; + return this; + } + + public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) { + this.idleConnectionCheckTimeout = idleConnectionCheckTimeout; + return this; + } + /** * Finalizes the configuration and checks validity. * @@ -133,6 +147,8 @@ public JDBCAppendTableSink build() { .setDrivername(driverName) .setBatchInterval(batchSize) .setSqlTypes(parameterTypes) + .setIdleConnectionCheckInterval(idleConnectionCheckInterval) + .setIdleConnectionCheckTimeout(idleConnectionCheckTimeout) .finish(); return new JDBCAppendTableSink(format); diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java index 67c7a0e573978..5d8373fad4117 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -30,6 +30,8 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Timer; +import java.util.TimerTask; /** * OutputFormat to write Rows into a JDBC database. 
@@ -41,6 +43,8 @@ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; + static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000; + static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0; private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); @@ -54,6 +58,10 @@ public class JDBCOutputFormat extends RichOutputFormat { private Connection dbConn; private PreparedStatement upload; + private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; + private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; + private transient Timer timer = new Timer(); + private int batchCount = 0; private int[] typesArray; @@ -91,6 +99,18 @@ private void establishConnection() throws SQLException, ClassNotFoundException { } else { dbConn = DriverManager.getConnection(dbURL, username, password); } + timer.schedule(new TimerTask() { + @Override + public void run() { + try { + if (!dbConn.isValid(idleConnectionCheckTimeOut)) { + throw new RuntimeException("JDBC connection is invalid."); + } + } catch (SQLException e) { + throw new RuntimeException("Error validating JDBC connection.", e); + } + } + }, idleConnectionCheckInterval, idleConnectionCheckInterval); } /** @@ -231,6 +251,7 @@ int[] getTypesArray() { */ @Override public void close() throws IOException { + timer.cancel(); if (upload != null) { flush(); // close the connection @@ -303,6 +324,16 @@ public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) { return this; } + public JDBCOutputFormatBuilder setIdleConnectionCheckInterval(long interval) { + format.idleConnectionCheckInterval = interval; + return this; + } + + public JDBCOutputFormatBuilder setIdleConnectionCheckTimeout(int timeout) { + format.idleConnectionCheckTimeOut = timeout; + return this; + } + /** * Finalizes the configuration and checks validity. 
* From 55425348542606d8dcb63b9c5e773d969a1f10b3 Mon Sep 17 00:00:00 2001 From: jrthe42 Date: Wed, 11 Jul 2018 09:58:40 +0800 Subject: [PATCH 2/4] [FLINK-9794] JDBCOutputFormat does not consider idle connection and multithreads synchronization --- .../io/jdbc/JDBCAppendTableSinkBuilder.java | 11 + .../api/java/io/jdbc/JDBCOutputFormat.java | 214 +++++++++--------- 2 files changed, 124 insertions(+), 101 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java index 59f7911a0ae87..b97ca16c27320 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java @@ -119,11 +119,22 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) { return this; } + /** + * Specify the interval that the idle connection will be checked. + * @param idleConnectionCheckInterval the interval in milliseconds that the + * idle connection will be checked. + */ public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) { this.idleConnectionCheckInterval = idleConnectionCheckInterval; return this; } + /** + * Specify the time in seconds to wait for the database operation used to + * validate the connection to complete. + * @param idleConnectionCheckTimeout time in seconds to wait while validating + * the connection. 
+ */ public JDBCAppendTableSinkBuilder setIdleConnectionCheckTimeout(int idleConnectionCheckTimeout) { this.idleConnectionCheckTimeout = idleConnectionCheckTimeout; return this; diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java index 5d8373fad4117..a80941523e24e 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -43,7 +43,7 @@ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; - static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000; + static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000L; static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0; private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); @@ -58,9 +58,11 @@ public class JDBCOutputFormat extends RichOutputFormat { private Connection dbConn; private PreparedStatement upload; + //the interval in milliseconds that the idle connection will be checked private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; - private int idleConnectionCheckTimeOut = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; - private transient Timer timer = new Timer(); + //time in seconds to wait while validating the connection + private int idleConnectionCheckTimeout = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; + private transient Timer timer; private int batchCount = 0; @@ -99,11 +101,12 @@ private void establishConnection() throws SQLException, ClassNotFoundException { } else { dbConn = DriverManager.getConnection(dbURL, username, password); } + timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { 
try { - if (!dbConn.isValid(idleConnectionCheckTimeOut)) { + if (!dbConn.isValid(idleConnectionCheckTimeout)) { throw new RuntimeException("JDBC connection is invalid."); } } catch (SQLException e) { @@ -131,110 +134,117 @@ public void writeRecord(Row row) throws IOException { if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) { LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); } - try { + synchronized (this) { + try { + fillStmt(row); + upload.addBatch(); + batchCount++; + } catch (SQLException e) { + throw new RuntimeException("Preparation of JDBC statement failed.", e); + } - if (typesArray == null) { - // no types provided - for (int index = 0; index < row.getArity(); index++) { - LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index)); - upload.setObject(index + 1, row.getField(index)); - } - } else { - // types provided - for (int index = 0; index < row.getArity(); index++) { - - if (row.getField(index) == null) { - upload.setNull(index + 1, typesArray[index]); - } else { - // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html - switch (typesArray[index]) { - case java.sql.Types.NULL: - upload.setNull(index + 1, typesArray[index]); - break; - case java.sql.Types.BOOLEAN: - case java.sql.Types.BIT: - upload.setBoolean(index + 1, (boolean) row.getField(index)); - break; - case java.sql.Types.CHAR: - case java.sql.Types.NCHAR: - case java.sql.Types.VARCHAR: - case java.sql.Types.LONGVARCHAR: - case java.sql.Types.LONGNVARCHAR: - upload.setString(index + 1, (String) row.getField(index)); - break; - case java.sql.Types.TINYINT: - upload.setByte(index + 1, (byte) row.getField(index)); - break; - case java.sql.Types.SMALLINT: - upload.setShort(index + 1, (short) row.getField(index)); - break; - case java.sql.Types.INTEGER: - upload.setInt(index + 1, (int) 
row.getField(index)); - break; - case java.sql.Types.BIGINT: - upload.setLong(index + 1, (long) row.getField(index)); - break; - case java.sql.Types.REAL: - upload.setFloat(index + 1, (float) row.getField(index)); - break; - case java.sql.Types.FLOAT: - case java.sql.Types.DOUBLE: - upload.setDouble(index + 1, (double) row.getField(index)); - break; - case java.sql.Types.DECIMAL: - case java.sql.Types.NUMERIC: - upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index)); - break; - case java.sql.Types.DATE: - upload.setDate(index + 1, (java.sql.Date) row.getField(index)); - break; - case java.sql.Types.TIME: - upload.setTime(index + 1, (java.sql.Time) row.getField(index)); - break; - case java.sql.Types.TIMESTAMP: - upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index)); - break; - case java.sql.Types.BINARY: - case java.sql.Types.VARBINARY: - case java.sql.Types.LONGVARBINARY: - upload.setBytes(index + 1, (byte[]) row.getField(index)); - break; - default: - upload.setObject(index + 1, row.getField(index)); - LOG.warn("Unmanaged sql type ({}) for column {}. 
Best effort approach to set its value: {}.", - typesArray[index], index + 1, row.getField(index)); - // case java.sql.Types.SQLXML - // case java.sql.Types.ARRAY: - // case java.sql.Types.JAVA_OBJECT: - // case java.sql.Types.BLOB: - // case java.sql.Types.CLOB: - // case java.sql.Types.NCLOB: - // case java.sql.Types.DATALINK: - // case java.sql.Types.DISTINCT: - // case java.sql.Types.OTHER: - // case java.sql.Types.REF: - // case java.sql.Types.ROWID: - // case java.sql.Types.STRUC - } - } - } + if (batchCount >= batchInterval) { + // execute batch + flush(); } - upload.addBatch(); - batchCount++; - } catch (SQLException e) { - throw new RuntimeException("Preparation of JDBC statement failed.", e); } + } - if (batchCount >= batchInterval) { - // execute batch - flush(); + private void fillStmt(Row row) throws SQLException { + if (typesArray == null) { + // no types provided + for (int index = 0; index < row.getArity(); index++) { + LOG.warn("Unknown column type for column {}. Best effort approach to set its value: {}.", index + 1, row.getField(index)); + upload.setObject(index + 1, row.getField(index)); + } + } else { + // types provided + for (int index = 0; index < row.getArity(); index++) { + + if (row.getField(index) == null) { + upload.setNull(index + 1, typesArray[index]); + } else { + // casting values as suggested by http://docs.oracle.com/javase/1.5.0/docs/guide/jdbc/getstart/mapping.html + switch (typesArray[index]) { + case java.sql.Types.NULL: + upload.setNull(index + 1, typesArray[index]); + break; + case java.sql.Types.BOOLEAN: + case java.sql.Types.BIT: + upload.setBoolean(index + 1, (boolean) row.getField(index)); + break; + case java.sql.Types.CHAR: + case java.sql.Types.NCHAR: + case java.sql.Types.VARCHAR: + case java.sql.Types.LONGVARCHAR: + case java.sql.Types.LONGNVARCHAR: + upload.setString(index + 1, (String) row.getField(index)); + break; + case java.sql.Types.TINYINT: + upload.setByte(index + 1, (byte) row.getField(index)); + break; + 
case java.sql.Types.SMALLINT: + upload.setShort(index + 1, (short) row.getField(index)); + break; + case java.sql.Types.INTEGER: + upload.setInt(index + 1, (int) row.getField(index)); + break; + case java.sql.Types.BIGINT: + upload.setLong(index + 1, (long) row.getField(index)); + break; + case java.sql.Types.REAL: + upload.setFloat(index + 1, (float) row.getField(index)); + break; + case java.sql.Types.FLOAT: + case java.sql.Types.DOUBLE: + upload.setDouble(index + 1, (double) row.getField(index)); + break; + case java.sql.Types.DECIMAL: + case java.sql.Types.NUMERIC: + upload.setBigDecimal(index + 1, (java.math.BigDecimal) row.getField(index)); + break; + case java.sql.Types.DATE: + upload.setDate(index + 1, (java.sql.Date) row.getField(index)); + break; + case java.sql.Types.TIME: + upload.setTime(index + 1, (java.sql.Time) row.getField(index)); + break; + case java.sql.Types.TIMESTAMP: + upload.setTimestamp(index + 1, (java.sql.Timestamp) row.getField(index)); + break; + case java.sql.Types.BINARY: + case java.sql.Types.VARBINARY: + case java.sql.Types.LONGVARBINARY: + upload.setBytes(index + 1, (byte[]) row.getField(index)); + break; + default: + upload.setObject(index + 1, row.getField(index)); + LOG.warn("Unmanaged sql type ({}) for column {}. 
 Best effort approach to set its value: {}.", + typesArray[index], index + 1, row.getField(index)); + // case java.sql.Types.SQLXML + // case java.sql.Types.ARRAY: + // case java.sql.Types.JAVA_OBJECT: + // case java.sql.Types.BLOB: + // case java.sql.Types.CLOB: + // case java.sql.Types.NCLOB: + // case java.sql.Types.DATALINK: + // case java.sql.Types.DISTINCT: + // case java.sql.Types.OTHER: + // case java.sql.Types.REF: + // case java.sql.Types.ROWID: + // case java.sql.Types.STRUC + } + } + } } } void flush() { try { - upload.executeBatch(); - batchCount = 0; + synchronized (this) { + upload.executeBatch(); + batchCount = 0; + } } catch (SQLException e) { throw new RuntimeException("Execution of JDBC statement failed.", e); } @@ -251,7 +261,9 @@ int[] getTypesArray() { */ @Override public void close() throws IOException { - timer.cancel(); + if (timer != null) { + timer.cancel(); + } if (upload != null) { flush(); // close the connection @@ -330,7 +342,7 @@ public JDBCOutputFormatBuilder setIdleConnectionCheckInterval(long interval) { } public JDBCOutputFormatBuilder setIdleConnectionCheckTimeout(int timeout) { - format.idleConnectionCheckTimeOut = timeout; + format.idleConnectionCheckTimeout = timeout; return this; } From ce12d058d2365a081dcf2fb3ba22d4a9dfa8e3df Mon Sep 17 00:00:00 2001 From: jrthe42 Date: Thu, 12 Jul 2018 10:39:02 +0800 Subject: [PATCH 3/4] remove synchronization and address review comment we don't need synchronization here, because StreamTask ensures that invoke() method and snapshotState() won't be called concurrently. 
--- .../api/java/io/jdbc/JDBCOutputFormat.java | 29 ++++++++----------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java index a80941523e24e..43807afe41b8c 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -130,23 +130,20 @@ public void run() { */ @Override public void writeRecord(Row row) throws IOException { - if (typesArray != null && typesArray.length > 0 && typesArray.length != row.getArity()) { LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array..."); } - synchronized (this) { - try { - fillStmt(row); - upload.addBatch(); - batchCount++; - } catch (SQLException e) { - throw new RuntimeException("Preparation of JDBC statement failed.", e); - } + try { + fillStmt(row); + upload.addBatch(); + batchCount++; + } catch (SQLException e) { + throw new RuntimeException("Preparation of JDBC statement failed.", e); + } - if (batchCount >= batchInterval) { - // execute batch - flush(); - } + if (batchCount >= batchInterval) { + // execute batch + flush(); } } @@ -241,10 +238,8 @@ private void fillStmt(Row row) throws SQLException { void flush() { try { - synchronized (this) { - upload.executeBatch(); - batchCount = 0; - } + upload.executeBatch(); + batchCount = 0; } catch (SQLException e) { throw new RuntimeException("Execution of JDBC statement failed.", e); } From ff4cc3f8d17a9fcaeb6dceff95f6a00b060419b2 Mon Sep 17 00:00:00 2001 From: jrthe42 Date: Sun, 5 Aug 2018 21:30:09 +0800 Subject: [PATCH 4/4] Using ScheduledExecutorService instead of Timer, and address reviewer's comment. 
--- .../io/jdbc/JDBCAppendTableSinkBuilder.java | 6 +-- .../api/java/io/jdbc/JDBCOutputFormat.java | 38 +++++++++---------- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java index b97ca16c27320..c9e7db73de88c 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java @@ -36,7 +36,7 @@ public class JDBCAppendTableSinkBuilder { private String query; private int batchSize = DEFAULT_BATCH_INTERVAL; private int[] parameterTypes; - private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; + private int idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; private int idleConnectionCheckTimeout = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; /** @@ -121,10 +121,10 @@ public JDBCAppendTableSinkBuilder setParameterTypes(int... types) { /** * Specify the interval that the idle connection will be checked. - * @param idleConnectionCheckInterval the interval in milliseconds that the + * @param idleConnectionCheckInterval the interval in seconds that the * idle connection will be checked. 
*/ - public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(long idleConnectionCheckInterval) { + public JDBCAppendTableSinkBuilder setIdleConnectionCheckInterval(int idleConnectionCheckInterval) { this.idleConnectionCheckInterval = idleConnectionCheckInterval; return this; } diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java index 43807afe41b8c..c0f4aacc78d2d 100644 --- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java +++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -30,8 +30,9 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.Timer; -import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * OutputFormat to write Rows into a JDBC database. 
@@ -43,7 +44,7 @@ public class JDBCOutputFormat extends RichOutputFormat { private static final long serialVersionUID = 1L; static final int DEFAULT_BATCH_INTERVAL = 5000; - static final long DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60 * 1000L; + static final int DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL = 30 * 60; static final int DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT = 0; private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); @@ -58,11 +59,11 @@ public class JDBCOutputFormat extends RichOutputFormat { private Connection dbConn; private PreparedStatement upload; - //the interval in milliseconds that the idle connection will be checked - private long idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; + //the interval in seconds that the idle connection will be checked, 30 min default + private int idleConnectionCheckInterval = DEFAULT_IDLE_CONNECTION_CHECK_INTERVAL; //time in seconds to wait while validating the connection private int idleConnectionCheckTimeout = DEFAULT_IDLE_CONNECTION_CHECK_TIMEOUT; - private transient Timer timer; + private transient ScheduledExecutorService timerService; private int batchCount = 0; @@ -101,19 +102,14 @@ private void establishConnection() throws SQLException, ClassNotFoundException { } else { dbConn = DriverManager.getConnection(dbURL, username, password); } - timer = new Timer(); - timer.schedule(new TimerTask() { - @Override - public void run() { - try { - if (!dbConn.isValid(idleConnectionCheckTimeout)) { - throw new RuntimeException("JDBC connection is invalid."); - } - } catch (SQLException e) { - throw new RuntimeException("Error validating JDBC connection.", e); - } + timerService = Executors.newSingleThreadScheduledExecutor(); + timerService.scheduleAtFixedRate(() -> { + try { + dbConn.isValid(idleConnectionCheckTimeout); + } catch (SQLException e) { + throw new RuntimeException("Error validating JDBC connection.", e); } - }, idleConnectionCheckInterval, 
idleConnectionCheckInterval); + }, idleConnectionCheckInterval, idleConnectionCheckInterval, TimeUnit.SECONDS); } /** @@ -256,8 +252,8 @@ int[] getTypesArray() { */ @Override public void close() throws IOException { - if (timer != null) { - timer.cancel(); + if (timerService != null) { + timerService.shutdown(); } if (upload != null) { flush(); @@ -331,7 +327,7 @@ public JDBCOutputFormatBuilder setSqlTypes(int[] typesArray) { return this; } - public JDBCOutputFormatBuilder setIdleConnectionCheckInterval(long interval) { + public JDBCOutputFormatBuilder setIdleConnectionCheckInterval(int interval) { format.idleConnectionCheckInterval = interval; return this; }