Skip to content
Permalink
Browse files
Add TASK_THREADPOOL_RESET_TIMEOUT as system property (#2177)
Add TASK_THREADPOOL_RESET_TIMEOUT as system property

Allow users to specify reset timeout with system property.
  • Loading branch information
qqu0127 committed Jul 20, 2022
1 parent 3b28b4e commit 49aef7c54a7f93f119f0b921068235d3dd52cd25
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 4 deletions.
@@ -34,6 +34,9 @@ public class SystemPropertyKeys {
// Task Driver
public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";

// Task executor threadpool reset timeout in ms
public static final String TASK_THREADPOOL_RESET_TIMEOUT = "helixTask.threadpool.resetTimeout";

// ZKHelixManager
public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";

@@ -19,6 +19,7 @@
* under the License.
*/

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -34,17 +35,20 @@
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.messaging.handling.AsyncCallbackService;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.messaging.handling.TaskExecutor;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -54,6 +58,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
private final HelixTaskExecutor _taskExecutor;
// TODO:rename to factory, this is not a service
private final AsyncCallbackService _asyncCallbackService;
private final int _taskThreadpoolResetTimeout;

private static Logger _logger = LoggerFactory.getLogger(DefaultMessagingService.class);
ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded =
@@ -72,8 +77,12 @@ public DefaultMessagingService(HelixManager manager) {
new ParticipantStatusMonitor(isParticipant, manager.getInstanceName()),
new MessageQueueMonitor(manager.getClusterName(), manager.getInstanceName()));
_asyncCallbackService = new AsyncCallbackService();
_taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.name(),
_asyncCallbackService);

_taskThreadpoolResetTimeout = HelixUtil
.getSystemPropertyAsInt(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT,
TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
_taskExecutor.registerMessageHandlerFactory(_asyncCallbackService, TaskExecutor.DEFAULT_PARALLEL_TASKS,
_taskThreadpoolResetTimeout);
}

@Override
@@ -335,6 +344,11 @@ public HelixTaskExecutor getExecutor() {
return _taskExecutor;
}

@VisibleForTesting
int getTaskThreadpoolResetTimeout() {
return _taskThreadpoolResetTimeout;
}

@Override
// TODO if the manager is not Participant or Controller, no reply, so should fail immediately
public int sendAndWait(Criteria recipientCriteria, Message message, AsyncCallback asyncCallback,
@@ -144,8 +144,6 @@ MessageHandlerFactory factory() {
private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
private static final String SESSION_SYNC = "SESSION-SYNC";

private static final int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200; // 200 ms

/**
* Map of MsgType->MsgHandlerFactoryRegistryItem
*/
@@ -25,6 +25,7 @@

public interface TaskExecutor {
int DEFAULT_PARALLEL_TASKS = 40;
int DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS = 200;

/**
* Register MultiType message handler factory that the executor can handle.
@@ -34,10 +34,12 @@
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.messaging.handling.MultiTypeMessageHandlerFactory;
import org.apache.helix.messaging.handling.TaskExecutor;
import org.apache.helix.mock.MockManager;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
@@ -291,4 +293,15 @@ public void TestMessageSend() {
Assert.assertTrue(
svc.getMessageHandlerFactoryMap().containsKey(Message.MessageType.CONTROLLER_MSG.name()));
}

@Test
public void testTaskThreadpoolResetTimeoutProperty() {
HelixManager manager = new MockManager();
System.setProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT, "300");
MockDefaultMessagingService svc = new MockDefaultMessagingService(manager);
Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), 300);
System.clearProperty(SystemPropertyKeys.TASK_THREADPOOL_RESET_TIMEOUT);
svc = new MockDefaultMessagingService(new MockManager());
Assert.assertEquals(svc.getTaskThreadpoolResetTimeout(), TaskExecutor.DEFAULT_MSG_HANDLER_RESET_TIMEOUT_MS);
}
}

0 comments on commit 49aef7c

Please sign in to comment.