Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#640] feat(netty): Metric system for netty server #1041

Merged
merged 3 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@

package org.apache.uniffle.common.metrics;

import java.util.Map;

import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;

public abstract class GRPCMetrics {
public abstract class GRPCMetrics extends RPCMetrics {
// Grpc server internal executor metrics
public static final String GRPC_SERVER_EXECUTOR_ACTIVE_THREADS_KEY =
"grpcServerExecutorActiveThreads";
Expand All @@ -43,34 +37,17 @@ public abstract class GRPCMetrics {
private static final String GRPC_OPEN = "grpc_open";
private static final String GRPC_TOTAL = "grpc_total";

private boolean isRegistered = false;
protected Map<String, Counter.Child> counterMap = JavaUtils.newConcurrentMap();
protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> transportTimeSummaryMap = JavaUtils.newConcurrentMap();
protected Map<String, Summary.Child> processTimeSummaryMap = JavaUtils.newConcurrentMap();
protected Gauge.Child gaugeGrpcOpen;
protected Counter.Child counterGrpcTotal;
protected MetricsManager metricsManager;
protected String tags;

public GRPCMetrics(String tags) {
this.tags = tags;
super(tags);
}

public abstract void registerMetrics();

public void register(CollectorRegistry collectorRegistry) {
if (!isRegistered) {
Map<String, String> labels = Maps.newHashMap();
labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
metricsManager = new MetricsManager(collectorRegistry, labels);
registerGeneralMetrics();
registerMetrics();
isRegistered = true;
}
}

private void registerGeneralMetrics() {
@Override
public void registerGeneralMetrics() {
gaugeGrpcOpen = metricsManager.addLabeledGauge(GRPC_OPEN);
counterGrpcTotal = metricsManager.addLabeledCounter(GRPC_TOTAL);
gaugeMap.putIfAbsent(
Expand All @@ -84,92 +61,21 @@ private void registerGeneralMetrics() {
metricsManager.addLabeledGauge(GRPC_SERVER_CONNECTION_NUMBER));
}

public void setGauge(String tag, double value) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.set(value);
}
}
}

public void incGauge(String tag) {
incGauge(tag, 1);
}

public void incGauge(String tag, double value) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.inc(value);
}
}
}

public void decGauge(String tag) {
decGauge(tag, 1);
}

public void decGauge(String tag, double value) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(tag);
if (gauge != null) {
gauge.dec(value);
}
}
}

public void incCounter(String methodName) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.inc();
}
Counter.Child counter = counterMap.get(methodName);
if (counter != null) {
counter.inc();
}
super.incCounter(methodName);
gaugeGrpcOpen.inc();
counterGrpcTotal.inc();
}
}

public void decCounter(String methodName) {
if (isRegistered) {
Gauge.Child gauge = gaugeMap.get(methodName);
if (gauge != null) {
gauge.dec();
}
super.decCounter(methodName);
gaugeGrpcOpen.dec();
}
}

public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
Summary.Child summary = transportTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(transportTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
Summary.Child summary = processTimeSummaryMap.get(methodName);
if (summary != null) {
summary.observe(processTimeInMillionSecond / Constants.MILLION_SECONDS_PER_SECOND);
}
}

public CollectorRegistry getCollectorRegistry() {
return metricsManager.getCollectorRegistry();
}

public Map<String, Counter.Child> getCounterMap() {
return counterMap;
}

public Map<String, Gauge.Child> getGaugeMap() {
return gaugeMap;
}

public Gauge.Child getGaugeGrpcOpen() {
return gaugeGrpcOpen;
}
Expand All @@ -178,14 +84,6 @@ public Counter.Child getCounterGrpcTotal() {
return counterGrpcTotal;
}

public Map<String, Summary.Child> getTransportTimeSummaryMap() {
return transportTimeSummaryMap;
}

public Map<String, Summary.Child> getProcessTimeSummaryMap() {
return processTimeSummaryMap;
}

