Skip to content
Permalink
Browse files
HELIX-705: implement view cluster config change related logics and tests
RB=1205359
BUG=HELIX-705
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=lxia
  • Loading branch information
zhan849 authored and junkaixue committed Apr 11, 2022
1 parent e619e17 commit 7bfbb36444cb852f0ee6d5c20329877844221138
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 255 deletions.
@@ -19,203 +19,274 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.common.ClusterEventProcessor;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.view.common.ViewAggregatorEventAttributes;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.dataprovider.ViewClusterConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Main logic for Helix view aggregator
*/
public class HelixViewAggregator {
public class HelixViewAggregator implements ClusterConfigChangeListener {
private static final Logger logger = LoggerFactory.getLogger(HelixViewAggregator.class);
private static final int PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS = 3 * 1000;
private final String _viewClusterName;
private final HelixManager _viewClusterManager;
private ViewAggregationWorker _aggregationWorker;
private ViewConfigWorker _viewConfigWorker;
private long _lastViewClusterRefreshTimestampMs;
private HelixDataAccessor _dataAccessor;

// Worker that processes source cluster events and refresh view cluster
private ClusterEventProcessor _aggregator;
private boolean _refreshViewCluster;

// Worker that processes view cluster config change
private ClusterEventProcessor _viewConfigProcessor;

private Map<String, SourceClusterDataProvider> _dataProviderMap;
private ViewClusterConfigProvider _viewClusterConfigProvider;
private List<ViewClusterSourceConfig> _sourceConfigs;
private long _refreshPeriodMs;
private ClusterConfig _curViewClusterConfig;
private Timer _viewClusterRefreshTimer;
private ViewClusterRefresher _viewClusterRefresher;

public HelixViewAggregator(String viewClusterName, String zkAddr) {
_viewClusterName = viewClusterName;
_lastViewClusterRefreshTimestampMs = 0L;
_refreshPeriodMs = -1L;
_sourceConfigs = new ArrayList<>();
_dataProviderMap = new HashMap<>();
_viewClusterManager = HelixManagerFactory
.getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
InstanceType.SPECTATOR, zkAddr);
_refreshViewCluster = false;
_aggregator = new ClusterEventProcessor(_viewClusterName, "Aggregator") {
@Override
public void handleEvent(ClusterEvent event) {
handleSourceClusterEvent(event);
}
};

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
_viewConfigProcessor = new ClusterEventProcessor(_viewClusterName, "ViewConfigProcessor") {
@Override
public void run() {
shutdown();
public void handleEvent(ClusterEvent event) {
handleViewClusterConfigChange(event);
}
}));
};
}

/**
* Start controller main logic
* @throws Exception
*/
public void start() throws Exception {
// Start workers
_aggregator.start();
_viewConfigProcessor.start();

// Setup manager
try {
_viewClusterManager.connect();
_viewClusterManager.addClusterfigChangeListener(this);
_dataAccessor = _viewClusterManager.getHelixDataAccessor();
} catch (Exception e) {
throw new HelixException("Failed to connect view cluster helix manager", e);
}

// set up view cluster refresher
// Set up view cluster refresher
_viewClusterRefresher =
new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor(),
_dataProviderMap);

// Start workers
_aggregationWorker = new ViewAggregationWorker();
_aggregationWorker.start();
_viewConfigWorker = new ViewConfigWorker();
_viewConfigWorker.start();

// Start cluster config provider
_viewClusterConfigProvider =
new ViewClusterConfigProvider(_viewClusterName, _viewClusterManager, _viewConfigWorker);
_viewClusterConfigProvider.setup();
}

