Skip to content

Commit

Permalink
[enh][monitor]: add metrics for pulsar web service thread pool (#14742)
Browse files Browse the repository at this point in the history
Fixes #14459

### Motivation

See the issue

### Modifications
1. Add WebExecutorStats to record web thread pool metrics
  • Loading branch information
tjiuming committed Apr 19, 2022
1 parent 4398733 commit 32d7a51
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public static void generateSystemMetrics(SimpleTextOutputStream stream, String c
for (int i = 0; i < metricFamily.samples.size(); i++) {
Collector.MetricFamilySamples.Sample sample = metricFamily.samples.get(i);
stream.write(sample.name);
stream.write("{cluster=\"").write(cluster).write('"');
if (!sample.labelNames.contains("cluster")) {
stream.write("{cluster=\"").write(cluster).write('"');
}
for (int j = 0; j < sample.labelNames.size(); j++) {
String labelValue = sample.labelValues.get(j);
if (labelValue != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import java.util.concurrent.atomic.AtomicBoolean;

class WebExecutorStats implements AutoCloseable {
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);

private final Gauge maxThreads;
private final Gauge minThreads;
private final Gauge idleThreads;
private final Gauge activeThreads;
private final Gauge currentThreads;
private final WebExecutorThreadPool executor;

private static volatile WebExecutorStats instance;

static synchronized WebExecutorStats getStats(WebExecutorThreadPool executor) {
if (null == instance) {
instance = new WebExecutorStats(executor);
}

return instance;
}

private WebExecutorStats(WebExecutorThreadPool executor) {
this.executor = executor;

this.maxThreads = Gauge.build("pulsar_web_executor_max_threads", "-").create()
.setChild(new Gauge.Child() {
public double get() {
return WebExecutorStats.this.executor.getMaxThreads();
}
})
.register();

this.minThreads = Gauge.build("pulsar_web_executor_min_threads", "-").create()
.setChild(new Gauge.Child() {
public double get() {
return WebExecutorStats.this.executor.getMinThreads();
}
})
.register();

this.idleThreads = Gauge.build("pulsar_web_executor_idle_threads", "-").create()
.setChild(new Gauge.Child() {
public double get() {
return WebExecutorStats.this.executor.getIdleThreads();
}
})
.register();

this.activeThreads = Gauge.build("pulsar_web_executor_active_threads", "-").create()
.setChild(new Gauge.Child() {
public double get() {
return WebExecutorStats.this.executor.getThreads()
- WebExecutorStats.this.executor.getIdleThreads();
}
})
.register();

this.currentThreads = Gauge.build("pulsar_web_executor_current_threads", "-").create()
.setChild(new Gauge.Child() {
public double get() {
return WebExecutorStats.this.executor.getThreads();
}
})
.register();
}

@Override
public void close() throws Exception {
if (CLOSED.compareAndSet(false, true)) {
CollectorRegistry.defaultRegistry.unregister(this.activeThreads);
CollectorRegistry.defaultRegistry.unregister(this.maxThreads);
CollectorRegistry.defaultRegistry.unregister(this.minThreads);
CollectorRegistry.defaultRegistry.unregister(this.idleThreads);
CollectorRegistry.defaultRegistry.unregister(this.currentThreads);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class WebService implements AutoCloseable {
private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers;
private final WebExecutorStats executorStats;
private final WebExecutorThreadPool webServiceExecutor;

private final ServerConnector httpConnector;
Expand All @@ -82,6 +83,7 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
config.getNumHttpServerThreads(),
"pulsar-web",
config.getHttpServerThreadPoolQueueSize());
this.executorStats = WebExecutorStats.getStats(webServiceExecutor);
this.server = new Server(webServiceExecutor);
if (config.getMaxHttpServerConnections() > 0) {
server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server));
Expand Down Expand Up @@ -277,6 +279,7 @@ public void close() throws PulsarServerException {
jettyStatisticsCollector = null;
}
webServiceExecutor.join();
this.executorStats.close();
log.info("Web service closed");
} catch (Exception e) {
throw new PulsarServerException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.io.CharStreams;
import com.google.common.io.Closeables;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand All @@ -35,6 +37,7 @@
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -52,6 +55,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
Expand Down Expand Up @@ -88,6 +93,43 @@ public class WebServiceTest {
private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt";
private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key";


@Test
public void testWebExecutorMetrics() throws Exception {
setupEnv(true, "1.0", true, false, false, false, -1, false);
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, false, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, PrometheusMetricsTest.Metric> metrics = PrometheusMetricsTest.parseMetrics(metricsStr);

Collection<PrometheusMetricsTest.Metric> maxThreads = metrics.get("pulsar_web_executor_max_threads");
Collection<PrometheusMetricsTest.Metric> minThreads = metrics.get("pulsar_web_executor_min_threads");
Collection<PrometheusMetricsTest.Metric> activeThreads = metrics.get("pulsar_web_executor_active_threads");
Collection<PrometheusMetricsTest.Metric> idleThreads = metrics.get("pulsar_web_executor_idle_threads");
Collection<PrometheusMetricsTest.Metric> currentThreads = metrics.get("pulsar_web_executor_current_threads");

for (PrometheusMetricsTest.Metric metric : maxThreads) {
Assert.assertNotNull(metric.tags.get("cluster"));
Assert.assertTrue(metric.value > 0);
}
for (PrometheusMetricsTest.Metric metric : minThreads) {
Assert.assertNotNull(metric.tags.get("cluster"));
Assert.assertTrue(metric.value > 0);
}
for (PrometheusMetricsTest.Metric metric : activeThreads) {
Assert.assertNotNull(metric.tags.get("cluster"));
Assert.assertTrue(metric.value >= 0);
}
for (PrometheusMetricsTest.Metric metric : idleThreads) {
Assert.assertNotNull(metric.tags.get("cluster"));
Assert.assertTrue(metric.value >= 0);
}
for (PrometheusMetricsTest.Metric metric : currentThreads) {
Assert.assertNotNull(metric.tags.get("cluster"));
Assert.assertTrue(metric.value > 0);
}
}

/**
* Test that the {@WebService} class properly passes the allowUnversionedClients value. We do this by setting
* allowUnversionedClients to true, then making a request with no version, which should go through.
Expand Down

0 comments on commit 32d7a51

Please sign in to comment.