public static GRPCMetrics getEmptyGRPCMetrics() {
return new EmptyGRPCMetrics(Constants.SHUFFLE_SERVER_VERSION);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.metrics;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;

public abstract class NettyMetrics extends RPCMetrics {

private static final String NETTY_ACTIVE_CONNECTION = "netty_active_connection";
private static final String NETTY_HANDLE_EXCEPTION = "netty_handle_exception";

protected Gauge.Child gaugeNettyActiveConn;
protected Counter.Child counterNettyException;

public NettyMetrics(String tags) {
super(tags);
}

@Override
public void registerGeneralMetrics() {
gaugeNettyActiveConn = metricsManager.addLabeledGauge(NETTY_ACTIVE_CONNECTION);
counterNettyException = metricsManager.addLabeledCounter(NETTY_HANDLE_EXCEPTION);
}

public void incRequest(String className) {
if (isRegistered) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reuse RPCMetrics#incCounter? seems same code logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good advice.

Gauge.Child gauge = gaugeMap.get(className);
if (gauge != null) {
gauge.inc();
}
Counter.Child counter = counterMap.get(className);
if (counter != null) {
counter.inc();
}
}
}

public void decRequest(String className) {
if (isRegistered) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix it.

Gauge.Child gauge = gaugeMap.get(className);
if (gauge != null) {
gauge.dec();
}
}
}

public Counter.Child getCounterNettyException() {
return counterNettyException;
}

public Gauge.Child getGaugeNettyActiveConn() {
return gaugeNettyActiveConn;
}
}
149 changes: 149 additions & 0 deletions common/src/main/java/org/apache/uniffle/common/metrics/RPCMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.metrics;

import java.util.Map;

import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Summary;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;

/**
 * Common base for RPC-server metric holders.
 *
 * <p>Keeps four maps of Prometheus children keyed by a caller-supplied string (counters, gauges,
 * transport-time summaries and process-time summaries) and a {@link MetricsManager} that is
 * created on the first call to {@link #register(CollectorRegistry)}. Subclasses populate the maps
 * from {@link #registerGeneralMetrics()} and {@link #registerMetrics()}.
 *
 * <p>Gauge and counter mutators are no-ops until registration has completed, and are also no-ops
 * for keys that have no entry in the corresponding map.
 */
public abstract class RPCMetrics {
  /** Set to {@code true} at the end of the first successful {@link #register} call. */
  protected boolean isRegistered = false;

  protected Map<String, Counter.Child> counterMap = JavaUtils.newConcurrentMap();
  protected Map<String, Gauge.Child> gaugeMap = JavaUtils.newConcurrentMap();
  protected Map<String, Summary.Child> transportTimeSummaryMap = JavaUtils.newConcurrentMap();
  protected Map<String, Summary.Child> processTimeSummaryMap = JavaUtils.newConcurrentMap();

  /** Created in {@link #register}; not assigned anywhere else in this class. */
  protected MetricsManager metricsManager;

  /** Value placed under {@code Constants.METRICS_TAG_LABEL_NAME} in the label map. */
  protected String tags;

  /**
   * @param tags label value used when the {@link MetricsManager} is created in {@link #register}
   */
  public RPCMetrics(String tags) {
    this.tags = tags;
  }

  /** Hook called by {@link #register} after {@link #registerGeneralMetrics()}. */
  public abstract void registerMetrics();

  /** Hook called by {@link #register} right after {@code metricsManager} is created. */
  public abstract void registerGeneralMetrics();

  /**
   * Creates the {@link MetricsManager} for the given registry and runs both registration hooks.
   * Calls made after {@code isRegistered} has been set return immediately.
   *
   * @param collectorRegistry registry handed to the new {@link MetricsManager}
   */
  public void register(CollectorRegistry collectorRegistry) {
    if (isRegistered) {
      return;
    }
    Map<String, String> tagLabels = Maps.newHashMap();
    tagLabels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
    metricsManager = new MetricsManager(collectorRegistry, tagLabels);
    registerGeneralMetrics();
    registerMetrics();
    isRegistered = true;
  }

  /**
   * Sets the gauge stored under {@code tag} to {@code value}.
   *
   * @param tag key into {@code gaugeMap}
   * @param value new gauge value
   */
  public void setGauge(String tag, double value) {
    Gauge.Child target = lookupGaugeIfRegistered(tag);
    if (target != null) {
      target.set(value);
    }
  }

  /**
   * Increments the gauge stored under {@code tag} by one.
   *
   * @param tag key into {@code gaugeMap}
   */
  public void incGauge(String tag) {
    incGauge(tag, 1);
  }

  /**
   * Increments the gauge stored under {@code tag} by {@code value}.
   *
   * @param tag key into {@code gaugeMap}
   * @param value amount to add
   */
  public void incGauge(String tag, double value) {
    Gauge.Child target = lookupGaugeIfRegistered(tag);
    if (target != null) {
      target.inc(value);
    }
  }

