Skip to content

Commit

Permalink
[#640] feat(netty): Metric system for netty server (#1041)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add NettyMetrics for StreamServer

### Why are the changes needed?

Fix: #640

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
UT

Co-authored-by: jam.xu <jam.xu@vipshop.com>
  • Loading branch information
xumanbu and xumanbu committed Jul 28, 2023
1 parent 44fa8fb commit a5ba479
Show file tree
Hide file tree
Showing 9 changed files with 392 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@

package org.apache.uniffle.common.metrics;

import java.util.Map;

import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;

public abstract class GRPCMetrics {
public abstract class GRPCMetrics extends RPCMetrics {
// Grpc server internal executor metrics
public static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY =
"grpcServerExecutorActiveThreads";
Expand All @@ -43,34 +37,17 @@ public abstract class GRPCMetrics {
private static final String GRPC_OPEN = "grpc_open";
private static final String GRPC_TOTAL = "grpc_total";

private boolean isRegistered = false;
protected Map<String, Counter.Child> counterMap = JavaUtils.newConcurrentMap();
protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> transportTimeSummaryMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> processTimeSummaryMap = JavaUtils.newConcurrentMap();
protected Gauge.Child gaugeGrpcOpen;
protected Counter.Child counterGrpcTotal;
protected MetricsManager metricsManager;
protected String tags;

public GRPCMetrics(String tags) {
this.tags = tags;
super(tags);
}

public abstract void registerMetrics();

public void register(CollectorRegistry collectorRegistry) {
if (!isRegistered) {
Map<String, String> labels = Maps.newHashMap();
labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
metricsManager = new MetricsManager(collectorRegistry, labels);
registerGeneralMetrics();
registerMetrics();
isRegistered = true;
}
}

private void registerGeneralMetrics() {
@Override
public void registerGeneralMetrics() {
gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
gaugeMap.putIfAbsent(
Expand All @@ -84,92 +61,21 @@ private void registerGeneralMetrics() {
metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
}

public void setGauge(String tag, double value) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.set(value);
}
}
}

public void incGauge(String tag) {
incGauge(tag, 1);
}

public void incGauge(String tag, double value) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.inc(value);
}
}
}

public void decGauge(String tag) {
decGauge(tag, 1);
}

public void decGauge(String tag, double value) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.dec(value);
}
}
}

public void incCounter(String methodName) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.inc();
}
Counter.Child counter = counterMap.get(methodName);
if (counter != null) {
counter.inc();
}
super.incCounter(methodName);
gaugeGrpcOpen.inc();
counterGrpcTotal.inc();
}
}

public void decCounter(String methodName) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.dec();
}
super.decCounter(methodName);
gaugeGrpcOpen.dec();
}
}

public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
Summary.Child summary = transportTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
Summary.Child summary = processTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public CollectorRegistry getCollectorRegistry() {
return metricsManager.getCollectorRegistry();
}

public Map<String, Counter.Child> getCounterMap() {
return counterMap;
}

public Map<String, Gauge.Child> getGaugeMap() {
return gaugeMap;
}

public Gauge.Child getGaugeGrpcOpen() {
return gaugeGrpcOpen;
}
Expand All @@ -178,14 +84,6 @@ public Counter.Child getCounterGrpcTotal() {
return counterGrpcTotal;
}

public Map<String, Summary.Child> getTransportTimeSummaryMap() {
return transportTimeSummaryMap;
}

public Map<String, Summary.Child> getProcessTimeSummaryMap() {
return processTimeSummaryMap;
}

