Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -169,7 +168,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
// Controller will switch to run management mode pipeline when set to true.
private boolean _inManagementMode;
private final ClusterEventBlockingQueue _managementModeEventQueue;
private final ClusterEventProcessor _managementModeEventThread;

private final Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> _asyncFIFOWorkerPool;

Expand Down Expand Up @@ -207,7 +205,6 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
private long _lastPipelineEndTimestamp;

private final String _clusterName;
private final Set<Pipeline.Type> _enabledPipelineTypes;

private HelixManager _helixManager;

Expand All @@ -228,7 +225,7 @@ protected StatefulRebalancer createRebalancer(HelixManager helixManager) {
* 2) then ClusterDataCache.refresh triggers rebalance pipeline.
*/
/* Map of cluster->Set of GenericHelixController*/
private static Map<String, ImmutableSet<GenericHelixController>> _helixControllerFactory =
private static final Map<String, ImmutableSet<GenericHelixController>> _helixControllerFactory =
new ConcurrentHashMap<>();

public static GenericHelixController getLeaderController(String clusterName) {
Expand Down Expand Up @@ -308,7 +305,7 @@ class RebalanceTask extends TimerTask {
final HelixManager _manager;
final ClusterEventType _clusterEventType;
private final Optional<Boolean> _shouldRefreshCacheOption;
private long _nextRebalanceTime;
private final long _nextRebalanceTime;

public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
this(manager, clusterEventType, -1);
Expand Down Expand Up @@ -683,7 +680,6 @@ public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRe
private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry,
PipelineRegistry managementModeRegistry, final String clusterName,
Set<Pipeline.Type> enabledPipelineTypes) {
_enabledPipelineTypes = enabledPipelineTypes;
_registry = registry;
_taskRegistry = taskRegistry;
_managementModeRegistry = managementModeRegistry;
Expand All @@ -695,11 +691,8 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR
_clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);

_asyncTasksThreadPool =
Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
return new Thread(r, "HelixController-async_tasks-" + _clusterName);
}
});
Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE,
runnable -> new Thread(runnable, "HelixController-async_tasks-" + _clusterName));
_asyncFIFOWorkerPool = new HashMap<>();
initializeAsyncFIFOWorkers();

Expand All @@ -708,7 +701,7 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR

// TODO: refactor to simplify below similar code of the 3 pipelines
// initialize pipelines at the end so we have everything else prepared
if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
if (enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
_resourceControlDataProvider = new ResourceControllerDataProvider(clusterName);
_eventQueue = new ClusterEventBlockingQueue();
Expand All @@ -722,7 +715,7 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR
_eventThread = null;
}

