diff --git a/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/ActuatorContributor.java b/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/ActuatorContributor.java index 7a27765750..e555465f56 100644 --- a/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/ActuatorContributor.java +++ b/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/ActuatorContributor.java @@ -15,6 +15,7 @@ */ package org.factcast.example.server; +import java.util.concurrent.TimeUnit; import org.factcast.example.server.telemetry.MyTelemetryListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.info.Info; @@ -27,6 +28,10 @@ public class ActuatorContributor implements InfoContributor { @Autowired MyTelemetryListener listener; public void contribute(Info.Builder builder) { - builder.withDetail("followingSubscriptionsInfo", listener.getFollowingSubscriptionsInfo()); + builder.withDetail( + "followingSubscriptionsInfo", + listener.getFollowingSubscriptionsInfo().stream() + .map(i -> i.getMessage() + " following for " + i.getTime(TimeUnit.SECONDS) + "s") + .toList()); } } diff --git a/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/telemetry/MyTelemetryListener.java b/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/telemetry/MyTelemetryListener.java index 850c86f253..3f52c1e9eb 100644 --- a/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/telemetry/MyTelemetryListener.java +++ b/factcast-examples/factcast-example-server-telemetry/src/main/java/org/factcast/example/server/telemetry/MyTelemetryListener.java @@ -20,13 +20,14 @@ import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.StopWatch; import org.factcast.store.internal.telemetry.PgStoreTelemetry; @RequiredArgsConstructor @Slf4j public class MyTelemetryListener { - final List followingSubscriptionsInfo = Lists.newArrayList(); + final List followingSubscriptionsInfo = Lists.newArrayList(); public MyTelemetryListener(PgStoreTelemetry telemetry) { telemetry.register(this); @@ -34,32 +35,34 @@ public MyTelemetryListener(PgStoreTelemetry telemetry) { @Subscribe public void on(PgStoreTelemetry.Connect signal) { - log.info("### FactStreamTelemetry Connect: {}", signal.request()); + log.info("FactStreamTelemetry Connect: {}", signal.request()); } @Subscribe public void on(PgStoreTelemetry.Catchup signal) { - log.info("### FactStreamTelemetry Catchup: {}", signal.request()); + log.info("FactStreamTelemetry Catchup: {}", signal.request()); } @Subscribe public void on(PgStoreTelemetry.Follow signal) { - log.info("### FactStreamTelemetry Follow: {}", signal.request()); - followingSubscriptionsInfo.add(signal.request().debugInfo()); + log.info("FactStreamTelemetry Follow: {}", signal.request()); + StopWatch stopWatch = new StopWatch(signal.request().debugInfo()); + stopWatch.start(); + followingSubscriptionsInfo.add(stopWatch); } @Subscribe public void on(PgStoreTelemetry.Complete signal) { - log.info("### FactStreamTelemetry Complete: {}", signal.request()); + log.info("FactStreamTelemetry Complete: {}", signal.request()); } @Subscribe public void on(PgStoreTelemetry.Close signal) { - log.info("### FactStreamTelemetry Close: {}", signal.request()); - followingSubscriptionsInfo.remove(signal.request().debugInfo()); + log.info("FactStreamTelemetry Close: {}", signal.request()); + followingSubscriptionsInfo.removeIf(i -> i.getMessage().equals(signal.request().debugInfo())); } - public List getFollowingSubscriptionsInfo() { + public List getFollowingSubscriptionsInfo() { return followingSubscriptionsInfo; } } diff --git a/factcast-site/documentation-docsy/content/en/Setup/telemetry.md b/factcast-site/documentation-docsy/content/en/Setup/telemetry.md new file mode 100755 index 0000000000..d0d239fa05 --- /dev/null +++ b/factcast-site/documentation-docsy/content/en/Setup/telemetry.md @@ -0,0 +1,61 @@ +--- +title: "Telemetry" +type: docs +weight: 155 +description: Listen to internal telemetry events +--- + +Starting from factcast version 0.7.9, you can extend your server implementation to listen internal telemetry events. +This can be useful for monitoring and debugging purposes. + +The telemetry events are emitted using a dedicated internal [Guava EventBus](https://github.com/google/guava/wiki/EventBusExplained). + +## Subscription lifecycle events + +Currently, the factcast-store module emits an event on each phase of the subscription lifecycle (see +`org.factcast.store.internal.telemetry.PgStoreTelemetry`): + +- `PgStoreTelemetry.Connect` emitted whenever a client connects to the factcast server +- `PgStoreTelemetry.Catchup` emitted whenever the subscription catches up to the current state of the store +- `PgStoreTelemetry.Follow` emitted whenever the subscription started consuming live events +- `PgStoreTelemetry.Close` emitted whenever the client disconnects from the factcast server +- `PgStoreTelemetry.Complete` emitted whenever the subscription completed its lifecycle + +Each emitted event contains a `request`, which holds the client's request details. + +## How to listen to telemetry events + +It boils down to implementing a listener that is able to consume telemetry events, through +`com.google.common.eventbus.Subscribe` annotated methods, and registering it via the `PgStoreTelemetry` bean. + +Here is an example: + +```java +import com.google.common.eventbus.Subscribe; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.factcast.store.internal.telemetry.PgStoreTelemetry; + +@RequiredArgsConstructor +@Slf4j +public class MyTelemetryListener { + + public MyTelemetryListener(PgStoreTelemetry telemetry) { + telemetry.register(this); + } + + @Subscribe + public void on(PgStoreTelemetry.Connect signal) { + log.info("FactStreamTelemetry Connect: {}", signal.request()); + } + + @Subscribe + public void on(PgStoreTelemetry.Close signal) { + log.info("FactStreamTelemetry Close: {}", signal.request()); + } +} +``` + +You can check out the full example in the [factcast-example-server-telemetry](https://github.com/factcast/factcast/blob/master/factcast-examples/factcast-example-server-telemetry) +module. That module contains a simple example of how to listen to each subscription lifecycle event, to log the request +details and maintaining a list of _following_ subscriptions, which can be read through the actuator `/info` endpoint.