/**
* Process view cluster config change events
*/
private class ViewConfigWorker extends ClusterEventProcessor {
public ViewConfigWorker() {
super(_viewClusterName, "ViewConfigWorker");
public void shutdown() {
// Stop all workers
_aggregator.interrupt();
_viewConfigProcessor.interrupt();

// Stop timer
if (_viewClusterRefreshTimer != null) {
logger.info("Shutting down view cluster refresh timer");
_viewClusterRefreshTimer.cancel();
}
@Override
public void handleEvent(ClusterEvent event) {
logger.info("Processing event " + event.getEventType().name());
switch (event.getEventType()) {
case ClusterConfigChange:
processViewClusterConfigUpdate();
break;
default:
logger.error(String.format("Unrecognized event type: %s", event.getEventType()));

// disconnect manager
if (_viewClusterManager != null && _viewClusterManager.isConnected()) {
logger.info("Shutting down view cluster helix manager");
try {
_viewClusterManager.disconnect();
} catch (ZkInterruptedException zkintr) {
logger.warn("ZK interrupted when disconnecting helix manager", zkintr);
} catch (Exception e) {
logger.error(String
.format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
}
}

// Clean up all data providers
for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
logger
.info(String.format("Shutting data provider for source cluster %s", provider.getName()));
try {
provider.shutdown();
} catch (Exception e) {
logger.error(String
.format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
_viewClusterName), e);
}
}
logger.info("HelixViewAggregator shutdown cleanly");
}

/**
* Process source cluster data change events and view cluster periodic refresh events
*/
private class ViewAggregationWorker extends ClusterEventProcessor {
private boolean _shouldRefresh;
/**
 * Listener callback for config changes on the view cluster.
 *
 * Queues a ClusterConfigChange event on the view config processor, unless the
 * notification carries no context or is of type FINALIZE, in which case the
 * notification is only logged and dropped.
 *
 * @param clusterConfig config passed in by the caller; not read by this method
 * @param context notification context, may be null
 */
@Override
@PreFetch(enabled = false)
public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
  boolean skipNotification =
      context == null || context.getType() == NotificationContext.Type.FINALIZE;
  if (skipNotification) {
    String contextTypeName = "NoContext";
    if (context != null) {
      contextTypeName = context.getType().name();
    }
    logger.info(String
        .format("Skip processing view cluster config change with notification context type %s",
            contextTypeName));
    return;
  }

  // Hand the change over to the worker; the actual processing happens when it handles the event
  ClusterEvent configChangeEvent =
      new ClusterEvent(_viewClusterName, ClusterEventType.ClusterConfigChange);
  _viewConfigProcessor.queueEvent(configChangeEvent);
}

public ViewAggregationWorker() {
super(_viewClusterName, "ViewAggregationWorker");
_shouldRefresh = false;
/**
 * Handles a single event delivered to the aggregator.
 *
 * Source cluster change events (live instance, instance config, external view) only set
 * the pending-refresh flag. A periodic refresh event refreshes the view cluster when that
 * flag is set and is skipped otherwise. Any other event type is logged as an error.
 *
 * @param event event to process
 */
private void handleSourceClusterEvent(ClusterEvent event) {
  logger.info("Processing event from source cluster " + event.getClusterName());
  final ClusterEventType eventType = event.getEventType();
  switch (eventType) {
    case LiveInstanceChange:
    case InstanceConfigChange:
    case ExternalViewChange:
      // Only record that something changed; the next periodic refresh event acts on it
      _refreshViewCluster = true;
      break;
    case ViewClusterPeriodicRefresh:
      if (_refreshViewCluster) {
        logger.info("Refreshing cluster based on event " + eventType.name());
        // The result of refreshViewCluster() becomes the new pending flag. Per the original
        // comment it is true when at least part of the refresh failed, so the next periodic
        // event retries - NOTE(review): confirm against refreshViewCluster()
        _refreshViewCluster = refreshViewCluster();
      } else {
        logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
      }
      break;
    default:
      logger.error(String.format("Unrecognized event type: %s", eventType));
  }
}

@Override
public void handleEvent(ClusterEvent event) {
logger.info("Processing event " + event.getEventType().name());

switch (event.getEventType()) {
case LiveInstanceChange:
case InstanceConfigChange:
case ExternalViewChange:
_shouldRefresh = true;
break;
case ViewClusterPeriodicRefresh:
Boolean forceRefresh =
event.getAttribute(ViewAggregatorEventAttributes.ViewClusterForceRefresh.name());
if (!forceRefresh && !_shouldRefresh) {
logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
return;
private synchronized void handleViewClusterConfigChange(ClusterEvent event) {
logger.info("Processing view cluster event " + event.getEventType().name());
switch (event.getEventType()) {
case ClusterConfigChange:
// TODO: once ClusterEventProcessor supports delayed scheduling, let it handle the backoff
// instead of sleeping here, which blocks every event queued behind this one (head-of-line blocking).
// For now this is acceptable because view cluster config changes can tolerate delayed processing.
if (event.getAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name()) != null) {
try {
Thread.sleep(PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS);
} catch (InterruptedException e) {
logger.warn("Interrupted when backing off during process view config change retry", e);
}
}
// We always compare current cluster config with most up-to-date cluster config
ClusterConfig newClusterConfig =
_dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
SourceClusterConfigChangeAction action =
new SourceClusterConfigChangeAction(_curViewClusterConfig, newClusterConfig);
action.computeAction();

// mark source cluster as changed to trigger next refresh as we failed to refresh at
// least some of the elements in view cluster
logger.info("Refreshing cluster based on event " + event.getEventType().name());
_shouldRefresh = refreshViewCluster();
break;
default:
logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
// If we fail to process action and should retry, re-queue event to retry
if (processViewClusterConfigUpdate(action)) {
event.addAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name(), true);
_viewConfigProcessor.queueEvent(event);
} else {
_curViewClusterConfig = newClusterConfig;
}
break;
default:
logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
}
}

private class RefreshViewClusterTask extends TimerTask {
@Override
public void run() {
triggerViewClusterRefresh(false);
logger.info("Triggering view cluster refresh");
_aggregator.queueEvent(
new ClusterEvent(_viewClusterName, ClusterEventType.ViewClusterPeriodicRefresh));
}
}

public void shutdown() {
if (_viewClusterManager != null) {
logger.info("Shutting down view cluster helix manager");
_viewClusterManager.disconnect();
}

/**
* Recreate timer that triggers RefreshViewClusterTask
*/
private void resetTimer(long triggerIntervalMs) {
if (_viewClusterRefreshTimer != null) {
logger.info("Shutting down view cluster refresh timer");
_viewClusterRefreshTimer.cancel();
}

for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
logger
.info(String.format("Shutting data provider for source cluster %s", provider.getName()));
provider.shutdown();
}
logger.info("HelixViewAggregator shutdown cleanly");
RefreshViewClusterTask refreshTrigger = new RefreshViewClusterTask();
_viewClusterRefreshTimer = new Timer(true);
_viewClusterRefreshTimer.scheduleAtFixedRate(refreshTrigger, 0, triggerIntervalMs);
}

/**
* Recreate timer that triggers RefreshViewClusterTask
* Use SourceClusterConfigChangeAction to reset timer (RefreshViewClusterTask),
* create/delete SourceClusterDataProvider in data provider map
*
* @return true if action failed and should retry, else false
*/
private void resetTimer() {
// TODO: implement
}
private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction action) {
boolean shouldRetry = false;
for (ViewClusterSourceConfig source : action.getConfigsToDelete()) {
String key = generateDataProviderMapKey(source);
logger.info("Deleting data provider " + key);
if (_dataProviderMap.containsKey(key)) {
try {
_dataProviderMap.get(key).shutdown();
_dataProviderMap.remove(key);
} catch (Exception e) {
shouldRetry = true;
logger.warn(String.format("Failed to shutdown data provider %s, will retry", key));
}
}
}

/**
* Use ViewClusterConfigProvider (assume its up-to-date) to compute
* SourceClusterConfigChangeAction, based on _sourceConfigs. Use the action object to
* reset timer (RefreshViewClusterTask), create/delete/update SourceClusterDataProvider in
* data provider map and populate new _sourceConfigs
*/
private synchronized void processViewClusterConfigUpdate() {
// TODO: implement
}
for (ViewClusterSourceConfig source : action.getConfigsToAdd()) {
String key = generateDataProviderMapKey(source);
logger.info("Creating data provider " + key);
if (_dataProviderMap.containsKey(key)) {
// possibly due to a previous failure of shutting down, print warning and recreate for now
logger.warn(String.format("Add data provider %s which already exists. Recreating", key));
_dataProviderMap.remove(key);
}

/**
* push event to worker queue to trigger refresh. Worker might not refresh view cluster
* if there is no event happened since last refresh
* @param forceRefresh
*/
private void triggerViewClusterRefresh(boolean forceRefresh) {
ClusterEvent event = new ClusterEvent(_viewClusterName, ClusterEventType.ViewClusterPeriodicRefresh);
event.addAttribute(ViewAggregatorEventAttributes.ViewClusterForceRefresh.name(),
Boolean.valueOf(forceRefresh));
_aggregationWorker.queueEvent(event);
logger.info("Triggering view cluster refresh, forceRefresh=" + forceRefresh);
try {
SourceClusterDataProvider provider = new SourceClusterDataProvider(source, _aggregator);
provider.setup();
_dataProviderMap.put(key, provider);
} catch (Exception e) {
shouldRetry = true;
logger.warn(String.format("Failed to create data provider %s, will retry", key));
}
}

if (action.shouldResetTimer()) {
logger.info(
"Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
resetTimer(action.getCurrentRefreshPeriodMs());
}
return shouldRetry;
}

/**
@@ -231,7 +302,7 @@ private static String generateHelixManagerInstanceName(String viewClusterName) {
return String.format("HelixViewAggregator-%s", viewClusterName);
}

private static String generateSourceClusterDataProviderMapKey(ViewClusterSourceConfig config) {
/**
 * Builds the key under which a source cluster's data provider is stored in the
 * data provider map, in the form "name-zkAddress".
 *
 * @param config source cluster config supplying the name and ZK address
 * @return the map key for the given source cluster config
 */
private static String generateDataProviderMapKey(ViewClusterSourceConfig config) {
  final String sourceName = config.getName();
  final String sourceZkAddress = config.getZkAddress();
  return String.format("%s-%s", sourceName, sourceZkAddress);
}
}

0 comments on commit 7bfbb36

Please sign in to comment.