if (_enabledPipelineTypes.contains(Pipeline.Type.TASK)) {
if (enabledPipelineTypes.contains(Pipeline.Type.TASK)) {
logger.info("Initializing {} pipeline", Pipeline.Type.TASK.name());
_workflowControlDataProvider = new WorkflowControllerDataProvider(clusterName);
_taskEventQueue = new ClusterEventBlockingQueue();
Expand All @@ -740,10 +733,10 @@ private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskR
_managementControllerDataProvider =
new ManagementControllerDataProvider(clusterName, Pipeline.Type.MANAGEMENT_MODE.name());
_managementModeEventQueue = new ClusterEventBlockingQueue();
_managementModeEventThread =
ClusterEventProcessor managementModeEventThread =
new ClusterEventProcessor(_managementControllerDataProvider, _managementModeEventQueue,
Pipeline.Type.MANAGEMENT_MODE.name() + "-" + clusterName);
initPipeline(_managementModeEventThread, _managementControllerDataProvider);
initPipeline(managementModeEventThread, _managementControllerDataProvider);
logger.info("Initialized {} pipeline", Pipeline.Type.MANAGEMENT_MODE.name());

addController(this);
Expand Down Expand Up @@ -876,7 +869,7 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
} else {
// TODO: should be in the initialization of controller.
if (_resourceControlDataProvider != null) {
checkRebalancingTimer(manager, Collections.<IdealState>emptyList(), dataProvider.getClusterConfig());
checkRebalancingTimer(manager, Collections.emptyList(), dataProvider.getClusterConfig());
}
if (_isMonitoring) {
_clusterStatusMonitor.setEnabled(!_inManagementMode);
Expand Down Expand Up @@ -1017,8 +1010,8 @@ public void onStateChange(String instanceName, List<CurrentState> statesInfo,
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onStateChange()");
notifyCaches(changeContext, ChangeType.CURRENT_STATE);
pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext, Collections
.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
pushToEventQueues(ClusterEventType.CurrentStateChange, changeContext,
Collections.singletonMap(AttributeName.instanceName.name(), instanceName));
logger.info("END: GenericClusterController.onStateChange()");
}

Expand All @@ -1028,8 +1021,8 @@ public void onTaskCurrentStateChange(String instanceName, List<CurrentState> sta
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onTaskCurrentStateChange()");
notifyCaches(changeContext, ChangeType.TASK_CURRENT_STATE);
pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext, Collections
.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
pushToEventQueues(ClusterEventType.TaskCurrentStateChange, changeContext,
Collections.singletonMap(AttributeName.instanceName.name(), instanceName));
logger.info("END: GenericClusterController.onTaskCurrentStateChange()");
}

Expand All @@ -1047,17 +1040,14 @@ public void onCustomizedStateRootChange(String instanceName, List<String> custom

// TODO: remove the synchronization here once we move this update into dataCache.
synchronized (_lastSeenCustomizedStateTypesMapRef) {
Map<String, Set<String>> lastSeenCustomizedStateTypesMap =
_lastSeenCustomizedStateTypesMapRef.get();
Map<String, Set<String>> lastSeenCustomizedStateTypesMap = _lastSeenCustomizedStateTypesMapRef.get();
if (null == lastSeenCustomizedStateTypesMap) {
lastSeenCustomizedStateTypesMap = new HashMap();
lastSeenCustomizedStateTypesMap = new HashMap<>();
// lazy init the AtomicReference
_lastSeenCustomizedStateTypesMapRef.set(lastSeenCustomizedStateTypesMap);
}

if (!lastSeenCustomizedStateTypesMap.containsKey(instanceName)) {
lastSeenCustomizedStateTypesMap.put(instanceName, new HashSet<>());
}
lastSeenCustomizedStateTypesMap.putIfAbsent(instanceName, new HashSet<>());

Set<String> lastSeenCustomizedStateTypes = lastSeenCustomizedStateTypesMap.get(instanceName);

Expand Down Expand Up @@ -1092,8 +1082,8 @@ public void onCustomizedStateChange(String instanceName, List<CustomizedState> s
NotificationContext changeContext) {
logger.info("START: GenericClusterController.onCustomizedStateChange()");
notifyCaches(changeContext, ChangeType.CUSTOMIZED_STATE);
pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext, Collections
.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
pushToEventQueues(ClusterEventType.CustomizedStateChange, changeContext,
Collections.singletonMap(AttributeName.instanceName.name(), instanceName));
logger.info("END: GenericClusterController.onCustomizedStateChange()");
}

Expand All @@ -1104,7 +1094,7 @@ public void onMessage(String instanceName, List<Message> messages,
logger.info("START: GenericClusterController.onMessage() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.MESSAGE);
pushToEventQueues(ClusterEventType.MessageChange, changeContext,
Collections.<String, Object>singletonMap(AttributeName.instanceName.name(), instanceName));
Collections.singletonMap(AttributeName.instanceName.name(), instanceName));

logger.info("END: GenericClusterController.onMessage() for cluster " + _clusterName);
}
Expand Down Expand Up @@ -1133,14 +1123,13 @@ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
}

pushToEventQueues(ClusterEventType.LiveInstanceChange, changeContext,
Collections.<String, Object>singletonMap(AttributeName.eventData.name(), liveInstances));
Collections.singletonMap(AttributeName.eventData.name(), liveInstances));

logger.info(
"END: Generic GenericClusterController.onLiveInstanceChange() for cluster " + _clusterName);
}

private void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates,
ClusterConfig clusterConfig) {
private void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates, ClusterConfig clusterConfig) {
if (manager.getConfigAccessor() == null) {
logger.warn(manager.getInstanceName()
+ " config accessor doesn't exist. should be in file-based mode.");
Expand Down Expand Up @@ -1176,8 +1165,7 @@ public void onIdealStateChange(List<IdealState> idealStates, NotificationContext
logger.info(
"START: Generic GenericClusterController.onIdealStateChange() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.IDEAL_STATE);
pushToEventQueues(ClusterEventType.IdealStateChange, changeContext,
Collections.<String, Object>emptyMap());
pushToEventQueues(ClusterEventType.IdealStateChange, changeContext, Collections.emptyMap());

if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
HelixManager manager = changeContext.getManager();
Expand All @@ -1198,10 +1186,8 @@ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
logger.info(
"START: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName);
notifyCaches(changeContext, ChangeType.INSTANCE_CONFIG);
pushToEventQueues(ClusterEventType.InstanceConfigChange, changeContext,
Collections.<String, Object>emptyMap());
logger.info(
"END: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName);
pushToEventQueues(ClusterEventType.InstanceConfigChange, changeContext, Collections.emptyMap());
logger.info("END: GenericClusterController.onInstanceConfigChange() for cluster " + _clusterName);
}

@Override
Expand All @@ -1211,10 +1197,8 @@ public void onResourceConfigChange(
logger.info(
"START: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
notifyCaches(context, ChangeType.RESOURCE_CONFIG);
pushToEventQueues(ClusterEventType.ResourceConfigChange, context,
Collections.<String, Object>emptyMap());
logger
.info("END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
pushToEventQueues(ClusterEventType.ResourceConfigChange, context, Collections.emptyMap());
logger.info("END: GenericClusterController.onResourceConfigChange() for cluster " + _clusterName);
}

@Override
Expand All @@ -1226,8 +1210,7 @@ public void onCustomizedStateConfigChange(
"START: GenericClusterController.onCustomizedStateConfigChange() for cluster "
+ _clusterName);
notifyCaches(context, ChangeType.CUSTOMIZED_STATE_CONFIG);
pushToEventQueues(ClusterEventType.CustomizeStateConfigChange, context,
Collections.<String, Object> emptyMap());
pushToEventQueues(ClusterEventType.CustomizeStateConfigChange, context, Collections.emptyMap());
logger.info(
"END: GenericClusterController.onCustomizedStateConfigChange() for cluster "
+ _clusterName);
Expand All @@ -1240,8 +1223,7 @@ public void onClusterConfigChange(ClusterConfig clusterConfig,
logger.info(
"START: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName);
notifyCaches(context, ChangeType.CLUSTER_CONFIG);
pushToEventQueues(ClusterEventType.ClusterConfigChange, context,
Collections.<String, Object>emptyMap());
pushToEventQueues(ClusterEventType.ClusterConfigChange, context, Collections.emptyMap());
logger
.info("END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName);
}
Expand Down Expand Up @@ -1586,7 +1568,7 @@ private void initPipeline(Thread eventThread, BaseControllerDataProvider cache)
* A wrapper class for the stateful rebalancer instance that will be tracked in the
* GenericHelixController.
*/
private abstract class StatefulRebalancerRef<T extends StatefulRebalancer> {
private abstract static class StatefulRebalancerRef<T extends StatefulRebalancer> {
private T _rebalancer = null;
private boolean _isRebalancerValid = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -73,7 +74,7 @@
* Design Document
* </a>
*/
public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDataProvider> {
public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDataProvider>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);

// When any of the following change happens, the rebalancer needs to do a global rebalance which
Expand Down Expand Up @@ -259,7 +260,7 @@ public void close() {
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet());
validateInput(clusterData, resourceMap);

Map<String, IdealState> newIdealStates;
Expand Down Expand Up @@ -311,8 +312,7 @@ public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvi
newStateMap == null ? Collections.emptyMap() : newStateMap);
}
});
LOG.info("Finish computing new ideal states for resources: {}",
resourceMap.keySet().toString());
LOG.info("Finish computing new ideal states for resources: {}", resourceMap.keySet());
return newIdealStates;
}

Expand Down Expand Up @@ -358,9 +358,7 @@ protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
// Perform global rebalance for a new baseline assignment
globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
// Perform partial rebalance for a new best possible assignment
Map<String, ResourceAssignment> newAssignment =
partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
return newAssignment;
return partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
}

/**
Expand Down Expand Up @@ -614,7 +612,7 @@ private void validateInput(ResourceControllerDataProvider clusterData,
if (!nonCompatibleResources.isEmpty()) {
throw new HelixRebalanceException(String.format(
"Input contains invalid resource(s) that cannot be rebalanced by the WAGED rebalancer. %s",
nonCompatibleResources.toString()), HelixRebalanceException.Type.INVALID_INPUT);
nonCompatibleResources), HelixRebalanceException.Type.INVALID_INPUT);
}
}

Expand Down
Loading