Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions helix-core/src/main/java/org/apache/helix/InstanceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum InstanceType {
MonitorDomainNames.ClusterStatus.name(),
MonitorDomainNames.HelixZkClient.name(),
MonitorDomainNames.HelixCallback.name(),
MonitorDomainNames.Rebalancer.name()
MonitorDomainNames.Rebalancer.name(),
MonitorDomainNames.AggregatedView.name()
}),

PARTICIPANT(new String[] {
Expand All @@ -53,7 +54,8 @@ public enum InstanceType {
MonitorDomainNames.HelixCallback.name(),
MonitorDomainNames.HelixThreadPoolExecutor.name(),
MonitorDomainNames.CLMParticipantReport.name(),
MonitorDomainNames.Rebalancer.name()
MonitorDomainNames.Rebalancer.name(),
MonitorDomainNames.AggregatedView.name()
}),

SPECTATOR(new String[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class CustomizedStateComputationStage extends AbstractBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CustomizedStateComputationStage.class);

Expand Down Expand Up @@ -85,7 +84,8 @@ private void updateCustomizedStates(String instanceName, String customizedStateT
if (partition != null) {
customizedStateOutput
.setCustomizedState(customizedStateType, resourceName, partition, instanceName,
customizedState.getState(partitionName));
customizedState.getState(partitionName),
customizedState.getStartTime(partitionName));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,35 @@

import org.apache.helix.model.Partition;


public class CustomizedStateOutput {
// stateType -> (resourceName -> (Partition -> (instanceName -> customizedState)))
private final Map<String, Map<String, Map<Partition, Map<String, String>>>> _customizedStateMap;
// stateType -> (resourceName -> (Partition -> (instanceName -> startTime)))
private final Map<String, Map<String, Map<Partition, Map<String, Long>>>> _startTimeMap;

public CustomizedStateOutput() {
_customizedStateMap = new HashMap<>();
_startTimeMap = new HashMap<>();
}

public void setCustomizedState(String stateType, String resourceName, Partition partition,
String instanceName, String state, Long startTime) {
setCurrentState(stateType, resourceName, partition, instanceName, state);
setStartTime(stateType, resourceName, partition, instanceName, startTime);
}

private void setCurrentState(String stateType, String resourceName, Partition partition,
String instanceName, String state) {
if (!_customizedStateMap.containsKey(stateType)) {
_customizedStateMap
.put(stateType, new HashMap<String, Map<Partition, Map<String, String>>>());
}
if (!_customizedStateMap.get(stateType).containsKey(resourceName)) {
_customizedStateMap.get(stateType)
.put(resourceName, new HashMap<Partition, Map<String, String>>());
}
if (!_customizedStateMap.get(stateType).get(resourceName).containsKey(partition)) {
_customizedStateMap.get(stateType).get(resourceName)
.put(partition, new HashMap<String, String>());
}
_customizedStateMap.get(stateType).get(resourceName).get(partition).put(instanceName, state);
_customizedStateMap.computeIfAbsent(stateType, k -> new HashMap<>())
.computeIfAbsent(resourceName, k -> new HashMap<>())
.computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, state);
}

private void setStartTime(String stateType, String resourceName, Partition partition,
String instanceName, Long startTime) {
_startTimeMap.computeIfAbsent(stateType, k -> new HashMap<>())
.computeIfAbsent(resourceName, k -> new HashMap<>())
.computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, startTime);
}

/**
Expand All @@ -65,6 +70,10 @@ public Map<String, Map<Partition, Map<String, String>>> getCustomizedStateMap(St
return Collections.emptyMap();
}

private Map<String, Map<Partition, Map<String, Long>>> getStartTimeMap(String stateType) {
return _startTimeMap.getOrDefault(stateType, Collections.emptyMap());
}

/**
* given (stateType, resource), returns (partition -> instance-> customizedState) map
* @param stateType
Expand Down Expand Up @@ -114,6 +123,12 @@ public String getPartitionCustomizedState(String stateType, String resourceName,
return null;
}

public Map<Partition, Map<String, Long>> getResourceStartTimeMap(String stateType,
String resourceName) {
return Collections.unmodifiableMap(
getStartTimeMap(stateType).getOrDefault(resourceName, Collections.emptyMap()));
}

public Set<String> getAllStateTypes() {
return _customizedStateMap.keySet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -40,10 +41,11 @@
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.monitoring.mbeans.CustomizedViewMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CustomizedViewAggregationStage.class);

Expand Down Expand Up @@ -78,7 +80,9 @@ public void execute(final ClusterEvent event) throws Exception {
Set<String> customizedTypesToRemove = new HashSet<>();
for (String stateType : customizedViewCacheMap.keySet()) {
if (!customizedStateOutput.getAllStateTypes().contains(stateType)) {
LogUtil.logInfo(LOG, _eventId, "Remove customizedView for stateType: " + stateType);
LogUtil.logInfo(LOG, _eventId,
"Remove customizedView for stateType: " + stateType + ", on cluster " + event
.getClusterName());
dataAccessor.removeProperty(keyBuilder.customizedView(stateType));
customizedTypesToRemove.add(stateType);
}
Expand All @@ -88,6 +92,7 @@ public void execute(final ClusterEvent event) throws Exception {
// update customized view
for (String stateType : customizedStateOutput.getAllStateTypes()) {
List<CustomizedView> updatedCustomizedViews = new ArrayList<>();
Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps = new HashMap<>();
Map<String, CustomizedView> curCustomizedViews = new HashMap<>();
CustomizedViewCache customizedViewCache = customizedViewCacheMap.get(stateType);
if (customizedViewCache != null) {
Expand All @@ -97,13 +102,13 @@ public void execute(final ClusterEvent event) throws Exception {
for (Resource resource : resourceMap.values()) {
try {
computeCustomizedStateView(resource, stateType, customizedStateOutput, curCustomizedViews,
updatedCustomizedViews);
updatedCustomizedViews, updatedStartTimestamps);
} catch (HelixException ex) {
LogUtil.logError(LOG, _eventId,
"Failed to calculate customized view for resource " + resource.getResourceName(), ex);
"Failed to calculate customized view for resource " + resource.getResourceName()
+ ", on cluster " + event.getClusterName(), ex);
}
}

List<PropertyKey> keys = new ArrayList<>();
for (Iterator<CustomizedView> it = updatedCustomizedViews.iterator(); it.hasNext(); ) {
CustomizedView view = it.next();
Expand All @@ -112,15 +117,19 @@ public void execute(final ClusterEvent event) throws Exception {
}
// add/update customized-views from zk and cache
if (updatedCustomizedViews.size() > 0) {
dataAccessor.setChildren(keys, updatedCustomizedViews);
boolean[] success = dataAccessor.setChildren(keys, updatedCustomizedViews);
reportLatency(event, updatedCustomizedViews, curCustomizedViews, updatedStartTimestamps,
success);
cache.updateCustomizedViews(stateType, updatedCustomizedViews);
}

// remove stale customized views from zk and cache
List<String> customizedViewToRemove = new ArrayList<>();
for (String resourceName : curCustomizedViews.keySet()) {
if (!resourceMap.keySet().contains(resourceName)) {
LogUtil.logInfo(LOG, _eventId, "Remove customizedView for resource: " + resourceName);
if (!resourceMap.containsKey(resourceName)) {
LogUtil.logInfo(LOG, _eventId,
"Remove customizedView for resource: " + resourceName + ", on cluster " + event
.getClusterName());
dataAccessor.removeProperty(keyBuilder.customizedView(stateType, resourceName));
customizedViewToRemove.add(resourceName);
}
Expand All @@ -132,7 +141,8 @@ public void execute(final ClusterEvent event) throws Exception {
private void computeCustomizedStateView(final Resource resource, final String stateType,
CustomizedStateOutput customizedStateOutput,
final Map<String, CustomizedView> curCustomizedViews,
List<CustomizedView> updatedCustomizedViews) {
List<CustomizedView> updatedCustomizedViews,
Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps) {
String resourceName = resource.getResourceName();
CustomizedView view = new CustomizedView(resource.getResourceName());

Expand All @@ -152,6 +162,68 @@ private void computeCustomizedStateView(final Resource resource, final String st
if (curCustomizedView == null || !curCustomizedView.getRecord().equals(view.getRecord())) {
// Add customized view to the list which will be written to ZK later.
updatedCustomizedViews.add(view);
updatedStartTimestamps.put(resourceName,
customizedStateOutput.getResourceStartTimeMap(stateType, resourceName));
}
}

private void reportLatency(ClusterEvent event, List<CustomizedView> updatedCustomizedViews,
Map<String, CustomizedView> curCustomizedViews,
Map<String, Map<Partition, Map<String, Long>>> updatedStartTimestamps,
boolean[] updateSuccess) {
ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
if (clusterStatusMonitor == null) {
return;
}
long curTime = System.currentTimeMillis();
String clusterName = event.getClusterName();
CustomizedViewMonitor customizedViewMonitor =
clusterStatusMonitor.getOrCreateCustomizedViewMonitor(clusterName);

for (int i = 0; i < updatedCustomizedViews.size(); i++) {
CustomizedView newCV = updatedCustomizedViews.get(i);
String resourceName = newCV.getResourceName();

if (!updateSuccess[i]) {
LOG.warn("Customized views are not updated successfully for resource {} on cluster {}",
resourceName, clusterName);
continue;
}

CustomizedView oldCV =
curCustomizedViews.getOrDefault(resourceName, new CustomizedView(resourceName));

Map<String, Map<String, String>> newPartitionStateMaps = newCV.getRecord().getMapFields();
Map<String, Map<String, String>> oldPartitionStateMaps = oldCV.getRecord().getMapFields();
Map<Partition, Map<String, Long>> partitionStartTimeMaps =
updatedStartTimestamps.getOrDefault(resourceName, Collections.emptyMap());

for (Map.Entry<String, Map<String, String>> partitionStateMapEntry : newPartitionStateMaps
.entrySet()) {
String partitionName = partitionStateMapEntry.getKey();
Map<String, String> newStateMap = partitionStateMapEntry.getValue();
Map<String, String> oldStateMap =
oldPartitionStateMaps.getOrDefault(partitionName, Collections.emptyMap());
if (!newStateMap.equals(oldStateMap)) {
Map<String, Long> partitionStartTimeMap = partitionStartTimeMaps
.getOrDefault(new Partition(partitionName), Collections.emptyMap());

for (Map.Entry<String, String> stateMapEntry : newStateMap.entrySet()) {
String instanceName = stateMapEntry.getKey();
if (!stateMapEntry.getValue().equals(oldStateMap.get(instanceName))) {
long timestamp = partitionStartTimeMap.get(instanceName);
if (timestamp > 0) {
customizedViewMonitor.recordUpdateToAggregationLatency(curTime - timestamp);
} else {
LOG.warn(
"Failed to find customized state update time stamp for resource {} partition {}, instance {}, on cluster {} the number should be positive.",
resourceName, partitionName, instanceName, clusterName);
}
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public void updateCustomizedState(String customizedStateName, String resourceNam
PropertyKey propertyKey =
keyBuilder.customizedState(_instanceName, customizedStateName, resourceName);
ZNRecord record = new ZNRecord(resourceName);
// Update start time field for monitoring purpose, updated value is current time
customizedStateMap.put(CustomizedState.CustomizedStateProperty.START_TIME.name(),
String.valueOf(System.currentTimeMillis()));
record.setMapField(partitionName, customizedStateMap);
if (!_helixDataAccessor.updateProperty(propertyKey, new CustomizedState(record))) {
throw new HelixException(String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
protected final ConcurrentHashMap<String, ClusterEventMonitor> _clusterEventMonitorMap =
new ConcurrentHashMap<>();

private CustomizedViewMonitor _customizedViewMonitor;

/**
* PerInstanceResource monitor map: beanName->monitor
*/
Expand Down Expand Up @@ -332,6 +334,23 @@ public void updateClusterEventDuration(String phase, long duration) {
}
}

/**
* Lazy initialization of customized view monitor
* @param clusterName the cluster name of the cluster to be monitored
* @return a customized view monitor instance
*/
public synchronized CustomizedViewMonitor getOrCreateCustomizedViewMonitor(String clusterName) {
if (_customizedViewMonitor == null) {
_customizedViewMonitor = new CustomizedViewMonitor(clusterName);
try {
_customizedViewMonitor.register();
} catch (JMException e) {
LOG.error("Failed to register CustomizedViewMonitorMBean for cluster " + _clusterName, e);
}
}
return _customizedViewMonitor;
}

private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) {
try {
if (!_clusterEventMonitorMap.containsKey(phase)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.apache.helix.monitoring.mbeans;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomizedViewMonitor extends DynamicMBeanProvider {
private static final Logger LOG = LoggerFactory.getLogger(CustomizedViewMonitor.class);

private static final String MBEAN_DESCRIPTION = "Helix Customized View Aggregation Monitor";
private final String _clusterName;
private final String _sensorName;
private HistogramDynamicMetric _updateToAggregationLatencyGauge;
public static final String UPDATE_TO_AGGREGATION_LATENCY_GAUGE =
"UpdateToAggregationLatencyGauge";
private ClusterStatusMonitor _clusterStatusMonitor;
private static final String TYPE_KEY = "Type";
private static final String CLUSTER_KEY = "Cluster";
private static final String CUSTOMIZED_VIEW = "CustomizedView";

public CustomizedViewMonitor(String clusterName) {
_clusterName = clusterName;
_sensorName = String
.format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY,
CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName);
_updateToAggregationLatencyGauge =
new HistogramDynamicMetric(UPDATE_TO_AGGREGATION_LATENCY_GAUGE, new Histogram(
new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS)));
}

@Override
public DynamicMBeanProvider register() throws JMException {
List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
attributeList.add(_updateToAggregationLatencyGauge);
doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanObjectName());
return this;
}

private ObjectName getMBeanObjectName() throws MalformedObjectNameException {
return new ObjectName(String
.format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY,
CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName));
}

@Override
public String getSensorName() {
return _sensorName;
}

public void recordUpdateToAggregationLatency(long latency) {
_updateToAggregationLatencyGauge.updateValue(latency);
}
}
Loading