Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -172,13 +173,15 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
private boolean _inMaintenanceMode;

/**
* The timer that can periodically run the rebalancing pipeline. The timer will start if there is
* one resource group has the config to use the timer.
* The executors that can periodically run the rebalancing pipeline. A
* SingleThreadScheduledExecutor will start if there is resource group that has the config to do
* periodically rebalance.
*/
Timer _periodicalRebalanceTimer = null;
private static final ScheduledExecutorService _periodicalRebalanceExecutor =
Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture _periodicRebalanceFutureTask = null;
long _timerPeriod = Long.MAX_VALUE;


/**
* The timer that triggers the on-demand rebalance pipeline.
*/
Expand Down Expand Up @@ -329,19 +332,23 @@ private void forceRebalance(HelixManager manager, ClusterEventType eventType) {
/**
* Starts the rebalancing timer with the specified period. Start the timer if necessary; If the
* period is smaller than the current period, cancel the current timer and use the new period.
* note: For case where 2 threads change value from v0->v1 and v0->v2 at the same time, the
* result is indeterminable.
*/
void startPeriodRebalance(long period, HelixManager manager) {
if (period != _timerPeriod) {
logger.info("Controller starting periodical rebalance timer at period " + period);
if (_periodicalRebalanceTimer != null) {
_periodicalRebalanceTimer.cancel();
ScheduledFuture lastScheduledFuture;
synchronized (_periodicalRebalanceExecutor) {
lastScheduledFuture = _periodicRebalanceFutureTask;
_timerPeriod = period;
_periodicRebalanceFutureTask = _periodicalRebalanceExecutor
.scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance),
_timerPeriod, _timerPeriod, TimeUnit.MILLISECONDS);
}
if (lastScheduledFuture != null && !lastScheduledFuture.isCancelled()) {
lastScheduledFuture.cancel(false /* mayInterruptIfRunning */);
}
_periodicalRebalanceTimer =
new Timer("GenericHelixController_" + _clusterName + "_periodical_Timer", true);
_timerPeriod = period;
_periodicalRebalanceTimer
.scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance),
_timerPeriod, _timerPeriod);
} else {
logger.info("Controller already has periodical rebalance timer at period " + _timerPeriod);
}
Expand All @@ -352,11 +359,11 @@ void startPeriodRebalance(long period, HelixManager manager) {
*/
void stopPeriodRebalance() {
logger.info("Controller stopping periodical rebalance timer at period " + _timerPeriod);
if (_periodicalRebalanceTimer != null) {
_periodicalRebalanceTimer.cancel();
_periodicalRebalanceTimer = null;
_timerPeriod = Long.MAX_VALUE;
logger.info("Controller stopped periodical rebalance timer at period " + _timerPeriod);
synchronized (_periodicalRebalanceExecutor) {
if (_periodicRebalanceFutureTask != null && !_periodicRebalanceFutureTask.isCancelled()) {
_periodicRebalanceFutureTask.cancel(false /* mayInterruptIfRunning */);
_timerPeriod = Long.MAX_VALUE;
}
}
}

Expand Down Expand Up @@ -1300,6 +1307,12 @@ protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,

public void shutdown() throws InterruptedException {
stopPeriodRebalance();
_periodicalRebalanceExecutor.shutdown();
if (!_periodicalRebalanceExecutor
.awaitTermination(EVENT_THREAD_JOIN_TIMEOUT, TimeUnit.MILLISECONDS)) {
_periodicalRebalanceExecutor.shutdownNow();
}

shutdownOnDemandTimer();
logger.info("Shutting down {} pipeline", Pipeline.Type.DEFAULT.name());
shutdownPipeline(_eventThread, _eventQueue);
Expand Down