Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2086,6 +2086,10 @@ public String getPushGatewayJobName() {
return getString(HoodieMetricsPrometheusConfig.PUSHGATEWAY_JOBNAME);
}

public String getPushGatewayLabels() {
return getString(HoodieMetricsPrometheusConfig.PUSHGATEWAY_LABELS);
}

public boolean getPushGatewayRandomJobNameSuffix() {
return getBoolean(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOBNAME_SUFFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public class HoodieMetricsPrometheusConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Name of the push gateway job.");

public static final ConfigProperty<String> PUSHGATEWAY_LABELS = ConfigProperty
.key(PUSHGATEWAY_PREFIX + ".report.labels")
.defaultValue("")
.sinceVersion("0.14.0")
.withDocumentation("Label for the metrics emitted to the Pushgateway. Labels can be specified with key:value pairs separated by commas");

public static final ConfigProperty<Boolean> PUSHGATEWAY_RANDOM_JOBNAME_SUFFIX = ConfigProperty
.key(PUSHGATEWAY_PREFIX + ".random.job.name.suffix")
.defaultValue(true)
Expand Down Expand Up @@ -205,6 +211,11 @@ public HoodieMetricsPrometheusConfig.Builder withPushgatewayRandomJobnameSuffix(
return this;
}

public Builder withPushgatewayLabels(String pushGatewayLabels) {
hoodieMetricsPrometheusConfig.setValue(PUSHGATEWAY_LABELS, pushGatewayLabels);
return this;
}

public HoodieMetricsPrometheusConfig.Builder withPrometheusPortNum(int prometheusPortNum) {
hoodieMetricsPrometheusConfig.setValue(PROMETHEUS_PORT_NUM, String.valueOf(prometheusPortNum));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
Expand Down Expand Up @@ -329,9 +330,18 @@ private HoodieWriteConfig createMetadataWriteConfig(
.toJmxHost(writeConfig.getJmxHost())
.build());
break;
case PROMETHEUS_PUSHGATEWAY:
HoodieMetricsPrometheusConfig prometheusConfig = HoodieMetricsPrometheusConfig.newBuilder()
.withPushgatewayJobname(writeConfig.getPushGatewayJobName())
.withPushgatewayRandomJobnameSuffix(writeConfig.getPushGatewayRandomJobNameSuffix())
.withPushgatewayLabels(writeConfig.getPushGatewayLabels())
.withPushgatewayReportPeriodInSeconds(String.valueOf(writeConfig.getPushGatewayReportPeriodSeconds()))
.withPushgatewayHostName(writeConfig.getPushGatewayHost())
.withPushgatewayPortNum(writeConfig.getPushGatewayPort()).build();
builder.withProperties(prometheusConfig.getProps());
break;
case DATADOG:
case PROMETHEUS:
case PROMETHEUS_PUSHGATEWAY:
case CONSOLE:
case INMEMORY:
case CLOUDWATCH:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,27 @@
package org.apache.hudi.metrics.prometheus;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.MetricsReporter;

import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class PushGatewayMetricsReporter extends MetricsReporter {

private final PushGatewayReporter pushGatewayReporter;
private final int periodSeconds;
private final boolean deleteShutdown;
private final String configuredJobName;
private final Map<String, String> configuredLabels;
private final boolean randomSuffix;

public PushGatewayMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
Expand All @@ -42,6 +49,7 @@ public PushGatewayMetricsReporter(HoodieWriteConfig config, MetricRegistry regis
periodSeconds = config.getPushGatewayReportPeriodSeconds();
deleteShutdown = config.getPushGatewayDeleteOnShutdown();
configuredJobName = config.getPushGatewayJobName();
configuredLabels = Collections.unmodifiableMap(parseLabels(config.getPushGatewayLabels()));
randomSuffix = config.getPushGatewayRandomJobNameSuffix();

pushGatewayReporter = new PushGatewayReporter(
Expand All @@ -50,6 +58,7 @@ public PushGatewayMetricsReporter(HoodieWriteConfig config, MetricRegistry regis
TimeUnit.SECONDS,
TimeUnit.SECONDS,
getJobName(),
configuredLabels,
serverHost,
serverPort,
deleteShutdown);
Expand All @@ -70,11 +79,33 @@ public void stop() {
pushGatewayReporter.stop();
}

public Map<String, String> getLabels() {
return configuredLabels;
}

private String getJobName() {
if (randomSuffix) {
Random random = new Random();
return configuredJobName + random.nextLong();
}
return configuredJobName;
}

private static Map<String, String> parseLabels(String labels) {
Stream<String[]> intermediateStream = Pattern.compile("\\s*,\\s*")
.splitAsStream(labels.trim())
.map(s -> s.split(":", 2));

Map<String, String> labelsMap = new HashMap<>();
intermediateStream.forEach(a -> {
String key = a[0];
String value = a.length > 1 ? a[1] : "";
String oldValue = labelsMap.put(key, value);
if (oldValue != null) {
throw new HoodieException(String.format("Duplicate key=%s found in labels: %s %s",
key, key + ":" + oldValue, key + ":" + value));
}
});
return labelsMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,21 @@ public class PushGatewayReporter extends ScheduledReporter {
private final DropwizardExports metricExports;
private final CollectorRegistry collectorRegistry;
private final String jobName;
private final Map<String, String> labels;
private final boolean deleteShutdown;

protected PushGatewayReporter(MetricRegistry registry,
MetricFilter filter,
TimeUnit rateUnit,
TimeUnit durationUnit,
String jobName,
Map<String, String> labels,
String serverHost,
int serverPort,
boolean deleteShutdown) {
super(registry, "hudi-push-gateway-reporter", filter, rateUnit, durationUnit);
this.jobName = jobName;
this.labels = labels;
this.deleteShutdown = deleteShutdown;
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
Expand Down Expand Up @@ -97,7 +100,7 @@ public void report(SortedMap<String, Gauge> gauges,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
try {
pushGatewayClient.pushAdd(collectorRegistry, jobName);
pushGatewayClient.pushAdd(collectorRegistry, jobName, labels);
} catch (IOException e) {
LOG.warn("Can't push monitoring information to pushGateway", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.metrics.prometheus;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.metrics.MetricsReporterType;
Expand All @@ -29,9 +30,13 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Map;

import static org.apache.hudi.metrics.Metrics.registerGauge;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand All @@ -56,6 +61,7 @@ public void testRegisterGauge() {
when(config.getPushGatewayDeleteOnShutdown()).thenReturn(true);
when(config.getPushGatewayJobName()).thenReturn("foo");
when(config.getPushGatewayRandomJobNameSuffix()).thenReturn(false);
when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus");

assertDoesNotThrow(() -> {
new HoodieMetrics(config);
Expand All @@ -65,4 +71,47 @@ public void testRegisterGauge() {
assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
.get("pushGateWayReporter_metric").getValue().toString());
}

@Test
public void testMetricLabels() {
PushGatewayMetricsReporter reporter;
Map<String, String> labels;

when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus");
reporter = new PushGatewayMetricsReporter(config, null);
labels = reporter.getLabels();
assertEquals(1, labels.size());
assertTrue(labels.containsKey("hudi"));
assertTrue(labels.containsValue("prometheus"));

when(config.getPushGatewayLabels()).thenReturn("hudi:prome:theus");
reporter = new PushGatewayMetricsReporter(config, null);
labels = reporter.getLabels();
assertEquals(1, labels.size());
assertTrue(labels.containsKey("hudi"));
assertTrue(labels.containsValue("prome:theus"));

when(config.getPushGatewayLabels()).thenReturn("hudiprometheus");
reporter = new PushGatewayMetricsReporter(config, null);
labels = reporter.getLabels();
assertEquals(1, labels.size());
assertTrue(labels.containsKey("hudiprometheus"));
assertTrue(labels.containsValue(""));

when(config.getPushGatewayLabels()).thenReturn("hudi1:prometheus,hudi2:prometheus");
reporter = new PushGatewayMetricsReporter(config, null);
labels = reporter.getLabels();
assertEquals(2, labels.size());
assertTrue(labels.containsKey("hudi1"));
assertTrue(labels.containsKey("hudi2"));
assertTrue(labels.containsValue("prometheus"));

try {
when(config.getPushGatewayLabels()).thenReturn("hudi:prometheus,hudi:prometheus");
reporter = new PushGatewayMetricsReporter(config, null);
fail("Should fail");
} catch (HoodieException e) {
assertTrue(e.getMessage().contains("Duplicate key=hudi found in labels"));
}
}
}