Skip to content

Commit

Permalink
Return idle connection from Consumer to avoid timeout
Browse files Browse the repository at this point in the history
Fixes #732
  • Loading branch information
Brokkonaut committed Dec 11, 2018
1 parent 9f8fd3e commit 31ef2d9
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/main/java/de/diddiz/LogBlock/Consumer.java
Expand Up @@ -56,6 +56,7 @@
public class Consumer extends Thread {
private static final int MAX_SHUTDOWN_TIME_MILLIS = 20000;
private static final int WAIT_FOR_CONNECTION_TIME_MILLIS = 10000;
private static final int RETURN_IDLE_CONNECTION_TIME_MILLIS = 120000;
private static final int RETRIES_ON_UNKNOWN_CONNECTION_ERROR = 2;

private final Deque<Row> queue = new ArrayDeque<Row>();
Expand Down Expand Up @@ -434,7 +435,6 @@ public void run() {
try {
if (conn == null) {
batchHelper.reset();
logblock.getLogger().info("[Consumer] Connecting to the database!");
conn = logblock.getConnection();
if (conn != null) {
// initialize connection
Expand Down Expand Up @@ -476,7 +476,19 @@ public void run() {
if (r == null) {
try {
if (currentRows.isEmpty() && !shutdown) {
queue.wait(); // nothing to do for us
// nothing to do for us
// wait some time before closing the connection
queue.wait(RETURN_IDLE_CONNECTION_TIME_MILLIS);
// if there is still nothing to do, close the connection and go to sleep
if (queue.isEmpty() && !shutdown) {
try {
conn.close();
} catch (Exception e) {
// ignored
}
conn = null;
queue.wait();
}
} else {
processBatch = true;
}
Expand All @@ -499,7 +511,7 @@ public void run() {
r.process(conn, batchHelper);
}
}
if (currentRows.size() >= (processBatch ? 1 : (Config.forceToProcessAtLeast * 10))) {
if (currentRows.size() >= Math.max((processBatch ? 1 : (Config.forceToProcessAtLeast * 10)), 1)) {
batchHelper.processStatements(conn);
conn.commit();
currentRows.clear();
Expand Down

0 comments on commit 31ef2d9

Please sign in to comment.