Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-25697][Runtime / Metrics] introduce port url config for prometheus push gateway a… #18668

Merged
merged 3 commits into from Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/content.zh/docs/deployment/metric_reporters.md
Expand Up @@ -192,8 +192,7 @@ Example configuration:

```yaml
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.hostUrl: http://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
Expand Down
3 changes: 1 addition & 2 deletions docs/content/docs/deployment/metric_reporters.md
Expand Up @@ -192,8 +192,7 @@ Example configuration:

```yaml
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.hostUrl: http://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
Expand Down
Expand Up @@ -27,23 +27,17 @@
<td>Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., <code class="highlighter-rouge">k1=v1;k2=v2</code>. Please ensure that your grouping key meets the <a href="https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels">Prometheus requirements</a>.</td>
</tr>
<tr>
<td><h5>host</h5></td>
<td><h5>hostUrl</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The PushGateway server host.</td>
<td>The PushGateway server host URL including scheme, host name, and port.</td>
</tr>
<tr>
<td><h5>jobName</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The job name under which metrics will be pushed</td>
</tr>
<tr>
<td><h5>port</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>The PushGateway server port.</td>
</tr>
<tr>
<td><h5>randomJobNameSuffix</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.metrics.prometheus;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
Expand All @@ -29,6 +30,7 @@
import io.prometheus.client.exporter.PushGateway;

import java.io.IOException;
import java.net.URL;
import java.util.Map;

/**
Expand All @@ -44,14 +46,15 @@ public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter im
private final String jobName;
private final Map<String, String> groupingKey;
private final boolean deleteOnShutdown;
@VisibleForTesting final URL hostUrl;

PrometheusPushGatewayReporter(
String host,
int port,
URL hostUrl,
String jobName,
Map<String, String> groupingKey,
final boolean deleteOnShutdown) {
this.pushGateway = new PushGateway(host + ':' + port);
this.hostUrl = hostUrl;
this.pushGateway = new PushGateway(hostUrl);
this.jobName = Preconditions.checkNotNull(jobName);
this.groupingKey = Preconditions.checkNotNull(groupingKey);
this.deleteOnShutdown = deleteOnShutdown;
Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -35,6 +37,7 @@
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.DELETE_ON_SHUTDOWN;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.GROUPING_KEY;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST_URL;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.JOB_NAME;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;
Expand Down Expand Up @@ -63,9 +66,18 @@ public PrometheusPushGatewayReporter createMetricReporter(Properties properties)
parseGroupingKey(
metricConfig.getString(GROUPING_KEY.key(), GROUPING_KEY.defaultValue()));

if (host == null || host.isEmpty() || port < 1) {
throw new IllegalArgumentException(
"Invalid host/port configuration. Host: " + host + " Port: " + port);
String hostUrlConfig = metricConfig.getString(HOST_URL.key(), HOST_URL.defaultValue());

final String hostUrl;
if (!StringUtils.isNullOrWhitespaceOnly(hostUrlConfig)) {
hostUrl = hostUrlConfig;
} else {
if (StringUtils.isNullOrWhitespaceOnly(host) || port < 1) {
throw new IllegalArgumentException(
"Invalid host/port configuration. Host: " + host + " Port: " + port);
} else {
hostUrl = "http://" + host + ":" + port;
}
}

String jobName = configuredJobName;
Expand All @@ -74,16 +86,19 @@ public PrometheusPushGatewayReporter createMetricReporter(Properties properties)
}

LOG.info(
"Configured PrometheusPushGatewayReporter with {host:{}, port:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, groupingKey:{}}",
host,
port,
"Configured PrometheusPushGatewayReporter with {hostUrl:{}, jobName:{}, randomJobNameSuffix:{}, deleteOnShutdown:{}, groupingKey:{}}",
hostUrl,
jobName,
randomSuffix,
deleteOnShutdown,
groupingKey);

return new PrometheusPushGatewayReporter(
host, port, jobName, groupingKey, deleteOnShutdown);
try {
return new PrometheusPushGatewayReporter(
new URL(hostUrl), jobName, groupingKey, deleteOnShutdown);
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}

@VisibleForTesting
Expand Down
Expand Up @@ -29,17 +29,26 @@
@Documentation.SuffixOption
public class PrometheusPushGatewayReporterOptions {

@Deprecated
public static final ConfigOption<String> HOST =
ConfigOptions.key("host")
.stringType()
.noDefaultValue()
.withDescription("The PushGateway server host.");
.withDescription("(deprecated) The PushGateway server host.");

@Deprecated
public static final ConfigOption<Integer> PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(-1)
.withDescription("The PushGateway server port.");
.withDescription("(deprecated) The PushGateway server port.");

public static final ConfigOption<String> HOST_URL =
ConfigOptions.key("hostUrl")
.stringType()
.noDefaultValue()
.withDescription(
"The PushGateway server host URL including scheme, host name, and port.");

public static final ConfigOption<String> JOB_NAME =
ConfigOptions.key("jobName")
Expand Down
Expand Up @@ -18,13 +18,18 @@

package org.apache.flink.metrics.prometheus;

import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.HOST_URL;
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.PORT;

/** Test for {@link PrometheusPushGatewayReporter}. */
public class PrometheusPushGatewayReporterTest extends TestLogger {

Expand All @@ -49,4 +54,53 @@ public void testParseIncompleteGroupingKey() {
groupingKey = PrometheusPushGatewayReporterFactory.parseGroupingKey("k1");
Assert.assertTrue(groupingKey.isEmpty());
}

@Test
public void testConnectToPushGatewayUsingHostAndPort() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST.key(), "localhost");
metricConfig.setProperty(PORT.key(), "18080");
PrometheusPushGatewayReporter reporter = factory.createMetricReporter(metricConfig);
String gatewayBaseURL = factory.createMetricReporter(metricConfig).hostUrl.toString();
Assert.assertEquals(gatewayBaseURL, "http://localhost:18080");
}

@Test
public void testConnectToPushGatewayUsingHostUrl() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST_URL.key(), "https://localhost:18080");
PrometheusPushGatewayReporter reporter = factory.createMetricReporter(metricConfig);
String gatewayBaseURL = factory.createMetricReporter(metricConfig).hostUrl.toString();
Assert.assertEquals(gatewayBaseURL, "https://localhost:18080");
}

@Test
public void testConnectToPushGatewayPreferHostUrl() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(HOST_URL.key(), "https://localhost:18080");
metricConfig.setProperty(HOST.key(), "localhost1");
metricConfig.setProperty(PORT.key(), "18081");
String gatewayBaseURL = factory.createMetricReporter(metricConfig).hostUrl.toString();
Assert.assertEquals(gatewayBaseURL, "https://localhost:18080");
}

@Test
public void testConnectToPushGatewayThrowsExceptionWithoutHostInformation() {
PrometheusPushGatewayReporterFactory factory = new PrometheusPushGatewayReporterFactory();
MetricConfig metricConfig = new MetricConfig();
Assert.assertThrows(
IllegalArgumentException.class, () -> factory.createMetricReporter(metricConfig));

metricConfig.setProperty(HOST.key(), "localhost");
Assert.assertThrows(
IllegalArgumentException.class, () -> factory.createMetricReporter(metricConfig));
zentol marked this conversation as resolved.
Show resolved Hide resolved

metricConfig.clear();
metricConfig.setProperty(PORT.key(), "18080");
Assert.assertThrows(
IllegalArgumentException.class, () -> factory.createMetricReporter(metricConfig));
}
}