Skip to content
Permalink
Browse files
Improve ZkClientMonitor and ZkClientPathMonitor performance (#2021)
Previously, regex matches were used, which was inefficient. This commit does this following:
Replace String#matches with more efficient String#contains in ZkClientPathMonitor
Refactor record* methods in ZkClientMonitor to avoid repetition and simplify matching logic
  • Loading branch information
hmhagberg committed Apr 7, 2022
1 parent 858038a commit 08e35dedc2ebf9c8863b0cb136c9e7c9a150e1a0
Showing 2 changed files with 43 additions and 62 deletions.
@@ -20,12 +20,12 @@
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MalformedObjectNameException;
@@ -81,11 +81,11 @@ public ZkClientMonitor(String monitorType, String monitorKey, String monitorInst
_monitorInstanceName = monitorInstanceName;
_monitorRootOnly = monitorRootOnly;

_stateChangeEventCounter = new SimpleDynamicMetric("StateChangeEventCounter", 0l);
_expiredSessionCounter = new SimpleDynamicMetric("ExpiredSessionCounter", 0l);
_dataChangeEventCounter = new SimpleDynamicMetric("DataChangeEventCounter", 0l);
_outstandingRequestGauge = new SimpleDynamicMetric("OutstandingRequestGauge", 0l);
_znodeCompressCounter = new SimpleDynamicMetric("CompressedZnodeWriteCounter", 0l);
_stateChangeEventCounter = new SimpleDynamicMetric<>("StateChangeEventCounter", 0L);
_expiredSessionCounter = new SimpleDynamicMetric<>("ExpiredSessionCounter", 0L);
_dataChangeEventCounter = new SimpleDynamicMetric<>("DataChangeEventCounter", 0L);
_outstandingRequestGauge = new SimpleDynamicMetric<>("OutstandingRequestGauge", 0L);
_znodeCompressCounter = new SimpleDynamicMetric<>("CompressedZnodeWriteCounter", 0L);

if (zkEventThread != null) {
boolean result = setAndInitZkEventThreadMonitor(zkEventThread);
@@ -198,32 +198,12 @@ public void increaseZnodeCompressCounter() {
}

public void recordDataPropagationLatency(String path, long latencyMilliSec) {
if (null == path) {
return;
}
Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
.filter(predefinedPath -> predefinedPath.match(path))
.forEach(predefinedPath -> {
ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
if (zkClientPathMonitor != null) {
zkClientPathMonitor.recordDataPropagationLatency(latencyMilliSec);
}
});
findZkClientPathMonitor(path, (m) -> m.recordDataPropagationLatency(latencyMilliSec));
}

private void record(String path, int bytes, long latencyMilliSec, boolean isFailure,
boolean isRead) {
if (null == path) {
return;
}
Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
.filter(predefinedPath -> predefinedPath.match(path))
.forEach(predefinedPath -> {
ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
if (zkClientPathMonitor != null) {
zkClientPathMonitor.record(bytes, latencyMilliSec, isFailure, isRead);
}
});
findZkClientPathMonitor(path, (m) -> m.record(bytes, latencyMilliSec, isFailure, isRead));
}

public void record(String path, int dataSize, long startTimeMilliSec, AccessType accessType) {
@@ -257,17 +237,7 @@ public void recordFailure(String path, AccessType accessType) {
*/
private void recordAsync(String path, int bytes, long latencyMilliSec, boolean isFailure,
AccessType accessType) {
if (null == path) {
return;
}
Arrays.stream(ZkClientPathMonitor.PredefinedPath.values())
.filter(predefinedPath -> predefinedPath.match(path))
.forEach(predefinedPath -> {
ZkClientPathMonitor zkClientPathMonitor = _zkClientPathMonitorMap.get(predefinedPath);
if (zkClientPathMonitor != null) {
zkClientPathMonitor.recordAsync(bytes, latencyMilliSec, isFailure, accessType);
}
});
findZkClientPathMonitor(path, (m) -> m.recordAsync(bytes, latencyMilliSec, isFailure, accessType));
}

public void recordAsync(String path, int dataSize, long startTimeMilliSec, AccessType accessType) {
@@ -278,6 +248,17 @@ public void recordAsyncFailure(String path, AccessType accessType) {
recordAsync(path, 0, 0, true, accessType);
}

private void findZkClientPathMonitor(String path, Consumer<ZkClientPathMonitor> onMatch) {
if (path == null) {
return;
}
_zkClientPathMonitorMap.forEach((predefinedPath, zkClientPathMonitor) -> {
if (predefinedPath.match(path)) {
onMatch.accept(zkClientPathMonitor);
}
});
}

class ZkThreadMetric extends DynamicMetric<ZkEventThread, ZkEventThread> {
public ZkThreadMetric(ZkEventThread eventThread) {
super("ZkEventThead", eventThread);
@@ -42,16 +42,16 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
private final PredefinedPath _path;

public enum PredefinedPath {
IdealStates(".*/IDEALSTATES/.*"),
Instances(".*/INSTANCES/.*"),
Configs(".*/CONFIGS/.*"),
Controller(".*/CONTROLLER/.*"),
ExternalView(".*/EXTERNALVIEW/.*"),
LiveInstances(".*/LIVEINSTANCES/.*"),
PropertyStore(".*/PROPERTYSTORE/.*"),
CurrentStates(".*/CURRENTSTATES/.*"),
Messages(".*/MESSAGES/.*"),
Root(".*");
IdealStates("/IDEALSTATES/"),
Instances("/INSTANCES/"),
Configs("/CONFIGS/"),
Controller("/CONTROLLER/"),
ExternalView("/EXTERNALVIEW/"),
LiveInstances("/LIVEINSTANCES/"),
PropertyStore("/PROPERTYSTORE/"),
CurrentStates("/CURRENTSTATES/"),
Messages("/MESSAGES/"),
Root("");

private final String _matchString;

@@ -60,7 +60,7 @@ public enum PredefinedPath {
}

public boolean match(String path) {
return path.matches(this._matchString);
return path.contains(this._matchString);
}
}

@@ -135,27 +135,27 @@ public ZkClientPathMonitor(PredefinedPath path, String monitorType, String monit
path.name());

_writeTotalLatencyCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.WriteTotalLatencyCounter.name(), 0L);
_readTotalLatencyCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.ReadTotalLatencyCounter.name(), 0L);
_writeFailureCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.WriteFailureCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.WriteFailureCounter.name(), 0L);
_readFailureCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.ReadFailureCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.ReadFailureCounter.name(), 0L);
_writeAsyncFailureCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.WriteAsyncFailureCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.WriteAsyncFailureCounter.name(), 0L);
_readAsyncFailureCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.ReadAsyncFailureCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.ReadAsyncFailureCounter.name(), 0L);
_writeBytesCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.WriteBytesCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.WriteBytesCounter.name(), 0L);
_readBytesCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.ReadBytesCounter.name(), 0l);
_writeCounter = new SimpleDynamicMetric(PredefinedMetricDomains.WriteCounter.name(), 0l);
_readCounter = new SimpleDynamicMetric(PredefinedMetricDomains.ReadCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.ReadBytesCounter.name(), 0L);
_writeCounter = new SimpleDynamicMetric<>(PredefinedMetricDomains.WriteCounter.name(), 0L);
_readCounter = new SimpleDynamicMetric<>(PredefinedMetricDomains.ReadCounter.name(), 0L);
_writeAsyncCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.WriteAsyncCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.WriteAsyncCounter.name(), 0L);
_readAsyncCounter =
new SimpleDynamicMetric(PredefinedMetricDomains.ReadAsyncCounter.name(), 0l);
new SimpleDynamicMetric<>(PredefinedMetricDomains.ReadAsyncCounter.name(), 0L);

_readLatencyGauge = new HistogramDynamicMetric(PredefinedMetricDomains.ReadLatencyGauge.name(),
new Histogram(

0 comments on commit 08e35de

Please sign in to comment.