Skip to content

Commit

Permalink
Update dependencies and fix compile errors
Browse files Browse the repository at this point in the history
Unit test fix and code style improvement.
  • Loading branch information
qqu0127 authored and junkaixue committed Apr 11, 2022
1 parent 258f49a commit 764d826
Show file tree
Hide file tree
Showing 25 changed files with 641 additions and 335 deletions.
Expand Up @@ -19,40 +19,37 @@
* under the License.
*/

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.helix.PropertyType;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;

/**
* Represents source physical cluster information for view cluster
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class ViewClusterSourceConfig {
private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(
Arrays.asList(PropertyType.INSTANCES, PropertyType.EXTERNALVIEW, PropertyType.LIVEINSTANCES));
private static final ObjectMapper _objectMapper = new ObjectMapper();

private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(Arrays
.asList(new PropertyType[] { PropertyType.INSTANCES, PropertyType.EXTERNALVIEW,
PropertyType.LIVEINSTANCES
}));

private static ObjectMapper _objectMapper = new ObjectMapper();

@JsonProperty("name")
private String _name;

@JsonProperty("zkAddress")
String _zkAddress;

@JsonProperty("properties")
private final String _name;
private final String _zkAddress;
private List<PropertyType> _properties;

private ViewClusterSourceConfig() {
}

public ViewClusterSourceConfig(String name, String zkAddress, List<PropertyType> properties) {
@JsonCreator
public ViewClusterSourceConfig(
@JsonProperty("name") String name,
@JsonProperty("zkAddress") String zkAddress,
@JsonProperty("properties") List<PropertyType> properties
) {
_name = name;
_zkAddress = zkAddress;
_properties = properties;
Expand All @@ -62,14 +59,6 @@ public ViewClusterSourceConfig(ViewClusterSourceConfig config) {
this(config.getName(), config.getZkAddress(), new ArrayList<>(config.getProperties()));
}

public void setName(String name) {
_name = name;
}

public void setZkAddress(String zkAddress) {
_zkAddress = zkAddress;
}

public void setProperties(List<PropertyType> properties) {
for (PropertyType p : properties) {
if (!_validPropertyTypes.contains(p)) {
Expand All @@ -92,8 +81,13 @@ public List<PropertyType> getProperties() {
return _properties;
}

@JsonIgnore
public static List<PropertyType> getValidPropertyTypes() {
return _validPropertyTypes;
}

public String toJson() throws IOException {
return new ObjectMapper().writeValueAsString(this);
return _objectMapper.writeValueAsString(this);
}

public String toString() {
Expand Down Expand Up @@ -121,4 +115,4 @@ public static ViewClusterSourceConfig fromJson(String jsonString) {
String.format("Invalid Json: %s, Exception: %s", jsonString, e.toString()));
}
}
}
}
Expand Up @@ -42,9 +42,4 @@ public ClusterEventProcessor(String clusterName, String processorName) {
public void queueEvent(ClusterEvent event) {
_eventQueue.put(event.getEventType(), event);
}

public void shutdown() {
_eventQueue.clear();
this.interrupt();
}
}
Expand Up @@ -19,11 +19,13 @@
* under the License.
*/

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.model.ExternalView;
Expand All @@ -46,10 +48,6 @@ public class BasicClusterDataCache implements ControlContextProvider {
protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
protected ExternalViewCache _externalViewCache;
protected Map<String, LiveInstance> _liveInstanceMap;
protected Map<String, InstanceConfig> _instanceConfigMap;
protected Map<String, ExternalView> _externalViewMap;
private final PropertyType _sourceDataType;

protected String _clusterName;

Expand Down Expand Up @@ -110,12 +108,12 @@ public void refresh(HelixDataAccessor accessor) {
LOG.info("START: BasicClusterDataCache.refresh() for cluster " + _clusterName);
long startTime = System.currentTimeMillis();

if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.EXTERNAL_VIEW, false)) {
_propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, false);
_externalViewCache.refresh(accessor);
}

if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, false)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, false);
_propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, true);
Expand All @@ -124,7 +122,7 @@ public void refresh(HelixDataAccessor accessor) {
+ ". Takes " + (System.currentTimeMillis() - start) + " ms");
}

if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.INSTANCE_CONFIG, false)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, false);
_instanceConfigPropertyCache.refresh(accessor);
Expand Down Expand Up @@ -196,15 +194,17 @@ public void notifyDataChange(HelixConstants.ChangeType changeType) {
*/
public synchronized void clearCache(HelixConstants.ChangeType changeType) {
switch (changeType) {
case LIVE_INSTANCE:
case INSTANCE_CONFIG:
LOG.warn("clearCache is deprecated for changeType: {}.", changeType);
break;
case EXTERNAL_VIEW:
_externalViewCache.clear();
break;
default:
break;
case LIVE_INSTANCE:
_liveInstancePropertyCache.setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
break;
case INSTANCE_CONFIG:
_instanceConfigPropertyCache.setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
break;
case EXTERNAL_VIEW:
_externalViewCache.clear();
break;
default:
break;
}
}

Expand Down
Expand Up @@ -93,7 +93,6 @@ public List<Node> getFaultZones() {
}
return Collections.emptyList();
}

