Skip to content

Commit

Permalink
#2882: add monitoring for event bus thread pool executors
Browse files Browse the repository at this point in the history
  • Loading branch information
Mattia Brescia committed Jun 13, 2024
1 parent 07b958d commit 50f79ee
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 14 deletions.
2 changes: 2 additions & 0 deletions factcast-site/documentation-docsy/content/en/Setup/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ You can distinguish them by the `name` tag. Currently, these are:
- `paged-catchup` - used for buffered transformation while using the paged catchup strategy
- `transformation-cache` - used for inserting/updating entries in the transformation cache (only if you use persisted
cache)
- `pg-listener` - used by the Guava EventBus that receives signals from the PostgreSQL
- `telemetry` - used by the Guava EventBus that receives signals from the FactCast Server (see [telemetry](./telemetry.md))

See https://micrometer.io/docs/ref/jvm for more information.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ public class PgFactStoreInternalConfiguration {

@Bean
@ConditionalOnMissingBean(EventBus.class)
public EventBus eventBus() {
return new AsyncEventBus(getClass().getSimpleName(), Executors.newCachedThreadPool());
public EventBus eventBus(@NonNull PgMetrics metrics) {
return new AsyncEventBus(
getClass().getSimpleName(),
metrics.monitor(Executors.newCachedThreadPool(), "pg-listener"));
}

@Bean
Expand All @@ -122,8 +124,8 @@ public PgMetrics pgMetrics(@NonNull MeterRegistry registry) {
}

@Bean
public PgStoreTelemetry telemetry() {
return new PgStoreTelemetry();
public PgStoreTelemetry telemetry(@NonNull PgMetrics metrics) {
return new PgStoreTelemetry(metrics);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,33 @@
*/
package org.factcast.store.internal.telemetry;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.factcast.core.subscription.SubscriptionRequestTO;
import org.factcast.store.internal.PgMetrics;

@Slf4j
public final class PgStoreTelemetry {

private final EventBus eventBus;

public PgStoreTelemetry() {
this(
new AsyncEventBus(
PgStoreTelemetry.class.getSimpleName(),
// needs to be a singleThread executor to ensure that the order of events is
// respected
Executors.newSingleThreadExecutor()));
public PgStoreTelemetry(@NonNull PgMetrics metrics) {
// needs to be a singleThread executor to ensure that the order of events is respected
this(metrics.monitor(Executors.newSingleThreadExecutor(), "telemetry"));
}

public PgStoreTelemetry(@NonNull EventBus eventBus) {
@VisibleForTesting
PgStoreTelemetry(@NonNull ExecutorService executorService) {
this(new AsyncEventBus(PgStoreTelemetry.class.getSimpleName(), executorService));
}

@VisibleForTesting
PgStoreTelemetry(@NonNull EventBus eventBus) {
this.eventBus = eventBus;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,30 @@
import static org.mockito.Mockito.*;

import com.google.common.eventbus.EventBus;
import java.util.concurrent.ExecutorService;
import org.factcast.core.subscription.SubscriptionRequestTO;
import org.factcast.store.internal.PgMetrics;
import org.junit.jupiter.api.Test;

class PgStoreTelemetryTest {

@Test
void setsEventBusOnConstruction() {
void setsEventBusOnConstructionWithMetrics() {
var metrics = mock(PgMetrics.class);
when(metrics.monitor(any(ExecutorService.class), eq("telemetry")))
.thenReturn(mock(ExecutorService.class));

new PgStoreTelemetry(metrics);

verify(metrics).monitor(any(ExecutorService.class), eq("telemetry"));
}

@Test
void delegatesRegistrationToEventBus() {
var eventBus = mock(EventBus.class);
var listener = new Object();

var uut = new PgStoreTelemetry(eventBus);

uut.register(listener);

verify(eventBus).register(listener);
Expand Down

0 comments on commit 50f79ee

Please sign in to comment.