Skip to content

Commit

Permalink
Decouple the event type and the scheduled rebalance cache refresh opt…
Browse files Browse the repository at this point in the history
…ion. (apache#638)

The previous design is that both on-demand and periodic rebalance scheduling task will request for a cache refresh. This won't be always true moving forward.
For example, the WAGED rebalancer async baseline calculating requests for a scheduled rebalance. But cache refresh won't be necessary.
This PR does not change any business logic. It prepares for future feature change.
This PR ensures strict backward compatibility.
  • Loading branch information
jiajunwang authored and huizhilu committed Aug 16, 2020
1 parent 0d24791 commit c53102f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -221,17 +222,29 @@ public GenericHelixController(String clusterName, Set<Pipeline.Type> enabledPipe
class RebalanceTask extends TimerTask {
final HelixManager _manager;
final ClusterEventType _clusterEventType;
private final Optional<Boolean> _shouldRefreshCacheOption;
private long _nextRebalanceTime;

public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
this(manager, clusterEventType, -1);
}

public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
long nextRebalanceTime) {
this(manager, clusterEventType, nextRebalanceTime, Optional.empty());
}

public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
long nextRebalanceTime, boolean shouldRefreshCache) {
this(manager, clusterEventType, nextRebalanceTime, Optional.of(shouldRefreshCache));
}

public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType, long nextRebalanceTime) {
private RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
long nextRebalanceTime, Optional<Boolean> shouldRefreshCacheOption) {
_manager = manager;
_clusterEventType = clusterEventType;
_nextRebalanceTime = nextRebalanceTime;
_shouldRefreshCacheOption = shouldRefreshCacheOption;
}

public long getNextRebalanceTime() {
Expand All @@ -241,8 +254,9 @@ public long getNextRebalanceTime() {
@Override
public void run() {
try {
if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
.equals(ClusterEventType.OnDemandRebalance)) {
if (_shouldRefreshCacheOption.orElse(
_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
.equals(ClusterEventType.OnDemandRebalance))) {
requestDataProvidersFullRefresh();

HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Expand Down Expand Up @@ -360,7 +374,17 @@ public void scheduleRebalance(long rebalanceTime) {
* Schedule an on demand rebalance pipeline.
* @param delay
*/
@Deprecated
public void scheduleOnDemandRebalance(long delay) {
scheduleOnDemandRebalance(delay, true);
}

/**
* Schedule an on demand rebalance pipeline.
* @param delay
* @param shouldRefreshCache true if refresh the cache before scheduling a rebalance.
*/
public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) {
if (_helixManager == null) {
logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!",
_clusterName);
Expand All @@ -378,7 +402,8 @@ public void scheduleOnDemandRebalance(long delay) {
}

RebalanceTask newTask =
new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime);
new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime,
shouldRefreshCache);

_onDemandRebalanceTimer.schedule(newTask, delay);
logger.info("Scheduled instant pipeline run for cluster {}." , _helixManager.getClusterName());
Expand Down Expand Up @@ -1233,4 +1258,4 @@ private void initPipeline(Thread eventThread, BaseControllerDataProvider cache)
eventThread.setDaemon(true);
eventThread.start();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public static String[] parseStates(String clusterName, StateModelDefinition stat
}

public static void scheduleOnDemandPipeline(String clusterName, long delay) {
scheduleOnDemandPipeline(clusterName, delay, true);
}

public static void scheduleOnDemandPipeline(String clusterName, long delay,
boolean shouldRefreshCache) {
if (clusterName == null) {
LOG.error("Failed to issue a pipeline run. ClusterName is null.");
return;
Expand All @@ -153,7 +158,7 @@ public static void scheduleOnDemandPipeline(String clusterName, long delay) {
}
GenericHelixController controller = GenericHelixController.getController(clusterName);
if (controller != null) {
controller.scheduleOnDemandRebalance(delay);
controller.scheduleOnDemandRebalance(delay, shouldRefreshCache);
} else {
LOG.error("Failed to issue a pipeline. Controller for cluster {} does not exist.",
clusterName);
Expand Down

0 comments on commit c53102f

Please sign in to comment.