Skip to content
Permalink
Browse files
HELIX-705: implemented SourceClusterDataProvider's core logic and rel…
…ated tests

RB=1205694
BUG=HELIX-775
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang
  • Loading branch information
zhan849 authored and junkaixue committed Apr 11, 2022
1 parent 7bfbb36 commit 440d109db0e275c0b5938c2ac79ee787706a2416
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 90 deletions.
@@ -51,46 +51,66 @@
public class SourceClusterDataProvider extends BasicClusterDataCache
implements InstanceConfigChangeListener, LiveInstanceChangeListener,
ExternalViewChangeListener {
private HelixManager _helixManager;
private final HelixManager _helixManager;
private final ViewClusterSourceConfig _sourceClusterConfig;
private final ClusterEventProcessor _eventProcessor;

private HelixDataAccessor _dataAccessor;
private PropertyKey.Builder _propertyKeyBuilder;
private ViewClusterSourceConfig _sourceClusterConfig;
private ClusterEventProcessor _eventProcessor;

public SourceClusterDataProvider(ViewClusterSourceConfig config,
ClusterEventProcessor eventProcessor) {
super(config.getName());
_eventProcessor = eventProcessor;
_sourceClusterConfig = config;
_helixManager = HelixManagerFactory
.getZKHelixManager(config.getName(), generateHelixManagerInstanceName(config.getName()),
InstanceType.SPECTATOR, config.getZkAddress());
requireFullRefreshBasedOnConfig();
_helixManager = HelixManagerFactory.getZKHelixManager(config.getName(),
generateHelixManagerInstanceName(config.getName()),
InstanceType.SPECTATOR, config.getZkAddress());
}

public String getName() {
return _helixManager.getInstanceName();
}

public ViewClusterSourceConfig getSourceClusterConfig() {
return _sourceClusterConfig;
}

/**
* Set up ClusterDataProvider. After setting up, the class should start listening
* on change events and perform corresponding reactions
* @throws Exception
*/
public void setup() throws Exception {
if (_helixManager != null && _helixManager.isConnected()) {
LOG.info(String.format("Data provider %s is already setup", _helixManager.getInstanceName()));
return;
}
try {
LOG.info(String.format("%s setting up ...", _helixManager.getInstanceName()));
_helixManager.connect();
_helixManager.addInstanceConfigChangeListener(this);
_helixManager.addLiveInstanceChangeListener(this);
_helixManager.addExternalViewChangeListener(this);
for (PropertyType property : _sourceClusterConfig.getProperties()) {
HelixConstants.ChangeType changeType;
switch (property) {
case INSTANCES:
_helixManager.addInstanceConfigChangeListener(this);
changeType = HelixConstants.ChangeType.INSTANCE_CONFIG;
break;
case LIVEINSTANCES:
_helixManager.addLiveInstanceChangeListener(this);
changeType = HelixConstants.ChangeType.LIVE_INSTANCE;
break;
case EXTERNALVIEW:
_helixManager.addExternalViewChangeListener(this);
changeType = HelixConstants.ChangeType.EXTERNAL_VIEW;
break;
default:
LOG.warn(String
.format("Unsupported property type: %s. Skip adding listener", property.name()));
continue;
}
notifyDataChange(changeType);
}
_dataAccessor = _helixManager.getHelixDataAccessor();
_propertyKeyBuilder = _dataAccessor.keyBuilder();
LOG.info(String.format("%s started.", _helixManager.getInstanceName()));
LOG.info(String
.format("%s started. Source cluster detail: %s", _helixManager.getInstanceName(),
_sourceClusterConfig.toString()));
} catch(Exception e) {
shutdown();
throw e;
@@ -105,6 +125,8 @@ public void shutdown() {
if (_helixManager != null && _helixManager.isConnected()) {
try {
_helixManager.disconnect();
LOG.info(
String.format("Data provider %s shutdown cleanly.", _helixManager.getInstanceName()));
} catch (ZkInterruptedException e) {
// OK
}
@@ -116,130 +138,73 @@ public void refreshCache() {
}

/**
* Re-list current instance config names. ListName is a more reliable way to find
* Get current instance config names. ListName is a more reliable way to find
* current instance config names. This is needed for ViewClusterRefresher when
* it is generating diffs to push to ViewCluster
* @return
*/
public List<String> listInstanceConfigNames() {
public List<String> getInstanceConfigNames() {
return _dataAccessor.getChildNames(_propertyKeyBuilder.instanceConfigs());
}

/**
* Re-list current live instance names.
* Get current live instance names.
* @return
*/
public List<String> listLiveInstanceNames() {
public List<String> getLiveInstanceNames() {
return _dataAccessor.getChildNames(_propertyKeyBuilder.liveInstances());
}

/**
* re-list external view names
* Get external view names
* @return
*/
public List<String> listExternalViewNames() {
public List<String> getExternalViewNames() {
return _dataAccessor.getChildNames(_propertyKeyBuilder.externalViews());
}

/**
* Based on current ViewClusterSourceConfig, decide whether caller should
* read instance config. Used by ViewClusterRefresher
* @return
*/
public boolean shouldReadInstanceConfigs() {
// TODO: implement logic
return false;
}

/**
* Based on current ViewClusterSourceConfig, decide whether caller should
* read live instances. Used by ViewClusterRefresher
* @return
*/
public boolean shouldReadLiveInstances() {
// TODO: implement logic
return false;
}

/**
* Based on current ViewClusterSourceConfig, decide whether caller should
* read external view. Used by ViewClusterRefresher
* @return
*/
public boolean shouldReadExternalViews() {
// TODO: implement logic
return false;
public List<PropertyType> getPropertiesToAggregate() {
return _sourceClusterConfig.getProperties();
}

@Override
@PreFetch(enabled = false)
public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
NotificationContext context) {
queueEventToProcessor(context, ClusterEventType.InstanceConfigChange,
queueEvent(context, ClusterEventType.InstanceConfigChange,
HelixConstants.ChangeType.INSTANCE_CONFIG);
}

@Override
@PreFetch(enabled = false)
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
queueEventToProcessor(changeContext, ClusterEventType.LiveInstanceChange,
queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
HelixConstants.ChangeType.LIVE_INSTANCE);
}

@Override
@PreFetch(enabled = false)
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
queueEventToProcessor(changeContext, ClusterEventType.ExternalViewChange,
queueEvent(changeContext, ClusterEventType.ExternalViewChange,
HelixConstants.ChangeType.EXTERNAL_VIEW);
}

private void queueEventToProcessor(NotificationContext context,
private void queueEvent(NotificationContext context,
ClusterEventType clusterEventType, HelixConstants.ChangeType cacheChangeType)
throws IllegalStateException {
if (!shouldProcessEvent(cacheChangeType)) {
LOG.info(String.format(
"Skip processing event based on ViewClusterSourceConfig: ClusterName=%s; ClusterEventType=%s, ChangeType=%s",
_clusterName, clusterEventType, cacheChangeType));
return;
}
ClusterEvent event = new ClusterEvent(_clusterName, clusterEventType);
// TODO: in case of FINALIZE, if we are not shutdown, re-connect helix manager and report error
if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
notifyDataChange(cacheChangeType);
_eventProcessor.queueEvent(event);
_eventProcessor.queueEvent(new ClusterEvent(_clusterName, clusterEventType));
} else {
LOG.info(String.format("SourceClusterDataProvider: skip queuing event %s", event));
LOG.info(String.format("Skip queuing event. EventType: %s, ChangeType: %s, ContextType: %s",
clusterEventType.name(), cacheChangeType.name(),
context == null ? "NoContext" : context.getType().name()));
}
}

/**
* Replace current source cluster config properties (A list of PropertyType we should aggregate)
* with the given ones, and update corresponding provider mechanisms
* @param properties
*/
public synchronized void setSourceClusterConfigProperty(List<PropertyType> properties) {
// TODO: implement logic
}

/**
* Based on current ViewClusterSourceConfig, notify cache accordingly.
*/
private void requireFullRefreshBasedOnConfig() {
// TODO: implement logic
}

/**
* Check source cluster config and decide whether this event should be processed
* @param sourceClusterChangeType
* @return
*/
private synchronized boolean shouldProcessEvent(
HelixConstants.ChangeType sourceClusterChangeType) {
// TODO: implement
return false;
}

private static String generateHelixManagerInstanceName(String clusterName) {
return String.format("SourceClusterSpectatorHelixManager-%s", clusterName);
}
@@ -0,0 +1,144 @@
package org.apache.helix.view.integration;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.view.ViewAggregatorIntegrationTestBase;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.mock.MockClusterEventProcessor;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestSourceClusterDataProvider extends ViewAggregatorIntegrationTestBase {
private static final int numSourceCluster = 1;
private static final String stateModel = "MasterSlave";
private static final String testResource = "restResource";

@Test
public void testSourceClusterDataProviderWatchAndRefresh() throws Exception {
String clusterName = _allSourceClusters.get(0);
List<PropertyType> properties = Arrays.asList(
new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW,
PropertyType.INSTANCES
});

ViewClusterSourceConfig sourceClusterConfig =
new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);

MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
processor.start();

SourceClusterDataProvider dataProvider =
new SourceClusterDataProvider(sourceClusterConfig, processor);

// setup can be re-called
dataProvider.setup();
dataProvider.setup();

Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
new HashSet<>(properties));

// When first connected, data provider will have some initial events
Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
processor.resetHandledEventCount();

// ListNames should work
Assert.assertEquals(dataProvider.getInstanceConfigNames().size(), numPaticipantCount);
Assert.assertEquals(dataProvider.getLiveInstanceNames().size(), numPaticipantCount);
Assert.assertEquals(dataProvider.getExternalViewNames().size(), 0);

processor.resetHandledEventCount();

// rebalance resource to check external view related events
_gSetupTool.addResourceToCluster(clusterName, testResource, numPaticipantCount, stateModel);
_gSetupTool.rebalanceResource(clusterName, testResource, 3);
Thread.sleep(1000);
Assert.assertTrue(processor.getHandledExternalViewChangeCount() > 0);
Assert.assertEquals(dataProvider.getExternalViewNames().size(), 1);

// refresh data provider will have correct data loaded
dataProvider.refreshCache();
Assert.assertEquals(dataProvider.getLiveInstances().size(), numPaticipantCount);
Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), numPaticipantCount);
Assert.assertEquals(dataProvider.getExternalViews().size(), 1);
processor.resetHandledEventCount();

// Add additional participant will have corresponding change
String testParticipantName = "testParticipant";
_gSetupTool.addInstanceToCluster(clusterName, testParticipantName);
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, clusterName, testParticipantName);
participant.syncStart();

Thread.sleep(500);
Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);

// shutdown can be re-called
dataProvider.shutdown();
dataProvider.shutdown();

// Verify cache is cleaned up
Assert.assertEquals(dataProvider.getLiveInstances().size(), 0);
Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), 0);
Assert.assertEquals(dataProvider.getExternalViews().size(), 0);
}

@Test
public void testSourceClusterDataProviderPropertyFilter() throws Exception {
String clusterName = _allSourceClusters.get(0);
List<PropertyType> properties = Arrays.asList(
new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW });

ViewClusterSourceConfig sourceClusterConfig =
new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);

MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
processor.start();

SourceClusterDataProvider dataProvider =
new SourceClusterDataProvider(sourceClusterConfig, processor);
dataProvider.setup();

Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
new HashSet<>(properties));

// When first connected, data provider will have some initial events, but InstanceConfig
// will be filtered out since its not in properties
Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 0);
Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);

dataProvider.shutdown();
}

@Override
protected int getNumSourceCluster() {
return numSourceCluster;
}

}

0 comments on commit 440d109

Please sign in to comment.