Skip to content
Permalink
Browse files
HELIX-708: adding basic metrics to HelixViewAggregator
RB=1237803
BUG=HELIX-708
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang
  • Loading branch information
zhan849 authored and junkaixue committed Apr 11, 2022
1 parent 8d9db16 commit 87a5f1515f7abb94cb3918edb4167418a7fe6237
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 11 deletions.
@@ -20,6 +20,7 @@
*/

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -39,6 +40,7 @@
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.view.common.ViewAggregatorEventAttributes;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.monitoring.ViewAggregatorMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -63,18 +65,21 @@ public class HelixViewAggregator implements ClusterConfigChangeListener {
private ClusterConfig _curViewClusterConfig;
private Timer _viewClusterRefreshTimer;
private ViewClusterRefresher _viewClusterRefresher;
private ViewAggregatorMonitor _monitor;

public HelixViewAggregator(String viewClusterName, String zkAddr) {
_viewClusterName = viewClusterName;
_dataProviderMap = new HashMap<>();
_viewClusterManager = HelixManagerFactory
.getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
InstanceType.SPECTATOR, zkAddr);
_refreshViewCluster = false;
_refreshViewCluster = new AtomicBoolean(false);
_monitor = new ViewAggregatorMonitor(viewClusterName);
_aggregator = new ClusterEventProcessor(_viewClusterName, "Aggregator") {
@Override
public void handleEvent(ClusterEvent event) {
handleSourceClusterEvent(event);
_monitor.recordProcessedSourceEvent();
}
};

@@ -91,6 +96,8 @@ public void handleEvent(ClusterEvent event) {
* @throws Exception
*/
public void start() throws Exception {
_monitor.register();

// Start workers
_aggregator.start();
_viewConfigProcessor.start();
@@ -111,6 +118,8 @@ public void start() throws Exception {
}

public void shutdown() {
boolean success = true;

// Stop all workers
_aggregator.interrupt();
_viewConfigProcessor.interrupt();
@@ -129,6 +138,7 @@ public void shutdown() {
} catch (ZkInterruptedException zkintr) {
logger.warn("ZK interrupted when disconnecting helix manager", zkintr);
} catch (Exception e) {
success = false;
logger.error(String
.format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
}
@@ -141,12 +151,17 @@ public void shutdown() {
try {
provider.shutdown();
} catch (Exception e) {
success = false;
logger.error(String
.format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
_viewClusterName), e);
}
}
logger.info("HelixViewAggregator shutdown cleanly");

logger.info("Unregistering monitor.");
_monitor.unregister();

logger.info("HelixViewAggregator shutdown " + (success ? "cleanly" : "with error"));
}

@Override
@@ -207,7 +222,8 @@ private synchronized void handleViewClusterConfigChange(ClusterEvent event) {
action.computeAction();

// If we fail to process action and should retry, re-queue event to retry
if (processViewClusterConfigUpdate(action)) {
if (!processViewClusterConfigUpdate(action)) {
_monitor.recordViewConfigProcessFailure();
event.addAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name(), true);
_viewConfigProcessor.queueEvent(event);
} else {
@@ -244,10 +260,10 @@ private void resetTimer(long triggerIntervalMs) {
* Use SourceClusterConfigChangeAction to reset timer (RefreshViewClusterTask),
* create/delete SourceClusterDataProvider in data provider map
*
* @return true if action failed and should retry, else false
* @return true if success else false
*/
private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction action) {
boolean shouldRetry = false;
boolean success = true;
for (ViewClusterSourceConfig source : action.getConfigsToDelete()) {
String key = generateDataProviderMapKey(source);
logger.info("Deleting data provider " + key);
@@ -256,7 +272,7 @@ private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction a
_dataProviderMap.get(key).shutdown();
_dataProviderMap.remove(key);
} catch (Exception e) {
shouldRetry = true;
success = false;
logger.warn(String.format("Failed to shutdown data provider %s, will retry", key));
}
}
@@ -276,7 +292,7 @@ private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction a
provider.setup();
_dataProviderMap.put(key, provider);
} catch (Exception e) {
shouldRetry = true;
success = false;
logger.warn(String.format("Failed to create data provider %s, will retry", key));
}
}
@@ -286,16 +302,85 @@ private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction a
"Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
resetTimer(action.getCurrentRefreshPeriodMs());
}
return shouldRetry;
return success;
}

/**
* Use ViewClusterRefresher to refresh ViewCluster.
* @return true if needs retry, else false
*/
private synchronized boolean refreshViewCluster() {
// TODO: Implement refresh logic
return false;
private void refreshViewCluster() {
long startRefreshMs = System.currentTimeMillis();
logger.info(String.format("START RefreshViewCluster: Refresh view cluster %s at timestamp %s",
_viewClusterName, startRefreshMs));
boolean dataProviderFailure = false;
boolean viewClusterFailure = false;

// Generate a view of providers so refresh won't block cluster config update
// When a data provider is shutdown while we are reloading cache / generating diff,
// Exception will be thrown out and we retry during next refresh cycle
Set<SourceClusterDataProvider> providerView;
synchronized (_dataProviderMap) {
_refreshViewCluster.set(false);
providerView = new HashSet<>(_dataProviderMap.values());
}

// Refresh data providers
// TODO: the following steps can be parallelized
for (SourceClusterDataProvider provider : providerView) {
try {
provider.refreshCache();
} catch (Exception e) {
logger.warn("Caught exception when refreshing source cluster cache. Abort refresh.", e);
_refreshViewCluster.set(true);
dataProviderFailure = true;

// Skip refresh view cluster when we cannot successfully refresh
// source cluster caches
break;
}
}

// Refresh properties in view cluster
if (!dataProviderFailure) {
_viewClusterRefresher.updateProviderView(providerView);
for (PropertyType propertyType : ViewClusterSourceConfig.getValidPropertyTypes()) {
logger.info(String
.format("Refreshing property %s in view cluster %s", propertyType, _viewClusterName));
try {
// We try to refresh all properties with best effort, and don't break when
// failed to refresh a particular property
if (!_viewClusterRefresher.refreshPropertiesInViewCluster(propertyType)) {
viewClusterFailure = true;
_refreshViewCluster.set(true);
}
} catch (IllegalArgumentException e) {
// Invalid property... not expected! Something wrong with code, should not retry
logger.error(String.format("Failed to refresh property in view cluster %s with exception",
_viewClusterName), e);
}
}
}

recordRefreshResults(dataProviderFailure, viewClusterFailure,
System.currentTimeMillis() - startRefreshMs);
}

private void recordRefreshResults(boolean recordSourceFailure, boolean recordViewFailure,
long latency) {
if (recordSourceFailure) {
_monitor.recordReadSourceFailure();
}

if (recordViewFailure) {
_monitor.recordViewRefreshFailure();
}

_monitor.recordRefreshViewLatency(latency);

logger.info(String
.format("END RefreshViewCluster: finished refresh %s. Time spent: %s ms. Success: %s",
_viewClusterName, latency, !recordSourceFailure && !recordViewFailure));
}

private static String generateHelixManagerInstanceName(String viewClusterName) {
@@ -0,0 +1,89 @@
package org.apache.helix.view.monitoring;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;

public class ViewAggregatorMonitor extends DynamicMBeanProvider {
/* package */ static final String MBEAN_DOMAIN = MonitorDomainNames.ClusterStatus.name();
/* package */ static final String MONITOR_KEY = "ViewClusterName";
private static final String MBEAN_DESCRIPTION = "Monitor helix view aggregator activity";
private final String _clusterName;
private final String _sensorName;

// Counters
private final SimpleDynamicMetric<Long> _refreshViewFailureCounter;
private final SimpleDynamicMetric<Long> _sourceReadFailureCounter;
private final SimpleDynamicMetric<Long> _processViewConfigFailureCounter;
private final SimpleDynamicMetric<Long> _processedSourceClusterEventCounter;

// Gauges
private final HistogramDynamicMetric _viewRefreshLatencyGauge;

public ViewAggregatorMonitor(String clusterName) {
_clusterName = clusterName;
_sensorName = String.format("%s.%s.%s", MBEAN_DOMAIN, MONITOR_KEY, clusterName);

// Initialize metrics
_refreshViewFailureCounter =
new SimpleDynamicMetric<>("ViewClusterRefreshFailureCounter", 0L);
_sourceReadFailureCounter =
new SimpleDynamicMetric<>("SourceClusterRefreshFailureCounter", 0L);
_processedSourceClusterEventCounter =
new SimpleDynamicMetric<>("ProcessedSourceClusterEventCounter", 0L);
_processViewConfigFailureCounter =
new SimpleDynamicMetric<>("ProcessViewConfigFailureCounter", 0L);
_viewRefreshLatencyGauge = new HistogramDynamicMetric("ViewClusterRefreshDurationGauge",
new Histogram(
new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
}

public void recordViewRefreshFailure() {
_refreshViewFailureCounter.updateValue(_refreshViewFailureCounter.getValue() + 1);
}

public void recordViewConfigProcessFailure() {
_processViewConfigFailureCounter.updateValue(_processViewConfigFailureCounter.getValue() + 1);
}

public void recordReadSourceFailure() {
_sourceReadFailureCounter.updateValue(_sourceReadFailureCounter.getValue() + 1);
}

public void recordProcessedSourceEvent() {
_processedSourceClusterEventCounter
.updateValue(_processedSourceClusterEventCounter.getValue() + 1);
}

public void recordRefreshViewLatency(long latency) {
_viewRefreshLatencyGauge.updateValue(latency);
}

@Override
public String getSensorName() {
return _sensorName;
}

@Override
public DynamicMBeanProvider register() throws JMException {
List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
attributeList.add(_sourceReadFailureCounter);
attributeList.add(_refreshViewFailureCounter);
attributeList.add(_processViewConfigFailureCounter);
attributeList.add(_processedSourceClusterEventCounter);
attributeList.add(_viewRefreshLatencyGauge);

doRegister(attributeList, MBEAN_DESCRIPTION, MBeanRegistrar
.buildObjectName(MBEAN_DOMAIN, MONITOR_KEY, _clusterName));
return this;
}
}
@@ -0,0 +1,92 @@
package org.apache.helix.view.monitoring;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestViewAggregatorMonitor {
private static final MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();

@Test
public void testViewAggregatorMonitorMBeanRegistration() throws Exception {
String cluster1 = "cluster1";
String cluster2 = "cluster2";
ObjectName name1 = generateObjectName(cluster1);
ObjectName name2 = generateObjectName(cluster2);

ViewAggregatorMonitor monitor1 = new ViewAggregatorMonitor(cluster1);
monitor1.register();

ViewAggregatorMonitor monitor2 = new ViewAggregatorMonitor(cluster2);
monitor2.register();

Assert.assertTrue(_beanServer.isRegistered(name1));
Assert.assertTrue(_beanServer.isRegistered(name2));

monitor1.unregister();
monitor2.unregister();

Assert.assertFalse(_beanServer.isRegistered(name1));
Assert.assertFalse(_beanServer.isRegistered(name2));
}

@Test
public void testViewAggregatorMonitorDataRecording() throws Exception {
String cluster = "testViewCluster";
ViewAggregatorMonitor monitor = new ViewAggregatorMonitor(cluster);
monitor.register();
ObjectName objectName = generateObjectName(cluster);

monitor.recordProcessedSourceEvent();
monitor.recordViewConfigProcessFailure();
monitor.recordViewRefreshFailure();
monitor.recordReadSourceFailure();
monitor.recordRefreshViewLatency(100);

Assert.assertEquals(
(long) _beanServer.getAttribute(objectName, "ViewClusterRefreshFailureCounter"), 1);
Assert.assertEquals(
(long) _beanServer.getAttribute(objectName, "SourceClusterRefreshFailureCounter"), 1);
Assert.assertEquals(
(long) _beanServer.getAttribute(objectName, "ProcessedSourceClusterEventCounter"), 1);
Assert.assertEquals(
(long) _beanServer.getAttribute(objectName, "ProcessViewConfigFailureCounter"), 1);
Assert.assertEquals(
(long) _beanServer.getAttribute(objectName, "ViewClusterRefreshDurationGauge.Max"), 100);
Assert
.assertEquals(_beanServer.getAttribute(objectName, "ViewClusterRefreshDurationGauge.Mean"),
100.0);
Assert.assertEquals(
_beanServer.getAttribute(objectName, "ViewClusterRefreshDurationGauge.StdDev"), 0.0);
}

private ObjectName generateObjectName(String viewClusterName) throws JMException {
return MBeanRegistrar
.buildObjectName(ViewAggregatorMonitor.MBEAN_DOMAIN, ViewAggregatorMonitor.MONITOR_KEY,
viewClusterName);
}

}

0 comments on commit 87a5f15

Please sign in to comment.