diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index afec6daea46c4..98c61a4d91e17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConfig; @@ -48,6 +50,10 @@ public class NetworkEnvironment { private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class); + private static final String METRIC_GROUP_NETWORK = "Network"; + private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments"; + private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments"; + private final Object lock = new Object(); private final NetworkEnvironmentConfiguration config; @@ -62,7 +68,10 @@ public class NetworkEnvironment { private boolean isShutdown; - public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) { + public NetworkEnvironment( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + MetricGroup metricGroup) { this.config = checkNotNull(config); this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); @@ -78,9 +87,21 @@ public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPubli this.taskEventPublisher = checkNotNull(taskEventPublisher); + registerNetworkMetrics(metricGroup, networkBufferPool); + isShutdown = false; } + private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { + checkNotNull(metricGroup); + + MetricGroup networkGroup = metricGroup.addGroup(METRIC_GROUP_NETWORK); + networkGroup.>gauge(METRIC_TOTAL_MEMORY_SEGMENT, + networkBufferPool::getTotalNumberOfMemorySegments); + networkGroup.>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT, + networkBufferPool::getNumberOfAvailableMemorySegments); + } + // -------------------------------------------------------------------------------------------- // Properties // -------------------------------------------------------------------------------------------- @@ -93,6 +114,7 @@ public ConnectionManager getConnectionManager() { return connectionManager; } + @VisibleForTesting public NetworkBufferPool getNetworkBufferPool() { return networkBufferPool; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java index e0adcffad8216..c30ef62470df8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java @@ -25,15 +25,13 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.BootstrapTools; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -93,23 +91,19 @@ public static JobManagerMetricGroup instantiateJobManagerMetricGroup( public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup( MetricRegistry metricRegistry, - TaskManagerLocation taskManagerLocation, - NetworkEnvironment network, + String hostName, + ResourceID resourceID, Optional