diff --git a/helix-core/src/main/java/org/apache/helix/InstanceType.java b/helix-core/src/main/java/org/apache/helix/InstanceType.java index ef77c59b5f..8603c8406b 100644 --- a/helix-core/src/main/java/org/apache/helix/InstanceType.java +++ b/helix-core/src/main/java/org/apache/helix/InstanceType.java @@ -37,7 +37,8 @@ public enum InstanceType { MonitorDomainNames.ClusterStatus.name(), MonitorDomainNames.HelixZkClient.name(), MonitorDomainNames.HelixCallback.name(), - MonitorDomainNames.Rebalancer.name() + MonitorDomainNames.Rebalancer.name(), + MonitorDomainNames.AggregatedView.name() }), PARTICIPANT(new String[] { @@ -53,7 +54,8 @@ public enum InstanceType { MonitorDomainNames.HelixCallback.name(), MonitorDomainNames.HelixThreadPoolExecutor.name(), MonitorDomainNames.CLMParticipantReport.name(), - MonitorDomainNames.Rebalancer.name() + MonitorDomainNames.Rebalancer.name(), + MonitorDomainNames.AggregatedView.name() }), SPECTATOR(new String[] { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java index cb5f1a37d2..8716997080 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateComputationStage.java @@ -32,7 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class CustomizedStateComputationStage extends AbstractBaseStage { private static Logger LOG = LoggerFactory.getLogger(CustomizedStateComputationStage.class); @@ -85,7 +84,8 @@ private void updateCustomizedStates(String instanceName, String customizedStateT if (partition != null) { customizedStateOutput .setCustomizedState(customizedStateType, resourceName, partition, instanceName, - customizedState.getState(partitionName)); + customizedState.getState(partitionName), + customizedState.getStartTime(partitionName)); } } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java index 3863743375..6ba1a513d7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedStateOutput.java @@ -26,30 +26,35 @@ import org.apache.helix.model.Partition; - public class CustomizedStateOutput { // stateType -> (resourceName -> (Partition -> (instanceName -> customizedState))) private final Map>>> _customizedStateMap; + // stateType -> (resourceName -> (Partition -> (instanceName -> startTime))) + private final Map>>> _startTimeMap; public CustomizedStateOutput() { _customizedStateMap = new HashMap<>(); + _startTimeMap = new HashMap<>(); } public void setCustomizedState(String stateType, String resourceName, Partition partition, + String instanceName, String state, Long startTime) { + setCurrentState(stateType, resourceName, partition, instanceName, state); + setStartTime(stateType, resourceName, partition, instanceName, startTime); + } + + private void setCurrentState(String stateType, String resourceName, Partition partition, String instanceName, String state) { - if (!_customizedStateMap.containsKey(stateType)) { - _customizedStateMap - .put(stateType, new HashMap>>()); - } - if (!_customizedStateMap.get(stateType).containsKey(resourceName)) { - _customizedStateMap.get(stateType) - .put(resourceName, new HashMap>()); - } - if (!_customizedStateMap.get(stateType).get(resourceName).containsKey(partition)) { - _customizedStateMap.get(stateType).get(resourceName) - .put(partition, new HashMap()); - } - _customizedStateMap.get(stateType).get(resourceName).get(partition).put(instanceName, state); + _customizedStateMap.computeIfAbsent(stateType, k -> new HashMap<>()) + .computeIfAbsent(resourceName, k -> new HashMap<>()) + .computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, state); + } + + private void setStartTime(String stateType, String resourceName, Partition partition, + String instanceName, Long startTime) { + _startTimeMap.computeIfAbsent(stateType, k -> new HashMap<>()) + .computeIfAbsent(resourceName, k -> new HashMap<>()) + .computeIfAbsent(partition, k -> new HashMap<>()).put(instanceName, startTime); } /** @@ -65,6 +70,10 @@ public Map>> getCustomizedStateMap(St return Collections.emptyMap(); } + private Map>> getStartTimeMap(String stateType) { + return _startTimeMap.getOrDefault(stateType, Collections.emptyMap()); + } + /** * given (stateType, resource), returns (partition -> instance-> customizedState) map * @param stateType @@ -114,6 +123,12 @@ public String getPartitionCustomizedState(String stateType, String resourceName, return null; } + public Map> getResourceStartTimeMap(String stateType, + String resourceName) { + return Collections.unmodifiableMap( + getStartTimeMap(stateType).getOrDefault(resourceName, Collections.emptyMap())); + } + public Set getAllStateTypes() { return _customizedStateMap.keySet(); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java index 165a0af08f..1582630f84 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CustomizedViewAggregationStage.java @@ -20,6 +20,7 @@ */ import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -40,10 +41,11 @@ import org.apache.helix.model.CustomizedView; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.monitoring.mbeans.CustomizedViewMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class CustomizedViewAggregationStage extends AbstractAsyncBaseStage { private static Logger LOG = LoggerFactory.getLogger(CustomizedViewAggregationStage.class); @@ -78,7 +80,9 @@ public void execute(final ClusterEvent event) throws Exception { Set customizedTypesToRemove = new HashSet<>(); for (String stateType : customizedViewCacheMap.keySet()) { if (!customizedStateOutput.getAllStateTypes().contains(stateType)) { - LogUtil.logInfo(LOG, _eventId, "Remove customizedView for stateType: " + stateType); + LogUtil.logInfo(LOG, _eventId, + "Remove customizedView for stateType: " + stateType + ", on cluster " + event + .getClusterName()); dataAccessor.removeProperty(keyBuilder.customizedView(stateType)); customizedTypesToRemove.add(stateType); } @@ -88,6 +92,7 @@ public void execute(final ClusterEvent event) throws Exception { // update customized view for (String stateType : customizedStateOutput.getAllStateTypes()) { List updatedCustomizedViews = new ArrayList<>(); + Map>> updatedStartTimestamps = new HashMap<>(); Map curCustomizedViews = new HashMap<>(); CustomizedViewCache customizedViewCache = customizedViewCacheMap.get(stateType); if (customizedViewCache != null) { @@ -97,13 +102,13 @@ public void execute(final ClusterEvent event) throws Exception { for (Resource resource : resourceMap.values()) { try { computeCustomizedStateView(resource, stateType, customizedStateOutput, curCustomizedViews, - updatedCustomizedViews); + updatedCustomizedViews, updatedStartTimestamps); } catch (HelixException ex) { LogUtil.logError(LOG, _eventId, - "Failed to calculate customized view for resource " + resource.getResourceName(), ex); + "Failed to calculate customized view for resource " + resource.getResourceName() + + ", on cluster " + event.getClusterName(), ex); } } - List keys = new ArrayList<>(); for (Iterator it = updatedCustomizedViews.iterator(); it.hasNext(); ) { CustomizedView view = it.next(); @@ -112,15 +117,19 @@ public void execute(final ClusterEvent event) throws Exception { } // add/update customized-views from zk and cache if (updatedCustomizedViews.size() > 0) { - dataAccessor.setChildren(keys, updatedCustomizedViews); + boolean[] success = dataAccessor.setChildren(keys, updatedCustomizedViews); + reportLatency(event, updatedCustomizedViews, curCustomizedViews, updatedStartTimestamps, + success); cache.updateCustomizedViews(stateType, updatedCustomizedViews); } // remove stale customized views from zk and cache List customizedViewToRemove = new ArrayList<>(); for (String resourceName : curCustomizedViews.keySet()) { - if (!resourceMap.keySet().contains(resourceName)) { - LogUtil.logInfo(LOG, _eventId, "Remove customizedView for resource: " + resourceName); + if (!resourceMap.containsKey(resourceName)) { + LogUtil.logInfo(LOG, _eventId, + "Remove customizedView for resource: " + resourceName + ", on cluster " + event + .getClusterName()); dataAccessor.removeProperty(keyBuilder.customizedView(stateType, resourceName)); customizedViewToRemove.add(resourceName); } @@ -132,7 +141,8 @@ public void execute(final ClusterEvent event) throws Exception { private void computeCustomizedStateView(final Resource resource, final String stateType, CustomizedStateOutput customizedStateOutput, final Map curCustomizedViews, - List updatedCustomizedViews) { + List updatedCustomizedViews, + Map>> updatedStartTimestamps) { String resourceName = resource.getResourceName(); CustomizedView view = new CustomizedView(resource.getResourceName()); @@ -152,6 +162,68 @@ private void computeCustomizedStateView(final Resource resource, final String st if (curCustomizedView == null || !curCustomizedView.getRecord().equals(view.getRecord())) { // Add customized view to the list which will be written to ZK later. updatedCustomizedViews.add(view); + updatedStartTimestamps.put(resourceName, + customizedStateOutput.getResourceStartTimeMap(stateType, resourceName)); + } + } + + private void reportLatency(ClusterEvent event, List updatedCustomizedViews, + Map curCustomizedViews, + Map>> updatedStartTimestamps, + boolean[] updateSuccess) { + ClusterStatusMonitor clusterStatusMonitor = + event.getAttribute(AttributeName.clusterStatusMonitor.name()); + if (clusterStatusMonitor == null) { + return; + } + long curTime = System.currentTimeMillis(); + String clusterName = event.getClusterName(); + CustomizedViewMonitor customizedViewMonitor = + clusterStatusMonitor.getOrCreateCustomizedViewMonitor(clusterName); + + for (int i = 0; i < updatedCustomizedViews.size(); i++) { + CustomizedView newCV = updatedCustomizedViews.get(i); + String resourceName = newCV.getResourceName(); + + if (!updateSuccess[i]) { + LOG.warn("Customized views are not updated successfully for resource {} on cluster {}", + resourceName, clusterName); + continue; + } + + CustomizedView oldCV = + curCustomizedViews.getOrDefault(resourceName, new CustomizedView(resourceName)); + + Map> newPartitionStateMaps = newCV.getRecord().getMapFields(); + Map> oldPartitionStateMaps = oldCV.getRecord().getMapFields(); + Map> partitionStartTimeMaps = + updatedStartTimestamps.getOrDefault(resourceName, Collections.emptyMap()); + + for (Map.Entry> partitionStateMapEntry : newPartitionStateMaps + .entrySet()) { + String partitionName = partitionStateMapEntry.getKey(); + Map newStateMap = partitionStateMapEntry.getValue(); + Map oldStateMap = + oldPartitionStateMaps.getOrDefault(partitionName, Collections.emptyMap()); + if (!newStateMap.equals(oldStateMap)) { + Map partitionStartTimeMap = partitionStartTimeMaps + .getOrDefault(new Partition(partitionName), Collections.emptyMap()); + + for (Map.Entry stateMapEntry : newStateMap.entrySet()) { + String instanceName = stateMapEntry.getKey(); + if (!stateMapEntry.getValue().equals(oldStateMap.get(instanceName))) { + long timestamp = partitionStartTimeMap.get(instanceName); + if (timestamp > 0) { + customizedViewMonitor.recordUpdateToAggregationLatency(curTime - timestamp); + } else { + LOG.warn( + "Failed to find customized state update time stamp for resource {} partition {}, instance {}, on cluster {} the number should be positive.", + resourceName, partitionName, instanceName, clusterName); + } + } + } + } + } } } } diff --git a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java index 7684e05c28..0abf4a1126 100644 --- a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java +++ b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java @@ -68,6 +68,9 @@ public void updateCustomizedState(String customizedStateName, String resourceNam PropertyKey propertyKey = keyBuilder.customizedState(_instanceName, customizedStateName, resourceName); ZNRecord record = new ZNRecord(resourceName); + // Update start time field for monitoring purpose, updated value is current time + customizedStateMap.put(CustomizedState.CustomizedStateProperty.START_TIME.name(), + String.valueOf(System.currentTimeMillis())); record.setMapField(partitionName, customizedStateMap); if (!_helixDataAccessor.updateProperty(propertyKey, new CustomizedState(record))) { throw new HelixException(String diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 19113852b6..2f212302cc 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -96,6 +96,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { protected final ConcurrentHashMap _clusterEventMonitorMap = new ConcurrentHashMap<>(); + private CustomizedViewMonitor _customizedViewMonitor; + /** * PerInstanceResource monitor map: beanName->monitor */ @@ -332,6 +334,23 @@ public void updateClusterEventDuration(String phase, long duration) { } } + /** + * Lazy initialization of customized view monitor + * @param clusterName the cluster name of the cluster to be monitored + * @return a customized view monitor instance + */ + public synchronized CustomizedViewMonitor getOrCreateCustomizedViewMonitor(String clusterName) { + if (_customizedViewMonitor == null) { + _customizedViewMonitor = new CustomizedViewMonitor(clusterName); + try { + _customizedViewMonitor.register(); + } catch (JMException e) { + LOG.error("Failed to register CustomizedViewMonitorMBean for cluster " + _clusterName, e); + } + } + return _customizedViewMonitor; + } + private ClusterEventMonitor getOrCreateClusterEventMonitor(String phase) { try { if (!_clusterEventMonitorMap.containsKey(phase)) { diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java new file mode 100644 index 0000000000..b0380784cb --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/CustomizedViewMonitor.java @@ -0,0 +1,83 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.management.JMException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingTimeWindowArrayReservoir; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric; +import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CustomizedViewMonitor extends DynamicMBeanProvider { + private static final Logger LOG = LoggerFactory.getLogger(CustomizedViewMonitor.class); + + private static final String MBEAN_DESCRIPTION = "Helix Customized View Aggregation Monitor"; + private final String _clusterName; + private final String _sensorName; + private HistogramDynamicMetric _updateToAggregationLatencyGauge; + public static final String UPDATE_TO_AGGREGATION_LATENCY_GAUGE = + "UpdateToAggregationLatencyGauge"; + private ClusterStatusMonitor _clusterStatusMonitor; + private static final String TYPE_KEY = "Type"; + private static final String CLUSTER_KEY = "Cluster"; + private static final String CUSTOMIZED_VIEW = "CustomizedView"; + + public CustomizedViewMonitor(String clusterName) { + _clusterName = clusterName; + _sensorName = String + .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY, + CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName); + _updateToAggregationLatencyGauge = + new HistogramDynamicMetric(UPDATE_TO_AGGREGATION_LATENCY_GAUGE, new Histogram( + new SlidingTimeWindowArrayReservoir(getResetIntervalInMs(), TimeUnit.MILLISECONDS))); + } + + @Override + public DynamicMBeanProvider register() throws JMException { + List> attributeList = new ArrayList<>(); + attributeList.add(_updateToAggregationLatencyGauge); + doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanObjectName()); + return this; + } + + private ObjectName getMBeanObjectName() throws MalformedObjectNameException { + return new ObjectName(String + .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), TYPE_KEY, + CUSTOMIZED_VIEW, CLUSTER_KEY, _clusterName)); + } + + @Override + public String getSensorName() { + return _sensorName; + } + + public void recordUpdateToAggregationLatency(long latency) { + _updateToAggregationLatencyGauge.updateValue(latency); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java index bcd5f75b4a..0f14bc275e 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java @@ -21,6 +21,9 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.management.ObjectName; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -34,10 +37,12 @@ import org.apache.helix.model.CustomizedState; import org.apache.helix.model.CustomizedStateConfig; import org.apache.helix.model.CustomizedView; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.monitoring.mbeans.CustomizedViewMonitor; +import org.apache.helix.monitoring.mbeans.MonitorDomainNames; import org.testng.Assert; import org.testng.annotations.Test; - public class TestCustomizedViewStage extends ZkUnitTestBase { private final String RESOURCE_NAME = "TestDB"; private final String PARTITION_NAME = "TestDB_0"; @@ -112,4 +117,65 @@ public void testCachedCustomizedViews() throws Exception { deleteLiveInstances(clusterName); deleteCluster(clusterName); } + + @Test + public void testLatencyMetricReporting() throws Exception { + String clusterName = "CLUSTER_" + TestHelper.getTestMethodName(); + + HelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient)); + HelixManager manager = new DummyClusterManager(clusterName, accessor); + + // ideal state: node0 is MASTER, node1 is SLAVE + // replica=2 means 1 master and 1 slave + setupIdealState(clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2); + setupLiveInstances(clusterName, new int[]{0, 1}); + setupStateModel(clusterName); + + ClusterStatusMonitor clusterStatusMonitor = new ClusterStatusMonitor(clusterName); + ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.Unknown); + ResourceControllerDataProvider cache = new ResourceControllerDataProvider(clusterName); + ExecutorService executor = Executors.newSingleThreadExecutor(); + cache.setAsyncTasksThreadPool(executor); + event.addAttribute(AttributeName.helixmanager.name(), manager); + event.addAttribute(AttributeName.ControllerDataProvider.name(), cache); + event.addAttribute(AttributeName.clusterStatusMonitor.name(), clusterStatusMonitor); + + CustomizedStateConfig config = new CustomizedStateConfig(); + List aggregationEnabledTypes = new ArrayList<>(); + aggregationEnabledTypes.add(CUSTOMIZED_STATE_NAME); + config.setAggregationEnabledTypes(aggregationEnabledTypes); + + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + accessor.setProperty(keyBuilder.customizedStateConfig(), config); + + CustomizedState customizedState = new CustomizedState(RESOURCE_NAME); + customizedState.setState(PARTITION_NAME, "STATE"); + customizedState.setStartTime(PARTITION_NAME, 1); + accessor.setProperty( + keyBuilder.customizedState(INSTANCE_NAME, CUSTOMIZED_STATE_NAME, RESOURCE_NAME), + customizedState); + + Pipeline dataRefresh = new Pipeline(); + dataRefresh.addStage(new ReadClusterDataStage()); + runPipeline(event, dataRefresh); + runStage(event, new ResourceComputationStage()); + runStage(event, new CustomizedStateComputationStage()); + runStage(event, new CustomizedViewAggregationStage()); + + ObjectName objectName = new ObjectName(String + .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type", + "CustomizedView", "Cluster", clusterName)); + Assert.assertNotNull( + ((ClusterStatusMonitor) event.getAttribute(AttributeName.clusterStatusMonitor.name())) + .getOrCreateCustomizedViewMonitor(clusterName)); + + TestHelper.verify(() -> (long) _server.getAttribute(objectName, + CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + ".Max") != 0, + TestHelper.WAIT_DURATION); + + deleteLiveInstances(clusterName); + deleteCluster(clusterName); + executor.shutdownNow(); + } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java index 4eed9cceb0..258e4c46a8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedViewAggregation.java @@ -222,7 +222,7 @@ public boolean verify() { Map> localPerResourceCustomizedView = localSnapshot .getOrDefault(resourceCustomizedView.getResourceName(), Maps.newHashMap()); - if (resourceStateMap.isEmpty() && !localPerResourceCustomizedView.isEmpty()) { + if (resourceStateMap.size() != localPerResourceCustomizedView.size()) { return false; } @@ -252,7 +252,7 @@ public boolean verify() { } return true; } - }, 12000); + }, TestHelper.WAIT_DURATION); Assert.assertTrue(result); } @@ -422,6 +422,8 @@ public void testCustomizedViewAggregation() throws Exception { Map perPartitionCustomizedState = _customizedStateProvider_participant1 .getPerPartitionCustomizedState(CustomizedStateType.TYPE_A.name(), RESOURCE_1, PARTITION_10); + // Remove this field because it's automatically updated for monitoring purpose and we don't need to compare it + perPartitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name()); Map actualPerPartitionCustomizedState = Maps.newHashMap(); actualPerPartitionCustomizedState .put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java index 0102375419..ed1338abe3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java +++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java @@ -87,7 +87,8 @@ public void testUpdateCustomizedState() { Map> mapView = customizedState.getRecord().getMapFields(); Assert.assertEquals(mapView.keySet().size(), 1); Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1); - Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2); + // Updated 2 fields + START_TIME field is automatically updated for monitoring + Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3); Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "STARTED"); Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED"); @@ -103,7 +104,7 @@ public void testUpdateCustomizedState() { mapView = customizedState.getRecord().getMapFields(); Assert.assertEquals(mapView.keySet().size(), 1); Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1); - Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2); + Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3); Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED"); Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED"); @@ -120,7 +121,7 @@ public void testUpdateCustomizedState() { mapView = customizedState.getRecord().getMapFields(); Assert.assertEquals(mapView.keySet().size(), 1); Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1); - Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 2); + Assert.assertEquals(mapView.get(PARTITION_NAME1).keySet().size(), 3); Assert.assertEquals(mapView.get(PARTITION_NAME1).get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED"); Assert.assertEquals(mapView.get(PARTITION_NAME1).get("CURRENT_STATE"), "COMPLETED"); @@ -141,13 +142,13 @@ public void testUpdateCustomizedState() { Map partitionMap1 = _mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1); - Assert.assertEquals(partitionMap1.keySet().size(), 2); + Assert.assertEquals(partitionMap1.keySet().size(), 3); Assert.assertEquals(partitionMap1.get("PREVIOUS_STATE"), "END_OF_PUSH_RECEIVED"); Assert.assertEquals(partitionMap1.get("CURRENT_STATE"), "COMPLETED"); Map partitionMap2 = _mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2); - Assert.assertEquals(partitionMap2.keySet().size(), 2); + Assert.assertEquals(partitionMap2.keySet().size(), 3); Assert.assertEquals(partitionMap2.get("PREVIOUS_STATE"), "STARTED"); Assert.assertEquals(partitionMap2.get("CURRENT_STATE"), "END_OF_PUSH_RECEIVED"); @@ -170,6 +171,7 @@ public void testUpdateSinglePartitionCustomizedState() { // get customized state CustomizedState customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME); + // START_TIME field is automatically updated for monitoring Assert.assertEquals( customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE) .size(), 1); @@ -178,8 +180,8 @@ public void testUpdateSinglePartitionCustomizedState() { Assert.assertEquals(customizedState .getPartitionStateMap(CustomizedState.CustomizedStateProperty.PREVIOUS_STATE), map); Assert.assertEquals( - customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME), - map); + customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.START_TIME).size(), + 1); Assert.assertEquals( customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.END_TIME), map); @@ -192,6 +194,7 @@ public void testUpdateSinglePartitionCustomizedState() { map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), PARTITION_STATE); Map partitionCustomizedState = _mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1); + partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name()); Assert.assertEquals(partitionCustomizedState, map); Assert.assertNull(_mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2)); @@ -218,6 +221,7 @@ public void testUpdateSinglePartitionCustomizedStateWithNullField() { map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(), null); Map partitionCustomizedState = _mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1); + partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name()); Assert.assertEquals(partitionCustomizedState, map); Assert.assertNull(_mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2)); @@ -244,7 +248,8 @@ public void testUpdateCustomizedStateWithEmptyMap() { // get per partition customized state Map partitionCustomizedState = _mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME1); - Assert.assertEquals(partitionCustomizedState.size(), 0); + // START_TIME field is automatically updated for monitoring + Assert.assertEquals(partitionCustomizedState.size(), 1); Assert.assertNull(_mockProvider .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME, PARTITION_NAME2)); } diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java new file mode 100644 index 0000000000..248ed56870 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestCustomizedViewMonitor.java @@ -0,0 +1,99 @@ +package org.apache.helix.monitoring.mbeans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Stack; +import javax.management.JMException; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.helix.TestHelper; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestCustomizedViewMonitor { + private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer(); + private final String TEST_CLUSTER = "test_cluster"; + private final String MAX_SUFFIX = ".Max"; + private final String MEAN_SUFFIX = ".Mean"; + + private ObjectName buildObjectName(int duplicateNum) throws MalformedObjectNameException { + ObjectName objectName = new ObjectName(String + .format("%s:%s=%s,%s=%s", MonitorDomainNames.AggregatedView.name(), "Type", + "CustomizedView", "Cluster", TEST_CLUSTER)); + if (duplicateNum == 0) { + return objectName; + } else { + return new ObjectName( + String.format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE, duplicateNum)); + } + } + + @Test + public void testMBeanRegistration() throws JMException, IOException { + int numOfMonitors = 5; + + Stack monitors = new Stack<>(); + for (int i = 0; i < numOfMonitors; i++) { + CustomizedViewMonitor monitor = new CustomizedViewMonitor(TEST_CLUSTER); + monitor.register(); + monitors.push(monitor); + } + + for (int i = 0; i < numOfMonitors; i++) { + if (i == numOfMonitors - 1) { + Assert.assertTrue(_server.isRegistered(buildObjectName(0))); + } else { + Assert.assertTrue(_server.isRegistered(buildObjectName(numOfMonitors - i - 1))); + } + CustomizedViewMonitor monitor = monitors.pop(); + assert monitor != null; + monitor.unregister(); + if (i == numOfMonitors - 1) { + Assert.assertFalse(_server.isRegistered(buildObjectName(0))); + } else { + Assert.assertFalse(_server.isRegistered(buildObjectName(numOfMonitors - i - 1))); + } + } + } + + @Test + public void testMetricInitialization() throws Exception { + CustomizedViewMonitor monitor = new CustomizedViewMonitor(TEST_CLUSTER); + monitor.register(); + int sum = 0; + for (int i = 0; i < 10; i++) { + monitor.recordUpdateToAggregationLatency(i); + sum += i; + int expectedMax = i; + double expectedMean = sum / (i + 1.0); + Assert.assertTrue(TestHelper.verify(() -> (long) _server.getAttribute(buildObjectName(0), + CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + MAX_SUFFIX) == expectedMax, + TestHelper.WAIT_DURATION)); + Assert.assertTrue(TestHelper.verify(() -> (double) _server.getAttribute(buildObjectName(0), + CustomizedViewMonitor.UPDATE_TO_AGGREGATION_LATENCY_GAUGE + MEAN_SUFFIX) == expectedMean, + TestHelper.WAIT_DURATION)); + } + monitor.unregister(); + } +} diff --git a/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java index a499d79574..7a94e7f7ff 100644 --- a/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java +++ b/metrics-common/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java @@ -29,5 +29,6 @@ public enum MonitorDomainNames { HelixCallback, RoutingTableProvider, CLMParticipantReport, - Rebalancer + Rebalancer, + AggregatedView }