Skip to content

Commit

Permalink
fixup! [FLINK-7812][metrics] Add system resources metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski committed Aug 2, 2018
1 parent 46d7f75 commit a279b1b
Show file tree
Hide file tree
Showing 20 changed files with 174 additions and 149 deletions.
10 changes: 10 additions & 0 deletions docs/_includes/generated/core_configuration.html
Expand Up @@ -22,6 +22,16 @@
<td style="word-wrap: break-word;">"child-first"</td>
<td>Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively).</td>
</tr>
<tr>
<td><h5>debug.system-resource-metrics</h5></td>
<td style="word-wrap: break-word;">false</td>
<td></td>
</tr>
<tr>
<td><h5>debug.system-resource-metrics-probing-interval</h5></td>
<td style="word-wrap: break-word;">5000</td>
<td></td>
</tr>
<tr>
<td><h5>io.tmp.dirs</h5></td>
<td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
Expand Down
10 changes: 0 additions & 10 deletions docs/_includes/generated/task_manager_configuration.html
Expand Up @@ -37,16 +37,6 @@
<td style="word-wrap: break-word;">true</td>
<td>Enable SSL support for the taskmanager data transport. This is applicable only when the global flag for internal SSL (security.ssl.internal.enabled) is set to true</td>
</tr>
<tr>
<td><h5>taskmanager.debug.additional-logging</h5></td>
<td style="word-wrap: break-word;">false</td>
<td></td>
</tr>
<tr>
<td><h5>taskmanager.debug.additional-logging-interval</h5></td>
<td style="word-wrap: break-word;">5000</td>
<td></td>
</tr>
<tr>
<td><h5>taskmanager.debug.memory.log</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
19 changes: 11 additions & 8 deletions docs/monitoring/metrics.md
Expand Up @@ -1398,15 +1398,18 @@ Thus, in order to infer the metric identifier:

### System resources

Logging of system resources is disabled by default. When `taskmanager.debug.additional-logging`
is enabled on a TaskManager additional metrics listed below will be available. System
resources metrics are updated periodically and they present average values for a configured
interval (`taskmanager.debug.additional-logging-interval`).
System resources reporting is disabled by default. When `debug.system-resource-metrics`
is enabled, the additional metrics listed below will be available on both the TaskManager and the JobManager.
System resources metrics are updated periodically and present average values over the
configured interval (`debug.system-resource-metrics-probing-interval`).

Logging of system resources require couple of optional dependencies to be present on the
System resources reporting requires an optional dependency to be present on the
classpath (for example placed in Flink's `lib` directory):

- `com.github.oshi:oshi-core:3.4.0` (licensed under EPL 1.0 license)

Including its transitive dependencies:

- `net.java.dev.jna:jna-platform:jar:4.2.2`
- `net.java.dev.jna:jna:jar:4.2.2`

Expand All @@ -1426,7 +1429,7 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
</thead>
<tbody>
<tr>
<th rowspan="12"><strong>TaskManager</strong></th>
<th rowspan="12"><strong>Job-/TaskManager</strong></th>
<td rowspan="12">System.CPU</td>
<td>Usage</td>
<td>Overall % of CPU usage on the machine.</td>
Expand Down Expand Up @@ -1491,7 +1494,7 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
</thead>
<tbody>
<tr>
<th rowspan="4"><strong>TaskManager</strong></th>
<th rowspan="4"><strong>Job-/TaskManager</strong></th>
<td rowspan="2">System.Memory</td>
<td>Available</td>
<td>Available memory in bytes</td>
Expand Down Expand Up @@ -1525,7 +1528,7 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
</thead>
<tbody>
<tr>
<th rowspan="2"><strong>TaskManager</strong></th>
<th rowspan="2"><strong>Job-/TaskManager</strong></th>
<td rowspan="2">System.Network.INTERFACE_NAME</td>
<td>ReceiveRate</td>
<td>Average receive rate in bytes per second</td>
Expand Down
Expand Up @@ -18,12 +18,18 @@

package org.apache.flink.configuration;

import org.apache.flink.api.common.time.Time;

import javax.annotation.Nonnull;

import java.io.File;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.configuration.CoreOptions.SYSTEM_RESOURCE_METRICS;
import static org.apache.flink.configuration.CoreOptions.SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL;

/**
* Utility class for {@link Configuration} related helper functions.
*/
Expand Down Expand Up @@ -69,6 +75,19 @@ public static MemorySize getTaskManagerHeapMemory(Configuration configuration) {
}
}

/**
 * Reads the probing interval of the system resource metrics from the given configuration.
 *
 * @param configuration configuration from which {@link CoreOptions#SYSTEM_RESOURCE_METRICS} and
 *                      {@link CoreOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} are read
 * @return the value of {@link CoreOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL}, interpreted as
 *         milliseconds, or {@code Optional.empty()} if {@link CoreOptions#SYSTEM_RESOURCE_METRICS}
 *         are disabled
 */