public static GRPCMetrics getEmptyGRPCMetrics() {
return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.metrics;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

/**
 * Base class for metrics of a Netty-based server.
 *
 * <p>On top of what {@link RPCMetrics} provides, this class registers two general metrics:
 * a labeled gauge named {@code netty_active_connection} and a labeled counter named
 * {@code netty_handle_exception}. Both handles stay {@code null} until
 * {@link #registerGeneralMetrics()} has run.
 */
public abstract class NettyMetrics extends RPCMetrics {

  private static final String NETTY_ACTIVE_CONNECTION = "netty_active_connection";
  private static final String NETTY_HANDLE_EXCEPTION = "netty_handle_exception";

  /** Handle of the {@code netty_active_connection} gauge; assigned in registerGeneralMetrics(). */
  protected Gauge.Child gaugeNettyActiveConn;
  /** Handle of the {@code netty_handle_exception} counter; assigned in registerGeneralMetrics(). */
  protected Counter.Child counterNettyException;

  /**
   * @param tags value forwarded unchanged to the {@link RPCMetrics} constructor
   */
  public NettyMetrics(String tags) {
    super(tags);
  }

  /**
   * Creates the two Netty-wide metrics through {@code metricsManager} and stores their handles
   * in the fields of this class.
   */
  @Override
  public void registerGeneralMetrics() {
    this.gaugeNettyActiveConn = this.metricsManager.addLabeledGauge(NETTY_ACTIVE_CONNECTION);
    this.counterNettyException = this.metricsManager.addLabeledCounter(NETTY_HANDLE_EXCEPTION);
  }

  /**
   * @return the active-connection gauge handle, or {@code null} if
   *     {@link #registerGeneralMetrics()} has not been called yet
   */
  public Gauge.Child getGaugeNettyActiveConn() {
    return this.gaugeNettyActiveConn;
  }

  /**
   * @return the handled-exception counter handle, or {@code null} if
   *     {@link #registerGeneralMetrics()} has not been called yet
   */
  public Counter.Child getCounterNettyException() {
    return this.counterNettyException;
  }
}
149 changes: 149 additions & 0 deletions common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.metrics;

import java.util.Map;

import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;

/**
 * Common base for RPC server metrics.
 *
 * <p>Holds per-key lookup tables of Prometheus counter, gauge and summary children, and a
 * one-time {@link #register(CollectorRegistry)} step that creates the {@code MetricsManager}
 * and lets subclasses populate the tables. The gauge/counter mutators are no-ops until
 * registration has completed, and silently ignore keys that are absent from the tables.
 *
 * <p>NOTE(review): {@code isRegistered} is a plain field and {@code register} is not
 * synchronized; confirm callers register from a single thread.
 */
public abstract class RPCMetrics {
  /** Set to {@code true} at the end of the first {@link #register(CollectorRegistry)} call. */
  protected boolean isRegistered = false;
  protected Map<String, Counter.Child> counterMap = JavaUtils.newConcurrentMap();
  protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
  protected Map<String, Summary.Child> transportTimeSummaryMap = JavaUtils.newConcurrentMap();
  protected Map<String, Summary.Child> processTimeSummaryMap = JavaUtils.newConcurrentMap();
  /** Created in {@link #register(CollectorRegistry)}; {@code null} before that. */
  protected MetricsManager metricsManager;
  /** Value stored under {@code Constants.METRICS_TAG_LABEL_NAME} in the label map. */
  protected String tags;

  /**
   * @param tags label value attached to the metrics created by this instance
   */
  public RPCMetrics(String tags) {
    this.tags = tags;
  }

  /** Hook invoked by {@link #register(CollectorRegistry)} after the general metrics. */
  public abstract void registerMetrics();

  /** Hook invoked by {@link #register(CollectorRegistry)} before {@link #registerMetrics()}. */
  public abstract void registerGeneralMetrics();

  /**
   * Builds the {@code MetricsManager} for the given registry and runs both registration hooks.
   * Only the first call has any effect; later calls return immediately.
   *
   * @param collectorRegistry registry handed to the new {@code MetricsManager}
   */
  public void register(CollectorRegistry collectorRegistry) {
    if (isRegistered) {
      return;
    }
    Map<String, String> tagLabels = Maps.newHashMap();
    tagLabels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
    metricsManager = new MetricsManager(collectorRegistry, tagLabels);
    registerGeneralMetrics();
    registerMetrics();
    // Flip the flag last so the mutators stay inert until both hooks have finished.
    isRegistered = true;
  }

  /**
   * Sets the gauge stored under {@code tag} to {@code value}; does nothing when unregistered
   * or when no such gauge exists.
   */
  public void setGauge(String tag, double value) {
    if (!isRegistered) {
      return;
    }
    Gauge.Child target = gaugeMap.get(tag);
    if (target == null) {
      return;
    }
    target.set(value);
  }

  /** Increments the gauge stored under {@code tag} by one. */
  public void incGauge(String tag) {
    incGauge(tag, 1);
  }

  /**
   * Adds {@code value} to the gauge stored under {@code tag}; does nothing when unregistered
   * or when no such gauge exists.
   */
  public void incGauge(String tag, double value) {
    if (!isRegistered) {
      return;
    }
    Gauge.Child target = gaugeMap.get(tag);
    if (target == null) {
      return;
    }
    target.inc(value);
  }

  /** Decrements the gauge stored under {@code tag} by one. */
  public void decGauge(String tag) {
    decGauge(tag, 1);
  }

  /**
   * Subtracts {@code value} from the gauge stored under {@code tag}; does nothing when
   * unregistered or when no such gauge exists.
   */
  public void decGauge(String tag, double value) {
    if (!isRegistered) {
      return;
    }
    Gauge.Child target = gaugeMap.get(tag);
    if (target == null) {
      return;
    }
    target.dec(value);
  }

  /**
   * Increments by one both the gauge and the counter stored under {@code metricKey},
   * whichever of them exist. Does nothing when unregistered.
   */
  public void incCounter(String metricKey) {
    if (!isRegistered) {
      return;
    }
    Gauge.Child keyedGauge = gaugeMap.get(metricKey);
    if (keyedGauge != null) {
      keyedGauge.inc();
    }
    Counter.Child keyedCounter = counterMap.get(metricKey);
    if (keyedCounter != null) {
      keyedCounter.inc();
    }
  }

  /**
   * Decrements by one the gauge stored under {@code metricKey}, if present. The counter map
   * is not touched. Does nothing when unregistered.
   */
  public void decCounter(String metricKey) {
    if (!isRegistered) {
      return;
    }
    Gauge.Child keyedGauge = gaugeMap.get(metricKey);
    if (keyedGauge != null) {
      keyedGauge.dec();
    }
  }

  /**
   * Observes a transport time on the summary stored under {@code methodName}, if present.
   *
   * @param methodName key into the transport-time summary table
   * @param transportTimeInMillionSecond duration (presumably milliseconds, going by the name);
   *     it is divided by {@code Constants.MILLION_SECONDS_PER_SECOND} before being observed
   */
  public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
    observeScaled(transportTimeSummaryMap, methodName, transportTimeInMillionSecond);
  }

  /**
   * Observes a process time on the summary stored under {@code methodName}, if present.
   *
   * @param methodName key into the process-time summary table
   * @param processTimeInMillionSecond duration (presumably milliseconds, going by the name);
   *     it is divided by {@code Constants.MILLION_SECONDS_PER_SECOND} before being observed
   */
  public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
    observeScaled(processTimeSummaryMap, methodName, processTimeInMillionSecond);
  }

  /** Looks up {@code methodName} in {@code summaries} and, if found, observes the scaled time. */
  private static void observeScaled(
      Map<String, Summary.Child> summaries, String methodName, long timeInMillionSecond) {
    Summary.Child target = summaries.get(methodName);
    if (target == null) {
      return;
    }
    target.observe(timeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
  }

  /**
   * @return the registry held by {@code metricsManager}. Within this class the manager is only
   *     assigned in {@link #register(CollectorRegistry)}, so calling this earlier dereferences
   *     {@code null}.
   */
  public CollectorRegistry getCollectorRegistry() {
    return metricsManager.getCollectorRegistry();
  }

  /** @return the live (not copied) counter table */
  public Map<String, Counter.Child> getCounterMap() {
    return counterMap;
  }

  /** @return the live (not copied) gauge table */
  public Map<String, Gauge.Child> getGaugeMap() {
    return gaugeMap;
  }

  /** @return the live (not copied) transport-time summary table */
  public Map<String, Summary.Child> getTransportTimeSummaryMap() {
    return transportTimeSummaryMap;
  }

  /** @return the live (not copied) process-time summary table */
  public Map<String, Summary.Child> getProcessTimeSummaryMap() {
    return processTimeSummaryMap;
  }
}
Loading

0 comments on commit a5ba479

Please sign in to comment.