Skip to content

Commit

Permalink
SLING-12292 - Add tags to metrics (#138)
Browse files Browse the repository at this point in the history
* SLING-12292 - Add tags to metrics

* SLING-12292 - Add tags to publish metrics
  • Loading branch information
cschneider committed Apr 16, 2024
1 parent a06c5c8 commit 502b13c
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics sub
this.logSender = logSender;
this.config = config;

subscriberMetrics.currentRetries(config.getSubAgentName(), packageRetries::getSum);
subscriberMetrics.currentRetries(packageRetries::getSum);
this.resolverFactory = resolverFactory;
this.subscriberMetrics = subscriberMetrics;
// Error queues are enabled when the number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ public class BookKeeperFactory {
@Reference
private ResourceResolverFactory resolverFactory;

@Reference
private SubscriberMetrics subscriberMetrics;

@Reference
private EventAdmin eventAdmin;

Expand All @@ -63,7 +60,8 @@ public BookKeeper create(
DistributionPackageBuilder packageBuilder,
BookKeeperConfig config,
Consumer<PackageStatusMessage> statusSender,
Consumer<LogMessage> logSender
Consumer<LogMessage> logSender,
SubscriberMetrics subscriberMetrics
) {
ContentPackageExtractor extractor = new ContentPackageExtractor(
packaging,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.sling.distribution.journal.bookkeeper;

import static java.lang.String.format;
import static org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;

import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

import org.apache.sling.commons.metrics.Counter;
Expand All @@ -28,82 +30,73 @@
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.apache.sling.distribution.journal.metrics.Tag;

@Component(service = SubscriberMetrics.class)
/**
* Metrics for DistributionSubscriber
* most metrics will have two parameters:
* TAG_SUB_NAME and TAG_EDITABLE
*/
public class SubscriberMetrics {
public static final String SUB_COMPONENT = "distribution.journal.subscriber";
// Name of the subscriber agent
private static final String TAG_SUB_NAME = "sub_name";

private final MetricsService metricsService;

private final Histogram importedPackageSize;

private final Counter itemsBufferSize;

private final Timer removedPackageDuration;

private final Timer removedFailedPackageDuration;

private final Timer importedPackageDuration;

private final Meter failedPackageImports;

private final Timer sendStoredStatusDuration;

private final Timer processQueueItemDuration;

private final Timer packageDistributedDuration;

private final Timer packageJournalDistributionDuration;

private final Timer importPreProcessDuration;

private final Counter importPreProcessSuccess;

private final Counter importPreProcessRequest;

private final Timer importPostProcessDuration;
// Status of a package :
private static final String TAG_STATUS = "status";

private final Counter importPostProcessSuccess;

private final Counter importPostProcessRequest;

private final Timer invalidationProcessDuration;

private final Counter invalidationProcessSuccess;
// Is the queue editable (true, false)
private static final String TAG_EDITABLE = "editable";

public static final String SUB_COMPONENT = "distribution.journal.subscriber.";

private static final String PACKAGE_STATUS_COUNT = SUB_COMPONENT + "package_status_count";

// Number of packages with at least one failure to apply
private static final String CURRENT_RETRIES = SUB_COMPONENT + "current_retries";

private final Counter invalidationProcessRequest;
// Cumulated size of all packages (parameters: TAG_SUB_NAME, editable (golden publish))
private static final String IMPORTED_PACKAGE_SIZE = SUB_COMPONENT + "imported_package_size";
private static final String ITEMS_BUFFER_SIZE = SUB_COMPONENT + "items_buffer_size";

private final Counter transientImportErrors;
// Increased on every failure to apply a package
private static final String FAILED_PACKAGE_IMPORTS = SUB_COMPONENT + "failed_package_imports";

// Increased when a package failed before but then succeeded (parameters: agent, editable (golden publish))
private static final String TRANSIENT_IMPORT_ERRORS = SUB_COMPONENT + "transient_import_errors";

// Only counted in error queue setup
private static final String PERMANENT_IMPORT_ERRORS = SUB_COMPONENT + "permanent_import_errors";

private static final String IMPORT_PRE_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "import_pre_process_request_count";
private static final String IMPORT_POST_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "import_post_process_success_count";
private static final String IMPORT_POST_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "import_post_process_request_count";
private static final String INVALIDATION_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "invalidation_process_success_count";
private static final String INVALIDATION_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "invalidation_process_request_count";
private static final String IMPORT_PRE_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "import_pre_process_success_count";

private static final String IMPORTED_PACKAGE_DURATION = SUB_COMPONENT + "imported_package_duration";
private static final String REMOVED_PACKAGE_DURATION = SUB_COMPONENT + "removed_package_duration";
private static final String REMOVED_FAILED_PACKAGE_DURATION = SUB_COMPONENT + "removed_failed_package_duration";
private static final String SEND_STORED_STATUS_DURATION = SUB_COMPONENT + "send_stored_status_duration";
private static final String PROCESS_QUEUE_ITEM_DURATION = SUB_COMPONENT + "process_queue_item_duration";
private static final String REQUEST_DISTRIBUTED_DURATION = SUB_COMPONENT + "request_distributed_duration";
private static final String PACKAGE_JOURNAL_DISTRIBUTION_DURATION = SUB_COMPONENT + "package_journal_distribution_duration";
private static final String IMPORT_PRE_PROCESS_DURATION = SUB_COMPONENT + "import_pre_process_duration";
private static final String IMPORT_POST_PROCESS_DURATION = SUB_COMPONENT + "import_post_process_duration";
private static final String INVALIDATION_PROCESS_DURATION = SUB_COMPONENT + "invalidation_process_duration";

private final Counter permanentImportErrors;
private final MetricsService metricsService;
private final Tag tagSubName;
private final Tag tagEditable;
private final List<Tag> tags;

@Activate
public SubscriberMetrics(@Reference MetricsService metricsService) {
public SubscriberMetrics(MetricsService metricsService, String subAgentName, boolean editable) {
this.metricsService = metricsService;
importedPackageSize = metricsService.histogram(getMetricName("imported_package_size"));
itemsBufferSize = metricsService.counter(getMetricName("items_buffer_size"));
importedPackageDuration = metricsService.timer(getMetricName("imported_package_duration"));
removedPackageDuration = metricsService.timer(getMetricName("removed_package_duration"));
removedFailedPackageDuration = metricsService.timer(getMetricName("removed_failed_package_duration"));
failedPackageImports = metricsService.meter(getMetricName("failed_package_imports"));
sendStoredStatusDuration = metricsService.timer(getMetricName("send_stored_status_duration"));
processQueueItemDuration = metricsService.timer(getMetricName("process_queue_item_duration"));
packageDistributedDuration = metricsService.timer(getMetricName("request_distributed_duration"));
packageJournalDistributionDuration = metricsService.timer(getMetricName("package_journal_distribution_duration"));
importPreProcessDuration = metricsService.timer(getMetricName("import_pre_process_duration"));
importPreProcessSuccess = metricsService.counter(getMetricName("import_pre_process_success_count"));
importPreProcessRequest = metricsService.counter(getMetricName("import_pre_process_request_count"));
importPostProcessDuration = metricsService.timer(getMetricName("import_post_process_duration"));
importPostProcessSuccess = metricsService.counter(getMetricName("import_post_process_success_count"));
importPostProcessRequest = metricsService.counter(getMetricName("import_post_process_request_count"));
invalidationProcessDuration = metricsService.timer(getMetricName("invalidation_process_duration"));
invalidationProcessSuccess = metricsService.counter(getMetricName("invalidation_process_success_count"));
invalidationProcessRequest = metricsService.counter(getMetricName("invalidation_process_request_count"));
transientImportErrors = metricsService.counter(getMetricName("transient_import_errors"));
permanentImportErrors = metricsService.counter(getMetricName("permanent_import_errors"));
tagSubName = Tag.of(TAG_SUB_NAME, subAgentName);
tagEditable = Tag.of(TAG_EDITABLE, Boolean.toString(editable));
tags = Arrays.asList(
tagSubName,
tagEditable);
}

/**
Expand All @@ -112,7 +105,7 @@ public SubscriberMetrics(@Reference MetricsService metricsService) {
* @return a Sling Metrics histogram
*/
public Histogram getImportedPackageSize() {
return importedPackageSize;
return metricsService.histogram(getMetricName(IMPORTED_PACKAGE_SIZE, tags));
}

/**
Expand All @@ -121,7 +114,7 @@ public Histogram getImportedPackageSize() {
* @return a Sling Metrics counter
*/
public Counter getItemsBufferSize() {
return itemsBufferSize;
return metricsService.counter(getMetricName(ITEMS_BUFFER_SIZE, tags));
}

/**
Expand All @@ -130,7 +123,7 @@ public Counter getItemsBufferSize() {
* @return a Sling Metrics timer
*/
public Timer getImportedPackageDuration() {
return importedPackageDuration;
return metricsService.timer(getMetricName(IMPORTED_PACKAGE_DURATION, tags));
}

/**
Expand All @@ -139,7 +132,7 @@ public Timer getImportedPackageDuration() {
* @return a Sling Metrics timer
*/
public Timer getRemovedPackageDuration() {
return removedPackageDuration;
return metricsService.timer(getMetricName(REMOVED_PACKAGE_DURATION, tags));
}

/**
Expand All @@ -148,7 +141,7 @@ public Timer getRemovedPackageDuration() {
* @return a Sling Metrics timer
*/
public Timer getRemovedFailedPackageDuration() {
return removedFailedPackageDuration;
return metricsService.timer(getMetricName(REMOVED_FAILED_PACKAGE_DURATION, tags));
}

/**
Expand All @@ -157,7 +150,7 @@ public Timer getRemovedFailedPackageDuration() {
* @return a Sling Metrics meter
*/
public Meter getFailedPackageImports() {
return failedPackageImports;
return metricsService.meter(getMetricName(FAILED_PACKAGE_IMPORTS, tags));
}

/**
Expand All @@ -166,7 +159,7 @@ public Meter getFailedPackageImports() {
* @return a Sling Metric timer
*/
public Timer getSendStoredStatusDuration() {
return sendStoredStatusDuration;
return metricsService.timer(getMetricName(SEND_STORED_STATUS_DURATION, tags));
}

/**
Expand All @@ -175,7 +168,7 @@ public Timer getSendStoredStatusDuration() {
* @return a Sling Metric timer
*/
public Timer getProcessQueueItemDuration() {
return processQueueItemDuration;
return metricsService.timer(getMetricName(PROCESS_QUEUE_ITEM_DURATION, tags));
}

/**
Expand All @@ -185,7 +178,7 @@ public Timer getProcessQueueItemDuration() {
* @return a Sling Metric timer
*/
public Timer getPackageDistributedDuration() {
return packageDistributedDuration;
return metricsService.timer(getMetricName(REQUEST_DISTRIBUTED_DURATION, tags));
}

/**
Expand All @@ -195,7 +188,7 @@ public Timer getPackageDistributedDuration() {
* @return a Sling Metrics timer
*/
public Timer getPackageJournalDistributionDuration() {
return packageJournalDistributionDuration;
return metricsService.timer(getMetricName(PACKAGE_JOURNAL_DISTRIBUTION_DURATION, tags));
}

/**
Expand All @@ -204,63 +197,58 @@ public Timer getPackageJournalDistributionDuration() {
* @return a Sling Metric counter
*/
public Counter getPackageStatusCounter(Status status) {
return metricsService.counter(getNameWithLabel(getMetricName("package_status_count"), "status", status.name()));
Tag tagStatus = Tag.of(TAG_STATUS, status.name());
String name = getMetricName(PACKAGE_STATUS_COUNT, Arrays.asList(tagSubName, tagEditable, tagStatus));
return metricsService.counter(name);
}

public Timer getImportPreProcessDuration() {
return importPreProcessDuration;
return metricsService.timer(getMetricName(IMPORT_PRE_PROCESS_DURATION, tags));
}

public Counter getImportPreProcessSuccess() {
return importPreProcessSuccess;
return metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_SUCCESS_COUNT, tags));
}

public Counter getImportPreProcessRequest() {
return importPreProcessRequest;
return metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_REQUEST_COUNT, tags));
}

public Timer getImportPostProcessDuration() {
return importPostProcessDuration;
return metricsService.timer(getMetricName(IMPORT_POST_PROCESS_DURATION, tags));
}

public Counter getImportPostProcessSuccess() {
return importPostProcessSuccess;
return metricsService.counter(getMetricName(IMPORT_POST_PROCESS_SUCCESS_COUNT, tags));
}

public Counter getImportPostProcessRequest() {
return importPostProcessRequest;
return metricsService.counter(getMetricName(IMPORT_POST_PROCESS_REQUEST_COUNT, tags));
}

public Timer getInvalidationProcessDuration() {
return invalidationProcessDuration;
return metricsService.timer(getMetricName(INVALIDATION_PROCESS_DURATION, tags));
}

public Counter getInvalidationProcessSuccess() {
return invalidationProcessSuccess;
return metricsService.counter(getMetricName(INVALIDATION_PROCESS_SUCCESS_COUNT, tags));
}

public Counter getInvalidationProcessRequest() {
return invalidationProcessRequest;
return metricsService.counter(getMetricName(INVALIDATION_PROCESS_REQUEST_COUNT, tags));
}

public Counter getTransientImportErrors() {
return transientImportErrors;
return metricsService.counter(getMetricName(TRANSIENT_IMPORT_ERRORS, tags));
}

public Counter getPermanentImportErrors() {
return permanentImportErrors;
return metricsService.counter(getMetricName(PERMANENT_IMPORT_ERRORS, tags));
}

public void currentRetries(String subAgentName, Supplier<Integer> retriesCallback) {
String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
metricsService.gauge(nameRetries, retriesCallback);
public void currentRetries(Supplier<Integer> retriesCallback) {
metricsService.gauge(getMetricName(CURRENT_RETRIES, tags), retriesCallback);
}

private String getMetricName(String name) {
return format("%s.%s", SUB_COMPONENT, name);
}

private String getNameWithLabel(String name, String label, String labelVal) {
return format("%s;%s=%s", name, label, labelVal);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public void deactivate() {
public TopologyView getTopologyView() {
return viewManager.getCurrentView();
}

public int getSubscriberCount(String pubAgentName) {
return getTopologyView().getSubscribedAgentIds(pubAgentName).size();
}

@Override
public void run() {
Expand Down

0 comments on commit 502b13c

Please sign in to comment.