Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.runtime.io.network;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
Expand Down Expand Up @@ -48,6 +50,10 @@ public class NetworkEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);

// Name of the metric sub-group that registerNetworkMetrics adds to the supplied parent group.
private static final String METRIC_GROUP_NETWORK = "Network";
// Gauge name bound to NetworkBufferPool#getTotalNumberOfMemorySegments.
private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
// Gauge name bound to NetworkBufferPool#getNumberOfAvailableMemorySegments.
private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";

private final Object lock = new Object();

private final NetworkEnvironmentConfiguration config;
Expand All @@ -62,7 +68,10 @@ public class NetworkEnvironment {

private boolean isShutdown;

public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPublisher taskEventPublisher) {
public NetworkEnvironment(
NetworkEnvironmentConfiguration config,
TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup) {
this.config = checkNotNull(config);

this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
Expand All @@ -78,9 +87,21 @@ public NetworkEnvironment(NetworkEnvironmentConfiguration config, TaskEventPubli

this.taskEventPublisher = checkNotNull(taskEventPublisher);

registerNetworkMetrics(metricGroup, networkBufferPool);

isShutdown = false;
}

/**
 * Registers the memory segment gauges of the network buffer pool in a dedicated
 * sub-group ({@link #METRIC_GROUP_NETWORK}) of the given metric group.
 *
 * @param metricGroup parent metric group; must not be null
 * @param networkBufferPool buffer pool whose total and available segment counts are exposed
 */
private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
	final MetricGroup networkGroup = checkNotNull(metricGroup).addGroup(METRIC_GROUP_NETWORK);

	// Bind the pool's counters as gauges; each gauge reads the current value on every poll.
	final Gauge<Integer> totalSegments = networkBufferPool::getTotalNumberOfMemorySegments;
	final Gauge<Integer> availableSegments = networkBufferPool::getNumberOfAvailableMemorySegments;

	networkGroup.gauge(METRIC_TOTAL_MEMORY_SEGMENT, totalSegments);
	networkGroup.gauge(METRIC_AVAILABLE_MEMORY_SEGMENT, availableSegments);
}

// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
Expand All @@ -93,6 +114,7 @@ public ConnectionManager getConnectionManager() {
return connectionManager;
}

/**
 * Returns the network buffer pool held by this environment.
 *
 * <p>The accessor is annotated as visible for testing.
 *
 * @return the network buffer pool
 */
@VisibleForTesting
public NetworkBufferPool getNetworkBufferPool() {
return networkBufferPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -93,23 +91,19 @@ public static JobManagerMetricGroup instantiateJobManagerMetricGroup(

public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
MetricRegistry metricRegistry,
TaskManagerLocation taskManagerLocation,
NetworkEnvironment network,
String hostName,
ResourceID resourceID,
Optional<Time> systemResourceProbeInterval) {
final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
metricRegistry,
taskManagerLocation.getHostname(),
taskManagerLocation.getResourceID().toString());
hostName,
resourceID.toString());

MetricGroup statusGroup = taskManagerMetricGroup.addGroup(METRIC_GROUP_STATUS_NAME);

// Initialize the TM metrics
instantiateStatusMetrics(statusGroup);

MetricGroup networkGroup = statusGroup
.addGroup("Network");
instantiateNetworkMetrics(networkGroup, network);

if (systemResourceProbeInterval.isPresent()) {
instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get());
}
Expand Down Expand Up @@ -139,15 +133,6 @@ public static RpcService startMetricsRpcService(Configuration configuration, Str
new BootstrapTools.FixedThreadPoolExecutorConfiguration(1, 1, threadPriority));
}

/**
 * Registers the total and available memory segment gauges of the network
 * environment's buffer pool on the given metric group.
 *
 * @param metrics metric group that receives the gauges
 * @param network network environment providing the buffer pool
 */
private static void instantiateNetworkMetrics(
		MetricGroup metrics,
		final NetworkEnvironment network) {
	final NetworkBufferPool pool = network.getNetworkBufferPool();

	final Gauge<Integer> totalSegments = pool::getTotalNumberOfMemorySegments;
	final Gauge<Integer> availableSegments = pool::getNumberOfAvailableMemorySegments;

	metrics.gauge("TotalMemorySegments", totalSegments);
	metrics.gauge("AvailableMemorySegments", availableSegments);
}

private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", mxBean::getTotalLoadedClassCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.MemoryLogger;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
Expand Down Expand Up @@ -357,19 +358,20 @@ public static TaskExecutor startTaskManager(
remoteAddress,
localCommunicationOnly);

TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
TaskManagerLocation.getHostName(remoteAddress),
resourceID,
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
taskManagerMetricGroup,
resourceID,
rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
EnvironmentInformation.getMaxJvmHeapMemory());

TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
taskManagerServices.getTaskManagerLocation(),
taskManagerServices.getNetworkEnvironment(),
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
Expand Down Expand Up @@ -220,16 +221,18 @@ public void shutDown() throws FlinkException {
/**
* Creates and returns the task manager services.
*
* @param resourceID resource ID of the task manager
* @param taskManagerServicesConfiguration task manager configuration
* @param taskIOExecutor executor for async IO operations.
* @param taskManagerMetricGroup metric group of the task manager
* @param resourceID resource ID of the task manager
* @param taskIOExecutor executor for async IO operations
* @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory
* @param maxJvmHeapMemory the maximum JVM heap size
* @return task manager components
* @throws Exception
*/
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
TaskManagerMetricGroup taskManagerMetricGroup,
ResourceID resourceID,
Executor taskIOExecutor,
long freeHeapMemoryWithDefrag,
Expand All @@ -241,7 +244,7 @@ public static TaskManagerServices fromConfiguration(
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

final NetworkEnvironment network = new NetworkEnvironment(
taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher);
taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher, taskManagerMetricGroup);
network.start();

final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.runtime.taskmanager;

import java.net.InetAddress;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.NetUtils;

Expand All @@ -28,6 +26,8 @@

import javax.annotation.Nonnull;

import java.net.InetAddress;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -82,30 +82,9 @@ public TaskManagerLocation(ResourceID resourceID, InetAddress inetAddress, int d
this.dataPort = dataPort;

// get FQDN hostname on this TaskManager.
String fqdnHostName;
try {
fqdnHostName = this.inetAddress.getCanonicalHostName();
}
catch (Throwable t) {
LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
"for HDFS files) may be non-local when the canonical hostname is missing.");
LOG.debug("getCanonicalHostName() Exception:", t);
fqdnHostName = this.inetAddress.getHostAddress();
}
this.fqdnHostName = fqdnHostName;
this.fqdnHostName = getFqdnHostName(inetAddress);

if (this.fqdnHostName.equals(this.inetAddress.getHostAddress())) {
// this happens when the name lookup fails, either due to an exception,
// or because no hostname can be found for the address
// take IP textual representation
this.hostName = this.fqdnHostName;
LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
+ "Local input split assignment (such as for HDFS files) may be impacted.",
this.inetAddress.getHostAddress());
}
else {
this.hostName = NetUtils.getHostnameFromFQDN(this.fqdnHostName);
}
this.hostName = getHostName(inetAddress);

this.stringRepresentation = String.format(
"%s @ %s (dataPort=%d)", resourceID, fqdnHostName, dataPort);
Expand Down Expand Up @@ -185,6 +164,50 @@ public String getHostname() {
return hostName;
}

/**
 * Resolves the fully qualified hostname of the TaskManager from its network address.
 *
 * <p>If the canonical hostname lookup throws, a warning is logged and the textual
 * IP address of {@code inetAddress} is returned instead.
 *
 * @param inetAddress the network address that the TaskManager binds its sockets to
 * @return fully qualified hostname of the TaskManager, or its IP address if the lookup failed
 */
private static String getFqdnHostName(InetAddress inetAddress) {
	try {
		return inetAddress.getCanonicalHostName();
	} catch (Throwable t) {
		// Best effort: never fail on name resolution, fall back to the IP address.
		LOG.warn("Unable to determine the canonical hostname. Input split assignment (such as " +
			"for HDFS files) may be non-local when the canonical hostname is missing.");
		LOG.debug("getCanonicalHostName() Exception:", t);
		return inetAddress.getHostAddress();
	}
}

/**
 * Derives the hostname of the TaskManager from its network address.
 *
 * <p>The fully qualified hostname is resolved first. If it equals the textual IP
 * address, the IP address is returned as the hostname and a warning is logged.
 * Otherwise the hostname is extracted from the fully qualified name.
 *
 * @param inetAddress the network address that the TaskManager binds its sockets to
 * @return hostname of the TaskManager
 */
public static String getHostName(InetAddress inetAddress) {
	final String fqdn = getFqdnHostName(inetAddress);
	final String ipAddress = inetAddress.getHostAddress();

	if (!fqdn.equals(ipAddress)) {
		return NetUtils.getHostnameFromFQDN(fqdn);
	}

	// The name lookup failed, either due to an exception or because no hostname
	// exists for the address, so the textual IP representation is used.
	LOG.warn("No hostname could be resolved for the IP address {}, using IP address as host name. "
		+ "Local input split assignment (such as for HDFS files) may be impacted.", ipAddress);
	return fqdn;
}

// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
Expand Down
Loading