Skip to content

Commit

Permalink
Add metricsMaster.getMetrics() and fix fsadmin report metrics
Browse files Browse the repository at this point in the history
Expose all metrics stored in the leading master via
MetricsMasterClient.getMetrics().
Change fsadmin report metrics command to show all metrics.

pr-link: #10782
change-id: cid-e4d311847c74644301b5781679e03153004a935e
  • Loading branch information
LuQQiu committed Jan 31, 2020
1 parent 4285740 commit a5ea8ce
Show file tree
Hide file tree
Showing 30 changed files with 787 additions and 780 deletions.
Expand Up @@ -11,17 +11,14 @@

package alluxio.client.meta;

import alluxio.exception.status.AlluxioStatusException;
import alluxio.grpc.BackupPRequest;
import alluxio.grpc.MasterInfo;
import alluxio.grpc.MasterInfoField;
import alluxio.grpc.MetricValue;
import alluxio.wire.BackupStatus;
import alluxio.wire.ConfigCheckReport;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -68,13 +65,6 @@ public interface MetaMasterClient extends Closeable {
*/
MasterInfo getMasterInfo(Set<MasterInfoField> masterInfoFields) throws IOException;

/**
* Gets a map of metrics property names and their values from metrics system.
*
* @return a map of metrics information
*/
Map<String, MetricValue> getMetrics() throws AlluxioStatusException;

/**
* Creates a checkpoint in the primary master journal system.
*
Expand Down
Expand Up @@ -13,24 +13,20 @@

import alluxio.AbstractMasterClient;
import alluxio.Constants;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.grpc.BackupPRequest;
import alluxio.grpc.BackupStatusPRequest;
import alluxio.grpc.CheckpointPOptions;
import alluxio.grpc.GetConfigReportPOptions;
import alluxio.grpc.GetMasterInfoPOptions;
import alluxio.grpc.GetMetricsPOptions;
import alluxio.grpc.MasterInfo;
import alluxio.grpc.MasterInfoField;
import alluxio.grpc.MetaMasterClientServiceGrpc;
import alluxio.grpc.MetricValue;
import alluxio.grpc.ServiceType;
import alluxio.master.MasterClientContext;
import alluxio.wire.BackupStatus;
import alluxio.wire.ConfigCheckReport;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

Expand Down Expand Up @@ -98,12 +94,6 @@ public MasterInfo getMasterInfo(final Set<MasterInfoField> fields)
.getMasterInfo());
}

@Override
public Map<String, MetricValue> getMetrics() throws AlluxioStatusException {
return retryRPC(
() -> mClient.getMetrics(GetMetricsPOptions.getDefaultInstance()).getMetricsMap());
}

@Override
public String checkpoint() throws IOException {
return retryRPC(() -> mClient
Expand Down
Expand Up @@ -11,11 +11,14 @@

package alluxio.client.metrics;

import alluxio.exception.status.AlluxioStatusException;
import alluxio.grpc.ClientMetrics;
import alluxio.grpc.MetricValue;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* Interface for a metrics master client.
Expand All @@ -33,4 +36,11 @@ public interface MetricsMasterClient extends Closeable {
* @param metrics a list of client metrics
*/
void heartbeat(final List<ClientMetrics> metrics) throws IOException;

/**
* Gets all the metrics stored in the current master from metric name to metric value.
*
* @return a map of metrics information
*/
Map<String, MetricValue> getMetrics() throws AlluxioStatusException;
}
Expand Up @@ -13,9 +13,12 @@

import alluxio.AbstractMasterClient;
import alluxio.Constants;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.ClearMetricsPRequest;
import alluxio.grpc.ClientMetrics;
import alluxio.grpc.GetMetricsPOptions;
import alluxio.grpc.MetricValue;
import alluxio.grpc.MetricsHeartbeatPOptions;
import alluxio.grpc.MetricsHeartbeatPRequest;
import alluxio.grpc.MetricsMasterClientServiceGrpc;
Expand All @@ -24,6 +27,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;

import javax.annotation.concurrent.ThreadSafe;

Expand Down Expand Up @@ -82,4 +86,10 @@ public void heartbeat(final List<ClientMetrics> metrics) throws IOException {
throw new UnavailableException(e);
}
}

@Override
public Map<String, MetricValue> getMetrics() throws AlluxioStatusException {
return retryRPC(
() -> mClient.getMetrics(GetMetricsPOptions.getDefaultInstance()).getMetricsMap());
}
}
52 changes: 46 additions & 6 deletions core/common/src/main/java/alluxio/metrics/MetricsSystem.java
Expand Up @@ -15,6 +15,7 @@
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.MetricType;
import alluxio.grpc.MetricValue;
import alluxio.metrics.sink.Sink;
import alluxio.util.CommonUtils;
import alluxio.util.ConfigurationUtils;
Expand All @@ -35,6 +36,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -107,7 +109,7 @@ public static InstanceType fromString(String text) {
}