  /**
   * Decrements the gauge stored under {@code tag} by one.
   *
   * @param tag key into {@code gaugeMap}
   */
  public void decGauge(String tag) {
    decGauge(tag, 1);
  }

  /**
   * Decrements the gauge stored under {@code tag} by {@code value}.
   *
   * @param tag key into {@code gaugeMap}
   * @param value amount to subtract
   */
  public void decGauge(String tag, double value) {
    Gauge.Child target = lookupGaugeIfRegistered(tag);
    if (target != null) {
      target.dec(value);
    }
  }

  /**
   * Increments both the gauge and the counter stored under {@code methodName}; either one is
   * skipped when it has no entry.
   *
   * @param methodName key into {@code gaugeMap} and {@code counterMap}
   */
  public void incCounter(String methodName) {
    if (!isRegistered) {
      return;
    }
    Gauge.Child keyedGauge = gaugeMap.get(methodName);
    if (keyedGauge != null) {
      keyedGauge.inc();
    }
    Counter.Child keyedCounter = counterMap.get(methodName);
    if (keyedCounter != null) {
      keyedCounter.inc();
    }
  }

  /**
   * Decrements the gauge stored under {@code methodName}. The counter is left untouched.
   *
   * @param methodName key into {@code gaugeMap}
   */
  public void decCounter(String methodName) {
    Gauge.Child keyedGauge = lookupGaugeIfRegistered(methodName);
    if (keyedGauge != null) {
      keyedGauge.dec();
    }
  }

  /**
   * Observes a transport time on the summary stored under {@code methodName}, if any. Unlike the
   * gauge/counter mutators, this does not consult {@code isRegistered}.
   *
   * @param methodName key into {@code transportTimeSummaryMap}
   * @param transportTimeInMillionSecond duration; divided by
   *     {@code Constants.MILLION_SECONDS_PER_SECOND} before being observed (presumably a
   *     milliseconds-to-seconds conversion; confirm against {@code Constants})
   */
  public void recordTransportTime(String methodName, long transportTimeInMillionSecond) {
    observeScaled(transportTimeSummaryMap, methodName, transportTimeInMillionSecond);
  }

  /**
   * Observes a process time on the summary stored under {@code methodName}, if any. Unlike the
   * gauge/counter mutators, this does not consult {@code isRegistered}.
   *
   * @param methodName key into {@code processTimeSummaryMap}
   * @param processTimeInMillionSecond duration; divided by
   *     {@code Constants.MILLION_SECONDS_PER_SECOND} before being observed (presumably a
   *     milliseconds-to-seconds conversion; confirm against {@code Constants})
   */
  public void recordProcessTime(String methodName, long processTimeInMillionSecond) {
    observeScaled(processTimeSummaryMap, methodName, processTimeInMillionSecond);
  }

  /**
   * @return the registry held by {@code metricsManager}; dereferences {@code metricsManager}
   *     without a null check, so it is only usable once that field has been assigned
   */
  public CollectorRegistry getCollectorRegistry() {
    return metricsManager.getCollectorRegistry();
  }

  /** @return the live counter map (not a copy) */
  public Map<String, Counter.Child> getCounterMap() {
    return counterMap;
  }

  /** @return the live gauge map (not a copy) */
  public Map<String, Gauge.Child> getGaugeMap() {
    return gaugeMap;
  }

  /** @return the live transport-time summary map (not a copy) */
  public Map<String, Summary.Child> getTransportTimeSummaryMap() {
    return transportTimeSummaryMap;
  }

  /** @return the live process-time summary map (not a copy) */
  public Map<String, Summary.Child> getProcessTimeSummaryMap() {
    return processTimeSummaryMap;
  }

  /** Returns the gauge for {@code key}, or {@code null} when unregistered or absent. */
  private Gauge.Child lookupGaugeIfRegistered(String key) {
    if (!isRegistered) {
      return null;
    }
    return gaugeMap.get(key);
  }

  /** Observes {@code rawTime / MILLION_SECONDS_PER_SECOND} on the summary for {@code key}. */
  private static void observeScaled(
      Map<String, Summary.Child> summaries, String key, long rawTime) {
    Summary.Child keyedSummary = summaries.get(key);
    if (keyedSummary != null) {
      keyedSummary.observe(rawTime / Constants.MILLION_SECONDS_PER_SECOND);
    }
  }
}
Loading