Skip to content

Commit

Permalink
Synchronise modifications of metricsServlet and pendingMetricsProviders
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Mar 15, 2022
1 parent c21738d commit 3a11280
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public CompletableFuture<Void> closeAsync() {
}
}

metricsServlet = null;
resetMetricsServlet();

if (this.webSocketService != null) {
this.webSocketService.close();
Expand Down Expand Up @@ -570,6 +570,10 @@ public CompletableFuture<Void> closeAsync() {
}
}

private synchronized void resetMetricsServlet() {
metricsServlet = null;
}

private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory(getClass().getSimpleName() + "-shutdown"));
Expand Down Expand Up @@ -697,16 +701,7 @@ public void start() throws PulsarServerException {
this.brokerAdditionalServlets = AdditionalServlets.load(config);

this.webService = new WebService(this);
this.metricsServlet = new PulsarPrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
config.isExposeConsumerLevelMetricsInPrometheus(),
config.isExposeProducerLevelMetricsInPrometheus(),
config.isSplitTopicAndPartitionLabelInPrometheus());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
}

createMetricsServlet();
this.addWebServerHandlers(webService, metricsServlet, this.config);
this.webService.start();
heartbeatNamespaceV1 = NamespaceService.getHeartbeatNamespace(this.advertisedAddress, this.config);
Expand Down Expand Up @@ -824,6 +819,18 @@ public void start() throws PulsarServerException {
}
}

private synchronized void createMetricsServlet() {
this.metricsServlet = new PulsarPrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
config.isExposeConsumerLevelMetricsInPrometheus(),
config.isExposeProducerLevelMetricsInPrometheus(),
config.isSplitTopicAndPartitionLabelInPrometheus());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
}
}

private void addWebServerHandlers(WebService webService,
PulsarPrometheusMetricsServlet metricsServlet,
ServiceConfiguration config)
Expand Down Expand Up @@ -1515,7 +1522,7 @@ public ResourceUsageTransportManager getResourceUsageTransportManager() {
return resourceUsageTransportManager;
}

public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
if (metricsServlet == null) {
if (pendingMetricsProviders == null) {
pendingMetricsProviders = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class ProxyService implements Closeable {
private final Map<String, TopicStats> topicStats;
@Getter
private AdditionalServlets proxyAdditionalServlets;
@Getter

private PrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;

Expand Down Expand Up @@ -256,11 +256,7 @@ public void start() throws Exception {
this.serviceUrlTls = null;
}

this.metricsServlet = new PrometheusMetricsServlet(-1L, proxyConfig.getClusterName());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
}
createMetricsServlet();

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand All @@ -271,6 +267,14 @@ public void start() throws Exception {
startProxyExtensions(protocolHandlerChannelInitializers, bootstrap);
}

private synchronized void createMetricsServlet() {
this.metricsServlet = new PrometheusMetricsServlet(-1L, proxyConfig.getClusterName());
if (pendingMetricsProviders != null) {
pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider));
this.pendingMetricsProviders = null;
}
}

// This call is used for starting additional protocol handlers
public void startProxyExtensions(Map<String, Map<InetSocketAddress,
ChannelInitializer<SocketChannel>>> protocolHandlers, ServerBootstrap serverBootstrap) {
Expand Down Expand Up @@ -347,7 +351,7 @@ public void close() throws IOException {
proxyAdditionalServlets = null;
}

metricsServlet = null;
resetMetricsServlet();

if (localMetadataStore != null) {
try {
Expand All @@ -370,6 +374,10 @@ public void close() throws IOException {
}
}

private synchronized void resetMetricsServlet() {
metricsServlet = null;
}

public String getServiceUrl() {
return serviceUrl;
}
Expand Down Expand Up @@ -428,7 +436,11 @@ public Authentication getProxyClientAuthenticationPlugin() {
return this.proxyClientAuthentication;
}

public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
public synchronized PrometheusMetricsServlet getMetricsServlet() {
return metricsServlet;
}

public synchronized void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) {
if (metricsServlet == null) {
if (pendingMetricsProviders == null) {
pendingMetricsProviders = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
Expand Down Expand Up @@ -239,8 +240,13 @@ public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
server.addServlet("/metrics", new ServletHolder(service.getMetricsServlet()),
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
if (service != null) {
PrometheusMetricsServlet metricsServlet = service.getMetricsServlet();
if (metricsServlet != null) {
server.addServlet("/metrics", new ServletHolder(metricsServlet),
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
}
}
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(),
Expand Down

0 comments on commit 3a11280

Please sign in to comment.