// Supported special instance names.
public static final String CLUSTER = "cluster";
public static final String CLUSTER = "Cluster";

public static final MetricRegistry METRIC_REGISTRY;

Expand Down Expand Up @@ -444,7 +446,7 @@ public static synchronized <T> void registerGaugeIfAbsent(String name, Gauge<T>
*/
private static synchronized List<alluxio.grpc.Metric> reportMetrics(InstanceType instanceType) {
List<alluxio.grpc.Metric> rpcMetrics = new ArrayList<>(20);
for (Metric m : allMetrics(instanceType)) {
for (Metric m : allInstanceMetrics(instanceType)) {
// last reported metrics only need to be tracked for COUNTER metrics
// Store the last metric value which was sent, but the rpc metric returned should only
// contain the difference of the current value, and the last value sent. If it doesn't
Expand Down Expand Up @@ -484,24 +486,62 @@ public static List<alluxio.grpc.Metric> reportClientMetrics() {
* @return all the master's metrics in the format of {@link Metric}
*/
public static List<Metric> allMasterMetrics() {
return allMetrics(InstanceType.MASTER);
return allInstanceMetrics(InstanceType.MASTER);
}

/**
* @return a map of all metrics stored in the current node
* from metric name to {@link MetricValue}
*/
public static Map<String, MetricValue> allMetrics() {
Map<String, MetricValue> metricsMap = new HashMap<>();
for (Entry<String, Gauge> entry : METRIC_REGISTRY.getGauges().entrySet()) {
Object value = entry.getValue().getValue();
MetricValue.Builder valueBuilder = MetricValue.newBuilder().setMetricType(MetricType.GAUGE);
if (!(value instanceof Number)) {
valueBuilder.setStringValue(value.toString());
} else {
valueBuilder.setDoubleValue(((Number) value).doubleValue());
}
metricsMap.put(entry.getKey(), valueBuilder.build());
}
for (Entry<String, Counter> entry : METRIC_REGISTRY.getCounters().entrySet()) {
metricsMap.put(entry.getKey(), MetricValue.newBuilder()
.setDoubleValue(entry.getValue().getCount()).setMetricType(MetricType.COUNTER).build());
}
for (Entry<String, Meter> entry : METRIC_REGISTRY.getMeters().entrySet()) {
metricsMap.put(entry.getKey(), MetricValue.newBuilder()
.setDoubleValue(entry.getValue().getOneMinuteRate())
.setMetricType(MetricType.METER).build());
}
for (Entry<String, Timer> entry : METRIC_REGISTRY.getTimers().entrySet()) {
metricsMap.put(entry.getKey(), MetricValue.newBuilder()
.setDoubleValue(entry.getValue().getCount()).setMetricType(MetricType.TIMER).build());
}
return metricsMap;
}

/**
* @return all the worker's metrics in the format of {@link Metric}
*/
public static List<Metric> allWorkerMetrics() {
return allMetrics(InstanceType.WORKER);
return allInstanceMetrics(InstanceType.WORKER);
}

/**
* @return all the client's metrics in the format of {@link Metric}
*/
public static List<Metric> allClientMetrics() {
return allMetrics(InstanceType.CLIENT);
return allInstanceMetrics(InstanceType.CLIENT);
}

private static List<Metric> allMetrics(MetricsSystem.InstanceType instanceType) {
/**
* Gets all metrics of the given instance type.
*
* @param instanceType the requested instance type
* @return all metrics of the given instance type
*/
private static List<Metric> allInstanceMetrics(MetricsSystem.InstanceType instanceType) {
List<Metric> metrics = new ArrayList<>();
for (Entry<String, Gauge> entry : METRIC_REGISTRY.getGauges().entrySet()) {
if (entry.getKey().startsWith(instanceType.toString())) {
Expand Down
Expand Up @@ -24,24 +24,16 @@
import alluxio.grpc.GetConfigReportPResponse;
import alluxio.grpc.GetMasterInfoPOptions;
import alluxio.grpc.GetMasterInfoPResponse;
import alluxio.grpc.GetMetricsPOptions;
import alluxio.grpc.GetMetricsPResponse;
import alluxio.grpc.MasterInfo;
import alluxio.grpc.MasterInfoField;
import alluxio.grpc.MetaMasterClientServiceGrpc;
import alluxio.metrics.MetricsSystem;
import alluxio.wire.Address;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -135,36 +127,6 @@ public void getMasterInfo(GetMasterInfoPOptions options,
}, "getMasterInfo", "options=%s", responseObserver, options);
}

@Override
public void getMetrics(GetMetricsPOptions options,
StreamObserver<GetMetricsPResponse> responseObserver) {
RpcUtils.call(LOG, (RpcUtils.RpcCallableThrowsIOException<GetMetricsPResponse>) () -> {

MetricRegistry mr = MetricsSystem.METRIC_REGISTRY;
Map<String, alluxio.grpc.MetricValue> metricsMap = new HashMap<>();

for (Map.Entry<String, Counter> entry : mr.getCounters().entrySet()) {
metricsMap.put(MetricsSystem.stripInstanceAndHost(entry.getKey()), alluxio.grpc.MetricValue
.newBuilder().setLongValue(entry.getValue().getCount()).build());
}

for (Map.Entry<String, Gauge> entry : mr.getGauges().entrySet()) {
Object value = entry.getValue().getValue();
if (value instanceof Integer) {
metricsMap.put(entry.getKey(), alluxio.grpc.MetricValue.newBuilder()
.setLongValue(Long.valueOf((Integer) value)).build());
} else if (value instanceof Long) {
metricsMap.put(entry.getKey(), alluxio.grpc.MetricValue.newBuilder()
.setLongValue((long) value).build());
} else if (value instanceof Double) {
metricsMap.put(entry.getKey(),
alluxio.grpc.MetricValue.newBuilder().setDoubleValue((Double) value).build());
}
}
return GetMetricsPResponse.newBuilder().putAllMetrics(metricsMap).build();
}, "getConfiguration", "options=%s", responseObserver, options);
}

@Override
public void checkpoint(CheckpointPOptions options,
StreamObserver<CheckpointPResponse> responseObserver) {
Expand Down
Expand Up @@ -16,6 +16,7 @@
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.grpc.GrpcService;
import alluxio.grpc.MetricValue;
import alluxio.grpc.ServiceType;
import alluxio.heartbeat.HeartbeatContext;
import alluxio.heartbeat.HeartbeatExecutor;
Expand Down Expand Up @@ -217,6 +218,11 @@ public void clearMetrics() {
mMetricsStore.clear();
}

@Override
public Map<String, MetricValue> getMetrics() {
return MetricsSystem.allMetrics();
}

/**
* Heartbeat executor that updates the cluster metrics.
*/
Expand Down
Expand Up @@ -11,10 +11,12 @@

package alluxio.master.metrics;

import alluxio.grpc.MetricValue;
import alluxio.master.Master;
import alluxio.metrics.Metric;

import java.util.List;
import java.util.Map;

/**
* Interface of the metrics master that aggregates the cluster-level metrics from workers and
Expand All @@ -40,6 +42,12 @@ public interface MetricsMaster extends Master {
*/
MetricsMasterClientServiceHandler getMasterServiceHandler();

/**
* @return all metrics stored in the current master in a map
* from metric name to corresponding metric value.
*/
Map<String, MetricValue> getMetrics();

/**
* Handles the worker heartbeat and puts the metrics from an instance with a hostname. If all the
* old metrics associated with this instance will be removed and then replaced by the latest.
Expand Down
Expand Up @@ -14,6 +14,8 @@
import alluxio.RpcUtils;
import alluxio.grpc.ClearMetricsPRequest;
import alluxio.grpc.ClearMetricsPResponse;
import alluxio.grpc.GetMetricsPOptions;
import alluxio.grpc.GetMetricsPResponse;
import alluxio.grpc.MetricsHeartbeatPRequest;
import alluxio.grpc.MetricsHeartbeatPResponse;
import alluxio.grpc.MetricsMasterClientServiceGrpc;
Expand Down Expand Up @@ -76,4 +78,12 @@ public void metricsHeartbeat(MetricsHeartbeatPRequest request,
return MetricsHeartbeatPResponse.getDefaultInstance();
}, "metricsHeartbeat", "request=%s", responseObserver, request);
}

@Override
public void getMetrics(GetMetricsPOptions options,
StreamObserver<GetMetricsPResponse> responseObserver) {
RpcUtils.call(LOG, (RpcUtils.RpcCallableThrowsIOException<GetMetricsPResponse>) () ->
GetMetricsPResponse.newBuilder().putAllMetrics(mMetricsMaster.getMetrics()).build(),
"getMetrics", "options=%s", responseObserver, options);
}
}
16 changes: 0 additions & 16 deletions core/transport/src/grpc/meta_master.proto
Expand Up @@ -96,17 +96,6 @@ message GetMasterInfoPResponse {
optional MasterInfo masterInfo = 1;
}

// This type is used as a union, only one of doubleValue or longValue should be set
message MetricValue {
optional double doubleValue = 1;
optional int64 longValue = 2;
}

message GetMetricsPOptions {}
message GetMetricsPResponse {
map<string, MetricValue> metrics = 1;
}

message CheckpointPOptions {}
message CheckpointPResponse {
// The hostname of the master that did the checkpoint
Expand Down Expand Up @@ -168,11 +157,6 @@ service MetaMasterClientService {
*/
rpc GetMasterInfo(GetMasterInfoPOptions) returns (GetMasterInfoPResponse);

/**
* Returns a map of metrics property names and their values from Alluxio metrics system.
*/
rpc GetMetrics(GetMetricsPOptions) returns (GetMetricsPResponse);

/**
* Creates a checkpoint in the primary master journal system.
*/
Expand Down

0 comments on commit a5ea8ce

Please sign in to comment.