From e8cd0f4ca49deede65bad14fe35b2d391750a30b Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Sun, 19 Feb 2017 15:45:00 +0100 Subject: [PATCH 1/4] Prevent application from hanging when it tries to reconfigure itself while stopping. --- .../java/org/apache/flume/node/Application.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index d6d92f0295..e29d1f1a3e 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -65,6 +65,7 @@ public class Application { private final LifecycleSupervisor supervisor; private MaterializedConfiguration materializedConfiguration; private MonitorService monitorServer; + private volatile boolean stopping = false; public Application() { this(new ArrayList(0)); @@ -83,16 +84,25 @@ public synchronized void start() { } @Subscribe - public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { - stopAllComponents(); - startAllComponents(conf); + public void handleConfigurationEvent(MaterializedConfiguration conf) { + if (stopping) { + logger.info("Will not handle Configuration Event while stopping"); + return; + } + synchronized (this) { + stopAllComponents(); + startAllComponents(conf); + + } } public synchronized void stop() { + stopping = true; supervisor.stop(); if (monitorServer != null) { monitorServer.stop(); } + stopping = false; } private void stopAllComponents() { From 510c701375ff91a192af4cb9443e35642571b9b6 Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Mon, 27 Feb 2017 13:00:13 +0100 Subject: [PATCH 2/4] Breaks deadlock by making handleConfigurationEvent try to lock interruptibly.
--- .../org/apache/flume/node/Application.java | 44 ++++++++++++------- ...ngPropertiesFileConfigurationProvider.java | 8 +++- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index e29d1f1a3e..385235ea29 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -33,6 +33,7 @@ import org.apache.flume.Channel; import org.apache.flume.Constants; import org.apache.flume.Context; +import org.apache.flume.FlumeException; import org.apache.flume.SinkRunner; import org.apache.flume.SourceRunner; import org.apache.flume.instrumentation.MonitorService; @@ -52,6 +53,8 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class Application { @@ -65,7 +68,7 @@ public class Application { private final LifecycleSupervisor supervisor; private MaterializedConfiguration materializedConfiguration; private MonitorService monitorServer; - private volatile boolean stopping = false; + private volatile Lock lock = new ReentrantLock(); public Application() { this(new ArrayList(0)); @@ -76,33 +79,44 @@ public Application(List components) { supervisor = new LifecycleSupervisor(); } - public synchronized void start() { - for (LifecycleAware component : components) { - supervisor.supervise(component, - new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + public void start() { + lock.lock(); + try { + for (LifecycleAware component : components) { + supervisor.supervise(component, + new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); + } + } finally { + lock.unlock(); } } @Subscribe public void handleConfigurationEvent(MaterializedConfiguration conf) { - if (stopping) { - logger.info("Will not 
handle Configuration Event while stopping"); + try { + lock.lockInterruptibly(); + } catch (InterruptedException e) { + logger.info("Interrupted while trying to handle configuration event"); return; } - synchronized (this) { + try { stopAllComponents(); startAllComponents(conf); - + } finally { + lock.unlock(); } } - public synchronized void stop() { - stopping = true; - supervisor.stop(); - if (monitorServer != null) { - monitorServer.stop(); + public void stop() { + lock.lock(); + try { + supervisor.stop(); + if (monitorServer != null) { + monitorServer.stop(); + } + } finally { + lock.unlock(); } - stopping = false; } private void stopAllComponents() { diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java index 91a09f0064..13cb38f42c 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingPropertiesFileConfigurationProvider.java @@ -85,8 +85,12 @@ public void stop() { executorService.shutdown(); try { - while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { - LOGGER.debug("Waiting for file watcher to terminate"); + if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { + LOGGER.debug("File watcher has not terminated. 
Forcing shutdown of executor."); + executorService.shutdownNow(); + while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) { + LOGGER.debug("Waiting for file watcher to terminate"); + } } } catch (InterruptedException e) { LOGGER.debug("Interrupted while waiting for file watcher to terminate"); From 6fcf76f3daf1a783fa371ec1616140a01a6f988d Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Tue, 28 Feb 2017 14:54:13 +0100 Subject: [PATCH 3/4] Rename lock object and correct its modifier --- .../java/org/apache/flume/node/Application.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index 385235ea29..9be7143948 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -33,7 +33,6 @@ import org.apache.flume.Channel; import org.apache.flume.Constants; import org.apache.flume.Context; -import org.apache.flume.FlumeException; import org.apache.flume.SinkRunner; import org.apache.flume.SourceRunner; import org.apache.flume.instrumentation.MonitorService; @@ -68,7 +67,7 @@ public class Application { private final LifecycleSupervisor supervisor; private MaterializedConfiguration materializedConfiguration; private MonitorService monitorServer; - private volatile Lock lock = new ReentrantLock(); + private final Lock lifecycleLock = new ReentrantLock(); public Application() { this(new ArrayList(0)); @@ -80,21 +79,21 @@ public Application(List components) { } public void start() { - lock.lock(); + lifecycleLock.lock(); try { for (LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } } finally { - lock.unlock(); + lifecycleLock.unlock(); } } @Subscribe public void handleConfigurationEvent(MaterializedConfiguration 
conf) { try { - lock.lockInterruptibly(); + lifecycleLock.lockInterruptibly(); } catch (InterruptedException e) { logger.info("Interrupted while trying to handle configuration event"); return; @@ -103,19 +102,19 @@ public void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } finally { - lock.unlock(); + lifecycleLock.unlock(); } } public void stop() { - lock.lock(); + lifecycleLock.lock(); try { supervisor.stop(); if (monitorServer != null) { monitorServer.stop(); } } finally { - lock.unlock(); + lifecycleLock.unlock(); } } From 06403e693730095629bcac89cbeeb8a8bf223b74 Mon Sep 17 00:00:00 2001 From: Andras Beni Date: Tue, 14 Mar 2017 15:26:50 +0100 Subject: [PATCH 4/4] Clean up locking in handleConfigurationEvent --- .../java/org/apache/flume/node/Application.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java index 9be7143948..7893fcc9ce 100644 --- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java +++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java @@ -52,7 +52,6 @@ import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Application { @@ -67,7 +66,7 @@ public class Application { private final LifecycleSupervisor supervisor; private MaterializedConfiguration materializedConfiguration; private MonitorService monitorServer; - private final Lock lifecycleLock = new ReentrantLock(); + private final ReentrantLock lifecycleLock = new ReentrantLock(); public Application() { this(new ArrayList(0)); @@ -94,15 +93,16 @@ public void start() { public void handleConfigurationEvent(MaterializedConfiguration conf) { try { lifecycleLock.lockInterruptibly(); + stopAllComponents(); + 
startAllComponents(conf); } catch (InterruptedException e) { logger.info("Interrupted while trying to handle configuration event"); return; - } - try { - stopAllComponents(); - startAllComponents(conf); } finally { - lifecycleLock.unlock(); + // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock + if (lifecycleLock.isHeldByCurrentThread()) { + lifecycleLock.unlock(); + } } }