public static Optional<Time> getSystemResourceMetricsProbingInterval(Configuration configuration) {
	final boolean systemResourceMetricsEnabled = configuration.getBoolean(SYSTEM_RESOURCE_METRICS);
	if (systemResourceMetricsEnabled) {
		// The interval option is only consulted when the feature flag is switched on.
		final long probingIntervalMillis = configuration.getLong(SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL);
		return Optional.of(Time.milliseconds(probingIntervalMillis));
	}
	return Optional.empty();
}

/**
* Extracts the task manager directories for temporary files as defined by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
Expand Down
Expand Up @@ -296,6 +296,21 @@ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeou
return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
}

/**
 * Whether Flink should report system resource metrics such as machine's CPU, memory or network usage.
 *
 * <p>Defaults to {@code false}. The description is attached to the option itself so that it is
 * carried along with the key and default value instead of living only in this Javadoc.
 */
public static final ConfigOption<Boolean> SYSTEM_RESOURCE_METRICS =
	key("debug.system-resource-metrics")
		.defaultValue(false)
		.withDescription("Flag indicating whether Flink should report system resource metrics such as" +
			" machine's CPU, memory or network usage.");

/**
 * Interval between probing of system resource metrics specified in milliseconds. Has an effect only when
 * {@link #SYSTEM_RESOURCE_METRICS} is enabled.
 *
 * <p>Defaults to {@code 5000} milliseconds. The description is attached to the option itself so that
 * it is carried along with the key and default value instead of living only in this Javadoc.
 */
public static final ConfigOption<Long> SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL =
	key("debug.system-resource-metrics-probing-interval")
		.defaultValue(5000L)
		.withDescription("Interval between probing of system resource metrics specified in milliseconds." +
			" Has an effect only when '" + SYSTEM_RESOURCE_METRICS.key() + "' is enabled.");

// ------------------------------------------------------------------------
// Distributed architecture
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -185,21 +185,6 @@ public class TaskManagerOptions {
.withDeprecatedKeys("taskmanager.debug.memory.logIntervalMs")
.withDescription("The interval (in ms) for the log thread to log the current memory usage.");

/**
* Whether TaskManager should log additional metrics (such as system resources).
*/
public static final ConfigOption<Boolean> ADDITIONAL_LOGGING =
key("taskmanager.debug.additional-logging")
.defaultValue(false);

/**
* Interval between probing of additional metrics specified in milliseconds. Has an effect only when
* {@link #ADDITIONAL_LOGGING} is enabled.
*/
public static final ConfigOption<Long> ADDITIONAL_LOGGING_INTERVAL =
key("taskmanager.debug.additional-logging-interval")
.defaultValue(5000L);

// ------------------------------------------------------------------------
// Managed Memory Options
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -346,7 +346,10 @@ protected void startClusterComponents(
clusterInformation,
webMonitorEndpoint.getRestBaseUrl());

jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
metricRegistry,
rpcService.getAddress(),
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);

Expand Down
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.metrics.util;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
Expand Down Expand Up @@ -45,6 +46,9 @@
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadMXBean;
import java.util.List;
import java.util.Optional;

import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;

/**
* Utility class to register pre-defined metric sets.
Expand All @@ -58,7 +62,8 @@ private MetricUtils() {

public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
final MetricRegistry metricRegistry,
final String hostname) {
final String hostname,
final Optional<Time> systemResourceProbeInterval) {
final JobManagerMetricGroup jobManagerMetricGroup = new JobManagerMetricGroup(
metricRegistry,
hostname);
Expand All @@ -68,13 +73,17 @@ public static JobManagerMetricGroup instantiateJobManagerMetricGroup(
// initialize the JM metrics
instantiateStatusMetrics(statusGroup);

if (systemResourceProbeInterval.isPresent()) {
instantiateSystemMetrics(jobManagerMetricGroup, systemResourceProbeInterval.get());
}
return jobManagerMetricGroup;
}

public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
MetricRegistry metricRegistry,
TaskManagerLocation taskManagerLocation,
NetworkEnvironment network) {
NetworkEnvironment network,
Optional<Time> systemResourceProbeInterval) {
final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
metricRegistry,
taskManagerLocation.getHostname(),
Expand All @@ -89,6 +98,9 @@ public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
.addGroup("Network");
instantiateNetworkMetrics(networkGroup, network);

if (systemResourceProbeInterval.isPresent()) {
instantiateSystemMetrics(taskManagerMetricGroup, systemResourceProbeInterval.get());
}
return taskManagerMetricGroup;
}

Expand Down

0 comments on commit a279b1b

Please sign in to comment.