Skip to content
Permalink
Browse files
Optimize HelixTaskExecutor reset() in event of shutdown (#2183)
Optimize HelixTaskExecutor reset() in event of shutdown

Some instances maybe reset() multiple times during participant shutdown.
This commit refactor logic in HelixTaskExecutor to reduce unnecessary
method call.
  • Loading branch information
qqu0127 committed Aug 11, 2022
1 parent 11846cc commit 3ea5d4a000d545381b1f280e17fa5ce3608607e2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 20 deletions.
@@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -136,6 +137,8 @@ MessageHandlerFactory factory() {
private final ParticipantStatusMonitor _monitor;
public static final String MAX_THREADS = "maxThreads";

// true if all partition state are "clean" as same after reset()
private volatile boolean _isCleanState = true;
private MessageQueueMonitor _messageQueueMonitor;
private GenericHelixController _controller;
private Long _lastSessionSyncTime;
@@ -677,13 +680,10 @@ private void syncFactoryState() {
}
}

void reset() {
LOG.info("Reset HelixTaskExecutor");

if (_messageQueueMonitor != null) {
_messageQueueMonitor.reset();
}

/**
* Shutdown the registered thread pool executors. This method will be no-op if called repeatedly.
*/
private void shutdownExecutors() {
synchronized (_hdlrFtyRegistry) {
for (String msgType : _hdlrFtyRegistry.keySet()) {
// don't un-register factories, just shutdown all executors
@@ -694,26 +694,45 @@ void reset() {
LOG.info("Reset executor for msgType: " + msgType + ", pool: " + pool);
shutdownAndAwaitTermination(pool, item);
}

if (item.factory() != null) {
try {
item.factory().reset();
} catch (Exception ex) {
LOG.error("Failed to reset the factory {} of message type {}.", item.factory().toString(),
msgType, ex);
}
}
}
}
}

synchronized void reset() {
if (_isCleanState) {
LOG.info("HelixTaskExecutor is in clean state, no need to reset again");
return;
}
LOG.info("Reset HelixTaskExecutor");

if (_messageQueueMonitor != null) {
_messageQueueMonitor.reset();
}

shutdownExecutors();

synchronized (_hdlrFtyRegistry) {
_hdlrFtyRegistry.values()
.stream()
.map(MsgHandlerFactoryRegistryItem::factory)
.distinct()
.filter(Objects::nonNull)
.forEach(factory -> {
try {
factory.reset();
} catch (Exception ex) {
LOG.error("Failed to reset the factory {}.", factory.toString(), ex);
}
});
}
// threads pool specific to STATE_TRANSITION.Key specific pool are not shut down.
// this is a potential area to improve. https://github.com/apache/helix/issues/1245

StringBuilder sb = new StringBuilder();
// Log all tasks that fail to terminate
for (String taskId : _taskMap.keySet()) {
MessageTaskInfo info = _taskMap.get(taskId);
sb.append(
"Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n");
sb.append("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage() + "\n");
}

LOG.info(sb.toString());
@@ -724,6 +743,7 @@ void reset() {
_knownMessageIds.clear();

_lastSessionSyncTime = null;
_isCleanState = true;
}

void init() {
@@ -744,7 +764,7 @@ void init() {
_monitor.createExecutorMonitor(type, newPool);
return newPool;
});
LOG.info("Setup the thread pool for type: %s, isShutdown: %s", msgType, pool.isShutdown());
LOG.info("Setup the thread pool for type: {}, isShutdown: {}", msgType, pool.isShutdown());
}
}

@@ -835,6 +855,7 @@ public void onMessage(String instanceName, List<Message> messages,
init();
// continue to process messages
}
_isCleanState = false;

// if prefetch is disabled in MessageListenerCallback, we need to read all new messages from zk.
if (messages == null || messages.isEmpty()) {
@@ -1442,7 +1463,7 @@ private void sendNopMessage(HelixDataAccessor accessor, String instanceName) {
nopMsg.setTgtName(instanceName);
accessor
.setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(), nopMsg.getId()), nopMsg);
LOG.info("Send NO_OP message to {}}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId());
LOG.info("Send NO_OP message to {}, msgId: {}.", nopMsg.getTgtName(), nopMsg.getId());
} catch (Exception e) {
LOG.error("Failed to send NO_OP message to {}.", instanceName, e);
}
@@ -1454,6 +1475,7 @@ public void shutdown() {
_isShuttingDown = true;
_timer.cancel();

shutdownExecutors();
reset();
_monitor.shutDown();
LOG.info("Shutdown HelixTaskExecutor finished");
@@ -164,6 +164,7 @@ public void sync() {
}

private void loopStateModelFactories(Consumer<StateModel> consumer) {
// TODO: evaluate impact and consider parallelization
for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap
.values()) {
for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) {
@@ -132,6 +132,21 @@ public List<String> getMessageTypes() {
}
}

private class TestMessageHandlerFactory3 extends TestMessageHandlerFactory {
private boolean _resetDone = false;

@Override
public List<String> getMessageTypes() {
return ImmutableList.of("msgType1", "msgType2", "msgType3");
}

@Override
public void reset() {
Assert.assertFalse(_resetDone, "reset() should only be triggered once in TestMessageHandlerFactory3");
_resetDone = true;
}
}

class CancellableHandlerFactory implements MultiTypeMessageHandlerFactory {

int _handlersCreated = 0;
@@ -798,6 +813,29 @@ public void testHandlerResetTimeout() throws Exception {
System.out.println("END TestCMTaskExecutor.testHandlerResetTimeout()");
}

@Test
public void testMsgHandlerRegistryAndShutdown() {
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
TestMessageHandlerFactory3 factoryMulti = new TestMessageHandlerFactory3();
executor.registerMessageHandlerFactory(factory, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, 200);
executor.registerMessageHandlerFactory(factoryMulti, HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, 200);

final Message msg = new Message(factory.getMessageTypes().get(0), UUID.randomUUID().toString());
msg.setTgtSessionId("*");
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");

NotificationContext changeContext = new NotificationContext(manager);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage("some", Collections.singletonList(msg), changeContext);
Assert.assertEquals(executor._hdlrFtyRegistry.size(), 4);
// Ensure TestMessageHandlerFactory3 instance is reset and reset exactly once
executor.shutdown();
Assert.assertTrue(factoryMulti._resetDone, "TestMessageHandlerFactory3 should be reset");
}

@Test()
public void testNoRetry() throws InterruptedException {
System.out.println("START " + TestHelper.getTestMethodName());

0 comments on commit 3ea5d4a

Please sign in to comment.