/**
* Returns all leaf nodes that belong in the tree. Returns itself if this node is a leaf.
*
Expand Down
Expand Up @@ -39,7 +39,6 @@ public enum ClusterEventType {
OnDemandRebalance,
ControllerChange,
RetryRebalance,
ViewClusterPeriodicRefresh,
StateVerifier,
Unknown
}
61 changes: 61 additions & 0 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -37,6 +38,7 @@
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

/**
* Cluster configurations
Expand Down Expand Up @@ -85,6 +87,10 @@ public enum ClusterConfigProperty {
// error exceeds this limitation
DISABLED_INSTANCES,

VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster
VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is
// ViewClusterSourceConfig JSON string
VIEW_CLUSTER_REFRESH_PERIOD, // In second
// Specifies job types and used for quota allocation
QUOTA_TYPES,

Expand Down Expand Up @@ -178,6 +184,7 @@ public enum GlobalRebalancePreferenceKey {
public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1;
private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;

/**
* Instantiate for a specific cluster
Expand All @@ -195,6 +202,36 @@ public ClusterConfig(ZNRecord record) {
super(record);
}

public void setViewCluster() {
_record.setBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), true);
}

/**
* Whether this cluster is a ViewCluster
* @return
*/
public boolean isViewCluster() {
return _record
.getBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), false);
}

/**
* Set a list of ViewClusterSourceConfig to ClusterConfig. Current source config will be
* overwritten
* @param sourceConfigList
*/
public void setViewClusterSourceConfigs(List<ViewClusterSourceConfig> sourceConfigList) {
List<String> sourceConfigs = new ArrayList<>();
for (ViewClusterSourceConfig config : sourceConfigList) {
try {
sourceConfigs.add(config.toJson());
} catch (IOException e) {
throw new IllegalArgumentException("Invalid source config. Error: " + e.toString());
}
}
_record.setListField(ClusterConfigProperty.VIEW_CLUSTER_SOURCES.name(), sourceConfigs);
}

/**
* Set task quota type with the ratio of this quota.
* @param quotaType String
Expand Down Expand Up @@ -262,6 +299,30 @@ public void resetTaskQuotaRatioMap() {
}
}

/**
* Set view cluster max refresh period
* @param refreshPeriod refresh period in second
*/
public void setViewClusterRefreshPeriod(int refreshPeriod) {
_record.setIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
refreshPeriod);
}

public List<ViewClusterSourceConfig> getViewClusterSourceConfigs() {
List<ViewClusterSourceConfig> sourceConfigList = new ArrayList<>();
for (String configJSON : _record
.getListField(ClusterConfigProperty.VIEW_CLUSTER_SOURCES.name())) {
ViewClusterSourceConfig config = ViewClusterSourceConfig.fromJson(configJSON);
sourceConfigList.add(config);
}
return sourceConfigList;
}

public int getViewClusterRefershPeriod() {
return _record.getIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD);
}

/**
* Whether to persist best possible assignment in a resource's idealstate.
* @return
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskPartitionState;
Expand Down
Expand Up @@ -7,9 +7,7 @@ regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
Expand All @@ -20,7 +18,7 @@ under the License.
<ivy-module version="1.0">
<info organisation="org.apache.helix"
module="helix-view-aggregator"
revision="0.6.10-SNAPSHOT"
revision="1.0.3-SNAPSHOT"
status="integration"
publication="20170128141623"
/>
Expand All @@ -37,16 +35,19 @@ under the License.
<artifact name="helix-view-aggregator" type="jar" ext="jar" conf="master"/>
</publications>
<dependencies>
<dependency org="org.slf4j" name="slf4j-api" rev="1.7.25" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<dependency org="org.slf4j" name="slf4j-api" rev="1.7.32" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="slf4j-api" ext="jar"/>
</dependency>
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.14" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="slf4j-log4j12" ext="jar"/>
<dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="log4j-slf4j-impl" ext="jar"/>
</dependency>
<dependency org="org.apache.helix" name="helix-core" rev="0.8.0-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<--dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.14" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="slf4j-log4j12" ext="jar"/>
</dependency-->
<dependency org="org.apache.helix" name="helix-core" rev="1.0.3-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="io.dropwizard.metrics" name="metrics-core" rev="3.2.3" conf="compile->compile(default);runtime->runtime(default);default->default"/>
</dependencies>
</ivy-module>
</ivy-module>

0 comments on commit 764d826

Please sign in to comment.