Skip to content

Commit

Permalink
FLUME-2786 FLUME-3056 FLUME-3117 Application enters a deadlock when s…
Browse files Browse the repository at this point in the history
…topped while handleConfigurationEvent

Adding better locking mechanism to Application class to prevent deadlock.

this closes #108
this closes #144

Revievers: Denes Arvay, Attila Simon, Benedict Jin, Ferenc Szabo

(Andras Beni, Yan Jian via Ferenc Szabo)
  • Loading branch information
Andras Beni authored and szaboferee committed Mar 9, 2018
1 parent d1f24f5 commit 0d43781
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 13 deletions.
45 changes: 34 additions & 11 deletions flume-ng-node/src/main/java/org/apache/flume/node/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

public class Application {

Expand All @@ -65,6 +66,7 @@ public class Application {
private final LifecycleSupervisor supervisor;
private MaterializedConfiguration materializedConfiguration;
private MonitorService monitorServer;
private final ReentrantLock lifecycleLock = new ReentrantLock();

public Application() {
this(new ArrayList<LifecycleAware>(0));
Expand All @@ -75,23 +77,44 @@ public Application(List<LifecycleAware> components) {
supervisor = new LifecycleSupervisor();
}

public synchronized void start() {
for (LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
public void start() {
lifecycleLock.lock();
try {
for (LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
} finally {
lifecycleLock.unlock();
}
}

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
stopAllComponents();
startAllComponents(conf);
public void handleConfigurationEvent(MaterializedConfiguration conf) {
try {
lifecycleLock.lockInterruptibly();
stopAllComponents();
startAllComponents(conf);
} catch (InterruptedException e) {
logger.info("Interrupted while trying to handle configuration event");
return;
} finally {
// If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
if (lifecycleLock.isHeldByCurrentThread()) {
lifecycleLock.unlock();
}
}
}

public synchronized void stop() {
supervisor.stop();
if (monitorServer != null) {
monitorServer.stop();
public void stop() {
lifecycleLock.lock();
try {
supervisor.stop();
if (monitorServer != null) {
monitorServer.stop();
}
} finally {
lifecycleLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ public void stop() {

executorService.shutdown();
try {
while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for file watcher to terminate");
if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
executorService.shutdownNow();
while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
LOGGER.debug("Waiting for file watcher to terminate");
}
}
} catch (InterruptedException e) {
LOGGER.debug("Interrupted while waiting for file watcher to terminate");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,46 @@ public void testFLUME1854() throws Exception {
application.stop();
}
}

@Test(timeout = 10000L)
public void testFLUME2786() throws Exception {
final String agentName = "test";
final int interval = 1;
final long intervalMs = 1000L;

File configFile = new File(baseDir, "flume-conf.properties");
Files.copy(new File(getClass().getClassLoader()
.getResource("flume-conf.properties.2786").getFile()), configFile);
File mockConfigFile = spy(configFile);
when(mockConfigFile.lastModified()).then(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(intervalMs);
return System.currentTimeMillis();
}
});

EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(agentName,
mockConfigFile, eventBus, interval);
PollingPropertiesFileConfigurationProvider mockConfigurationProvider =
spy(configurationProvider);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Thread.sleep(intervalMs);
invocation.callRealMethod();
return null;
}
}).when(mockConfigurationProvider).stop();

List<LifecycleAware> components = Lists.newArrayList();
components.add(mockConfigurationProvider);
Application application = new Application(components);
eventBus.register(application);
application.start();
Thread.sleep(1500L);
application.stop();
}
}
35 changes: 35 additions & 0 deletions flume-ng-node/src/test/resources/flume-conf.properties.2786
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# Flume Configuration for testing FLUME-2786
#

test.sources = source1
test.channels = channel1
test.sinks = sink1

test.sources.source1.type = seq
test.sources.source1.totalEvents = 10000
test.sources.source1.channels = channel1

test.channels.channel1.type = memory
test.channels.channel1.capacity = 10000

test.sinks.sink1.type = null
test.sinks.sink1.channel = channel1

0 comments on commit 0d43781

Please sign in to comment.