Skip to content

Commit

Permalink
#1748 Use metrics provided by Micrometer for monitoring executors (#1824
Browse files Browse the repository at this point in the history
)

* #1748 WIP

* #1748 Add monitoring for Zookeeper cache executor service

* #1748 Remove unnecessary method

* #1748 Remove unused methods

* #1748 Remove unused constants

* #1748 Test for monitoring of the executors with Micrometer

---------

Co-authored-by: Michal Ciszewski <michal.ciszewski@allegro.pl>
  • Loading branch information
2 people authored and faderskd committed Apr 19, 2024
1 parent e2778ed commit 1b2c543
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 186 deletions.
Original file line number Diff line number Diff line change
@@ -1,23 +1,35 @@
package pl.allegro.tech.hermes.common.di.factories;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import pl.allegro.tech.hermes.common.cache.queue.LinkedHashSetBlockingQueue;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.infrastructure.zookeeper.cache.ModelAwareZookeeperNotifyingCache;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ModelAwareZookeeperNotifyingCacheFactory {

private final CuratorFramework curator;

private final MetricsFacade metricsFacade;

private final ZookeeperParameters zookeeperParameters;

public ModelAwareZookeeperNotifyingCacheFactory(CuratorFramework curator, ZookeeperParameters zookeeperParameters) {
public ModelAwareZookeeperNotifyingCacheFactory(CuratorFramework curator, MetricsFacade metricaFacade, ZookeeperParameters zookeeperParameters) {
this.curator = curator;
this.metricsFacade = metricaFacade;
this.zookeeperParameters = zookeeperParameters;
}

public ModelAwareZookeeperNotifyingCache provide() {
String rootPath = zookeeperParameters.getRoot();
ExecutorService executor = createExecutor(rootPath, zookeeperParameters.getProcessingThreadPoolSize());
ModelAwareZookeeperNotifyingCache cache = new ModelAwareZookeeperNotifyingCache(
curator, rootPath, zookeeperParameters.getProcessingThreadPoolSize()
curator, executor, rootPath
);
try {
cache.start();
Expand All @@ -26,4 +38,11 @@ public ModelAwareZookeeperNotifyingCache provide() {
}
return cache;
}

private ExecutorService createExecutor(String rootPath, int processingThreadPoolSize) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(rootPath + "-zk-cache-%d").build();
ExecutorService executor = new ThreadPoolExecutor(1, processingThreadPoolSize,
Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedHashSetBlockingQueue<>(), threadFactory);
return metricsFacade.executor().monitor(executor, rootPath + "zk-cache");
}
}
Original file line number Diff line number Diff line change
@@ -1,56 +1,23 @@
package pl.allegro.tech.hermes.common.metric;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;

import java.util.function.ToDoubleFunction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

public class ExecutorMetrics {
private final HermesMetrics hermesMetrics;
private final MeterRegistry meterRegistry;

public ExecutorMetrics(HermesMetrics hermesMetrics, MeterRegistry meterRegistry) {
this.hermesMetrics = hermesMetrics;
public ExecutorMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

public <T> void registerThreadPoolCapacity(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolCapacity(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.capacity", executorName, stateObj, f);
public ExecutorService monitor(ExecutorService executorService, String executorName) {
return ExecutorServiceMetrics.monitor(meterRegistry, executorService, executorName);
}

public <T> void registerThreadPoolActiveThreads(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolActiveThreads(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.active-threads", executorName, stateObj, f);
}

public <T> void registerThreadPoolUtilization(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolUtilization(executorName, () -> f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.utilization", executorName, stateObj, f);
}

public <T> void registerThreadPoolTaskQueueCapacity(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolTaskQueueCapacity(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.task-queue-capacity", executorName, stateObj, f);
}

public <T> void registerThreadPoolTaskQueued(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolTaskQueued(executorName, () -> (int) f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.task-queue-size", executorName, stateObj, f);
}

public <T> void registerThreadPoolTaskQueueUtilization(String executorName, T stateObj, ToDoubleFunction<T> f) {
hermesMetrics.registerThreadPoolTaskQueueUtilization(executorName, () -> f.applyAsDouble(stateObj));
registerMicrometerGauge("executors.task-queue-utilization", executorName, stateObj, f);
}

public void incrementRequestRejectedCounter(String executorName) {
hermesMetrics.incrementThreadPoolTaskRejectedCount(executorName);
meterRegistry.counter("executors.task-rejected", Tags.of("executor_name", executorName)).increment();
}


private <T> void registerMicrometerGauge(String name, String executorName, T stateObj, ToDoubleFunction<T> f) {
meterRegistry.gauge(name, Tags.of("executor_name", executorName), stateObj, f);
public ScheduledExecutorService monitor(ScheduledExecutorService scheduledExecutorService, String executorName) {
return ExecutorServiceMetrics.monitor(meterRegistry, scheduledExecutorService, executorName);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package pl.allegro.tech.hermes.common.metric;

import static pl.allegro.tech.hermes.metrics.PathsCompiler.EXECUTOR_NAME;
import static pl.allegro.tech.hermes.metrics.PathsCompiler.GROUP;
import static pl.allegro.tech.hermes.metrics.PathsCompiler.SUBSCRIPTION;
import static pl.allegro.tech.hermes.metrics.PathsCompiler.TOPIC;
Expand Down Expand Up @@ -34,13 +33,5 @@ public class Gauges {
public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_CONNECTIONS = "http-clients.serial.http2.connections";
public static final String CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_PENDING_CONNECTIONS = "http-clients.serial.http2.pending-connections";

public static final String EXECUTORS = "executors.";
public static final String EXECUTOR_ACTIVE_THREADS = EXECUTORS + EXECUTOR_NAME + ".active-threads";
public static final String EXECUTOR_CAPACITY = EXECUTORS + EXECUTOR_NAME + ".capacity";
public static final String UTILIZATION = EXECUTORS + EXECUTOR_NAME + ".utilization";
public static final String TASK_QUEUE_CAPACITY = EXECUTORS + EXECUTOR_NAME + ".task-queue-capacity";
public static final String TASK_QUEUED = EXECUTORS + EXECUTOR_NAME + ".task-queue-size";
public static final String TASKS_QUEUE_UTILIZATION = EXECUTORS + EXECUTOR_NAME + ".task-queue-utilization";
public static final String TASKS_REJECTED_COUNT = EXECUTORS + EXECUTOR_NAME + "task-rejected";
public static final String INFLIGHT = "inflight." + GROUP + "." + TOPIC + "." + SUBSCRIPTION + ".count";
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,34 +112,6 @@ public void registerConsumerSenderHttp2RequestQueueSize(Gauge<Integer> gauge) {
metricRegistry.register(metricRegistryName(Gauges.CONSUMER_SENDER_HTTP_2_SERIAL_CLIENT_REQUEST_QUEUE_SIZE), gauge);
}

public void registerThreadPoolActiveThreads(String executorName, Gauge<Integer> gauge) {
registerExecutorGauge(Gauges.EXECUTOR_ACTIVE_THREADS, executorName, gauge);
}

public void registerThreadPoolCapacity(String executorName, Gauge<Integer> gauge) {
registerExecutorGauge(Gauges.EXECUTOR_CAPACITY, executorName, gauge);
}

public void registerThreadPoolUtilization(String executorName, Gauge<Double> gauge) {
registerExecutorGauge(Gauges.UTILIZATION, executorName, gauge);
}

public void registerThreadPoolTaskQueueCapacity(String executorName, Gauge<Integer> gauge) {
registerExecutorGauge(Gauges.TASK_QUEUE_CAPACITY, executorName, gauge);
}

public void registerThreadPoolTaskQueued(String executorName, Gauge<Integer> gauge) {
registerExecutorGauge(Gauges.TASK_QUEUED, executorName, gauge);
}

public void registerThreadPoolTaskQueueUtilization(String executorName, Gauge<Double> gauge) {
registerExecutorGauge(Gauges.TASKS_QUEUE_UTILIZATION, executorName, gauge);
}

public void incrementThreadPoolTaskRejectedCount(String executorName) {
executorCounter(Gauges.TASKS_REJECTED_COUNT, executorName).inc();
}

public void registerInflightGauge(SubscriptionName subscription, Gauge<?> gauge) {
registerGauge(metricRegistryName(Gauges.INFLIGHT, subscription.getTopicName(), subscription.getName()), gauge);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public MetricsFacade(MeterRegistry meterRegistry, HermesMetrics hermesMetrics) {
this.trackerElasticSearchMetrics = new TrackerElasticSearchMetrics(hermesMetrics, meterRegistry);
this.persistentBufferMetrics = new PersistentBufferMetrics(hermesMetrics, meterRegistry);
this.producerMetrics = new ProducerMetrics(hermesMetrics, meterRegistry);
this.executorMetrics = new ExecutorMetrics(hermesMetrics, meterRegistry);
this.executorMetrics = new ExecutorMetrics(meterRegistry);
this.schemaClientMetrics = new SchemaClientMetrics(hermesMetrics, meterRegistry);
this.undeliveredMessagesMetrics = new UndeliveredMessagesMetrics(hermesMetrics, meterRegistry);
this.deserializationMetrics = new DeserializationMetrics(hermesMetrics, meterRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.common.metric.executor;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -11,14 +12,13 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class InstrumentedExecutorServiceFactory {

private final ThreadPoolMetrics threadPoolMetrics;
private final MetricsFacade metricsFacade;
private final RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

public InstrumentedExecutorServiceFactory(ThreadPoolMetrics threadPoolMetrics) {
this.threadPoolMetrics = threadPoolMetrics;
public InstrumentedExecutorServiceFactory(MetricsFacade metricsFacade) {
this.metricsFacade = metricsFacade;
}

public ExecutorService getExecutorService(String name, int size, boolean monitoringEnabled) {
Expand All @@ -30,30 +30,24 @@ public ExecutorService getExecutorService(String name, int size, boolean monitor
ThreadPoolExecutor executor = newFixedThreadPool(name, size, threadFactory, queueCapacity);
executor.prestartAllCoreThreads();

if (monitoringEnabled) {
monitor(name, executor);
}

return executor;
return monitoringEnabled ? monitor(name, executor) : executor;
}

public ScheduledExecutorService getScheduledExecutorService(
String name, int size, boolean monitoringEnabled
) {

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-scheduled-executor-%d").build();

ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(size, threadFactory);
return monitoringEnabled ? monitor(name, executor) : executor;
}

if (monitoringEnabled) {
monitor(name, executor);
}

return executor;
private ExecutorService monitor(String threadPoolName, ExecutorService executor) {
return metricsFacade.executor().monitor(executor, threadPoolName);
}

private void monitor(String threadPoolName, ThreadPoolExecutor executor) {
threadPoolMetrics.createGauges(threadPoolName, executor);
private ScheduledExecutorService monitor(String threadPoolName, ScheduledExecutorService executor) {
return metricsFacade.executor().monitor(executor, threadPoolName);
}

/**
Expand All @@ -68,16 +62,8 @@ private ThreadPoolExecutor newFixedThreadPool(String executorName, int size, Thr
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueCapacity),
threadFactory,
getMeteredRejectedExecutionHandler(executorName)
rejectedExecutionHandler
);
return executor;
}

RejectedExecutionHandler getMeteredRejectedExecutionHandler(String executorName) {
return (r, executor) -> {
threadPoolMetrics.markRequestRejected(executorName);
rejectedExecutionHandler.rejectedExecution(r, executor);
};
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper.cache;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.cache.queue.LinkedHashSetBlockingQueue;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

public class ModelAwareZookeeperNotifyingCache {
Expand All @@ -26,15 +22,13 @@ public class ModelAwareZookeeperNotifyingCache {
private static final int SUBSCRIPTION_LEVEL = 2;

private final HierarchicalCache cache;
private final ThreadPoolExecutor executor;
private final ExecutorService executor;

public ModelAwareZookeeperNotifyingCache(CuratorFramework curator, String rootPath, int processingThreadPoolSize) {
public ModelAwareZookeeperNotifyingCache(CuratorFramework curator, ExecutorService executor, String rootPath) {
List<String> levelPrefixes = Arrays.asList(
ZookeeperPaths.GROUPS_PATH, ZookeeperPaths.TOPICS_PATH, ZookeeperPaths.SUBSCRIPTIONS_PATH
);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(rootPath + "-zk-cache-%d").build();
executor = new ThreadPoolExecutor(1, processingThreadPoolSize,
Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedHashSetBlockingQueue<>(), threadFactory);
this.executor = executor;
this.cache = new HierarchicalCache(
curator,
executor,
Expand Down
Loading

0 comments on commit 1b2c543

Please sign in to comment.