From 0533fbef9f32d380f2fd6e46655abd7ddd4d1b70 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20R=C5=BCysko?=
Date: Thu, 24 Jan 2019 14:41:54 +0100
Subject: [PATCH] Improved subscription health problem indicators (#972)
---
.../hermes/api/SubscriptionHealthProblem.java | 10 +-
.../tech/hermes/api/SubscriptionMetrics.java | 29 ++-
.../subscription/SubscriptionHealth.js | 1 -
.../static/partials/subscription.html | 4 -
.../config/HealthConfiguration.java | 34 ++-
.../management/config/HealthProperties.java | 54 ++++-
.../health/SubscriptionHealthChecker.java | 3 +-
.../health/SubscriptionHealthContext.java | 65 +++++-
.../SubscriptionHealthContextCreator.java | 31 ---
.../health/SubscriptionMetrics.java | 43 ----
.../subscription/health/TopicMetrics.java | 13 --
.../health/problem/DisabledIndicator.java | 15 ++
.../health/problem/LaggingIndicator.java | 7 +-
.../problem/MalfunctioningIndicator.java | 16 +-
.../ReceivingMalformedMessagesIndicator.java | 30 +--
.../health/problem/SlowIndicator.java | 27 ---
.../health/problem/TimingOutIndicator.java | 17 +-
.../health/problem/UnreachableIndicator.java | 17 +-
.../HybridSubscriptionMetricsRepository.java | 12 +-
.../SubscriptionHealthCheckerTest.groovy | 218 +++++++++++++-----
...idSubscriptionMetricsRepositoryTest.groovy | 4 +-
.../tech/hermes/integration/MetricsTest.java | 5 +-
.../integration/helper/GraphiteEndpoint.java | 115 +++++++--
...istUnhealthySubscriptionsForOwnerTest.java | 39 ++--
.../SubscriptionManagementTest.java | 15 +-
25 files changed, 486 insertions(+), 338 deletions(-)
delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContextCreator.java
delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionMetrics.java
delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/TopicMetrics.java
create mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/DisabledIndicator.java
delete mode 100644 hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/SlowIndicator.java
diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionHealthProblem.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionHealthProblem.java
index 613a488a6d..6610be6cef 100644
--- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionHealthProblem.java
+++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionHealthProblem.java
@@ -9,14 +9,13 @@
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.LAGGING;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.MALFUNCTIONING;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.RECEIVING_MALFORMED_MESSAGES;
-import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.SLOW;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.TIMING_OUT;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.UNREACHABLE;
public class SubscriptionHealthProblem {
public enum ProblemCode {
- LAGGING, SLOW, UNREACHABLE, TIMING_OUT, MALFUNCTIONING, RECEIVING_MALFORMED_MESSAGES
+ LAGGING, UNREACHABLE, TIMING_OUT, MALFUNCTIONING, RECEIVING_MALFORMED_MESSAGES
}
private final ProblemCode code;
@@ -50,13 +49,6 @@ public static SubscriptionHealthProblem receivingMalformedMessages(double code4x
);
}
- public static SubscriptionHealthProblem slow(double subscriptionRate, double topicRate) {
- return new SubscriptionHealthProblem(
- SLOW,
- format("Consumption rate (%.0f RPS) is lower than topic production rate (%.0f RPS)", subscriptionRate, topicRate)
- );
- }
-
public static SubscriptionHealthProblem timingOut(double timeoutsRate) {
return new SubscriptionHealthProblem(
TIMING_OUT,
diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java
index 04fac8575d..5074eec378 100644
--- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java
+++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/SubscriptionMetrics.java
@@ -16,17 +16,24 @@ public class SubscriptionMetrics {
private Subscription.State state;
private String rate;
private String throughput;
+ private String batchRate;
private SubscriptionMetrics() {
}
@JsonCreator
- public SubscriptionMetrics(@JsonProperty("delivered") long delivered, @JsonProperty("discarded") long discarded,
- @JsonProperty("inflight") long inflight, @JsonProperty("timeouts") String timeouts,
- @JsonProperty("otherErrors") String otherErrors, @JsonProperty("codes2xx") String codes2xx,
- @JsonProperty("codes4xx") String codes4xx, @JsonProperty("codes5xx") String codes5xx,
- @JsonProperty("Subscription") Subscription.State state, @JsonProperty("rate") String rate,
- @JsonProperty("throughput") String throughput) {
+ public SubscriptionMetrics(@JsonProperty("delivered") long delivered,
+ @JsonProperty("discarded") long discarded,
+ @JsonProperty("inflight") long inflight,
+ @JsonProperty("timeouts") String timeouts,
+ @JsonProperty("otherErrors") String otherErrors,
+ @JsonProperty("codes2xx") String codes2xx,
+ @JsonProperty("codes4xx") String codes4xx,
+ @JsonProperty("codes5xx") String codes5xx,
+ @JsonProperty("Subscription") Subscription.State state,
+ @JsonProperty("rate") String rate,
+ @JsonProperty("throughput") String throughput,
+ @JsonProperty("batchRate") String batchRate) {
this.delivered = delivered;
this.discarded = discarded;
this.inflight = inflight;
@@ -38,6 +45,7 @@ public SubscriptionMetrics(@JsonProperty("delivered") long delivered, @JsonPrope
this.state = state;
this.rate = rate;
this.throughput = throughput;
+ this.batchRate = batchRate;
}
public long getDelivered() {
@@ -88,6 +96,10 @@ public String getThroughput() {
return throughput;
}
+ public String getBatchRate() {
+ return batchRate;
+ }
+
public static class Builder {
private SubscriptionMetrics subscriptionMetrics;
@@ -155,6 +167,11 @@ public Builder withThroughput(String throughput) {
return this;
}
+ public Builder withBatchRate(String batchRate) {
+ subscriptionMetrics.batchRate = batchRate;
+ return this;
+ }
+
public static Builder subscriptionMetrics() {
return new Builder();
}
diff --git a/hermes-console/static/js/console/subscription/SubscriptionHealth.js b/hermes-console/static/js/console/subscription/SubscriptionHealth.js
index f38bd2e9fd..09a0c94c8e 100644
--- a/hermes-console/static/js/console/subscription/SubscriptionHealth.js
+++ b/hermes-console/static/js/console/subscription/SubscriptionHealth.js
@@ -16,7 +16,6 @@ health.factory('SubscriptionHealth', ['DiscoveryService', '$resource',
status: health.status,
problems: {
lagging: problemOccurs('LAGGING'),
- slow: problemOccurs('SLOW'),
malfunctioning: problemOccurs('MALFUNCTIONING'),
receivingMalformedMessages: problemOccurs('RECEIVING_MALFORMED_MESSAGES'),
timingOut: problemOccurs('TIMING_OUT'),
diff --git a/hermes-console/static/partials/subscription.html b/hermes-console/static/partials/subscription.html
index 6a45124fd6..f62a582b80 100644
--- a/hermes-console/static/partials/subscription.html
+++ b/hermes-console/static/partials/subscription.html
@@ -40,10 +40,6 @@ Subscription health problems
Subscription lag is growing. Examine output rate and service response codes, looks like it is
not consuming at full speed.
-
- Consumption rate is lower than topic production rate. Examine output rate and service response codes.
- If everything is well, take a look at maximum rate, maybe it is too low?
-
Consuming service returns a lot of 5xx codes. Looks like it might be malfunctioning or doesn't know
how to handle messages. Take a look at "Last undelivered message" for more information.
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthConfiguration.java
index 7f2a9efefa..3b42d3200e 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthConfiguration.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthConfiguration.java
@@ -5,46 +5,58 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
+import pl.allegro.tech.hermes.management.domain.subscription.health.problem.DisabledIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.LaggingIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.MalfunctioningIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.ReceivingMalformedMessagesIndicator;
-import pl.allegro.tech.hermes.management.domain.subscription.health.problem.SlowIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.TimingOutIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.UnreachableIndicator;
@Configuration
@EnableConfigurationProperties({HealthProperties.class})
public class HealthConfiguration {
+ private static final DisabledIndicator DISABLED_INDICATOR = new DisabledIndicator();
+
@Autowired
private HealthProperties healthProperties;
@Bean
public SubscriptionHealthProblemIndicator laggingIndicator() {
- return new LaggingIndicator(healthProperties.getMaxLagInSeconds());
- }
-
- @Bean
- public SubscriptionHealthProblemIndicator slowIndicator() {
- return new SlowIndicator(healthProperties.getMinSubscriptionToTopicSpeedRatio());
+ if (healthProperties.isLaggingIndicatorEnabled()) {
+ return new LaggingIndicator(healthProperties.getMaxLagInSeconds());
+ }
+ return DISABLED_INDICATOR;
}
@Bean
public SubscriptionHealthProblemIndicator unreachableIndicator() {
- return new UnreachableIndicator(healthProperties.getMaxOtherErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ if (healthProperties.isUnreachableIndicatorEnabled()) {
+ return new UnreachableIndicator(healthProperties.getMaxOtherErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ }
+ return DISABLED_INDICATOR;
}
@Bean
public SubscriptionHealthProblemIndicator timingOutIndicator() {
- return new TimingOutIndicator(healthProperties.getMaxTimeoutsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ if (healthProperties.isTimingOutIndicatorEnabled()) {
+ return new TimingOutIndicator(healthProperties.getMaxTimeoutsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ }
+ return DISABLED_INDICATOR;
}
@Bean
public SubscriptionHealthProblemIndicator malfunctioningIndicator() {
- return new MalfunctioningIndicator(healthProperties.getMax5xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ if (healthProperties.isMalfunctioningIndicatorEnabled()) {
+ return new MalfunctioningIndicator(healthProperties.getMax5xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ }
+ return DISABLED_INDICATOR;
}
@Bean
public SubscriptionHealthProblemIndicator receivingMalformedMessagesIndicator() {
- return new ReceivingMalformedMessagesIndicator(healthProperties.getMax4xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ if (healthProperties.isReceivingMalformedMessagesIndicatorEnabled()) {
+ return new ReceivingMalformedMessagesIndicator(healthProperties.getMax4xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
+ }
+ return DISABLED_INDICATOR;
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthProperties.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthProperties.java
index bd40e7a910..6c45f8fd29 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthProperties.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/HealthProperties.java
@@ -5,12 +5,16 @@
@ConfigurationProperties(prefix = "health")
public class HealthProperties {
private int maxLagInSeconds = 600;
- private double minSubscriptionToTopicSpeedRatio = 0.8;
private double maxOtherErrorsRatio = 0.5;
private double maxTimeoutsRatio = 0.1;
private double max5xxErrorsRatio = 0.1;
private double max4xxErrorsRatio = 0.1;
private double minSubscriptionRateForReliableMetrics = 2.0;
+ private boolean laggingIndicatorEnabled = true;
+ private boolean malfunctioningIndicatorEnabled = true;
+ private boolean receivingMalformedMessagesIndicatorEnabled = true;
+ private boolean timingOutIndicatorEnabled = true;
+ private boolean unreachableIndicatorEnabled = true;
public int getMaxLagInSeconds() {
return maxLagInSeconds;
@@ -20,14 +24,6 @@ public void setMaxLagInSeconds(int maxLagInSeconds) {
this.maxLagInSeconds = maxLagInSeconds;
}
- public double getMinSubscriptionToTopicSpeedRatio() {
- return minSubscriptionToTopicSpeedRatio;
- }
-
- public void setMinSubscriptionToTopicSpeedRatio(double minSubscriptionToTopicSpeedRatio) {
- this.minSubscriptionToTopicSpeedRatio = minSubscriptionToTopicSpeedRatio;
- }
-
public double getMaxOtherErrorsRatio() {
return maxOtherErrorsRatio;
}
@@ -67,4 +63,44 @@ public double getMinSubscriptionRateForReliableMetrics() {
public void setMinSubscriptionRateForReliableMetrics(double minSubscriptionRateForReliableMetrics) {
this.minSubscriptionRateForReliableMetrics = minSubscriptionRateForReliableMetrics;
}
+
+ public boolean isLaggingIndicatorEnabled() {
+ return laggingIndicatorEnabled;
+ }
+
+ public void setLaggingIndicatorEnabled(boolean laggingIndicatorEnabled) {
+ this.laggingIndicatorEnabled = laggingIndicatorEnabled;
+ }
+
+ public boolean isMalfunctioningIndicatorEnabled() {
+ return malfunctioningIndicatorEnabled;
+ }
+
+ public void setMalfunctioningIndicatorEnabled(boolean malfunctioningIndicatorEnabled) {
+ this.malfunctioningIndicatorEnabled = malfunctioningIndicatorEnabled;
+ }
+
+ public boolean isReceivingMalformedMessagesIndicatorEnabled() {
+ return receivingMalformedMessagesIndicatorEnabled;
+ }
+
+ public void setReceivingMalformedMessagesIndicatorEnabled(boolean receivingMalformedMessagesIndicatorEnabled) {
+ this.receivingMalformedMessagesIndicatorEnabled = receivingMalformedMessagesIndicatorEnabled;
+ }
+
+ public boolean isTimingOutIndicatorEnabled() {
+ return timingOutIndicatorEnabled;
+ }
+
+ public void setTimingOutIndicatorEnabled(boolean timingOutIndicatorEnabled) {
+ this.timingOutIndicatorEnabled = timingOutIndicatorEnabled;
+ }
+
+ public boolean isUnreachableIndicatorEnabled() {
+ return unreachableIndicatorEnabled;
+ }
+
+ public void setUnreachableIndicatorEnabled(boolean unreachableIndicatorEnabled) {
+ this.unreachableIndicatorEnabled = unreachableIndicatorEnabled;
+ }
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthChecker.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthChecker.java
index 0e43436411..cfaefb9b91 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthChecker.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthChecker.java
@@ -17,7 +17,6 @@
@Component
public class SubscriptionHealthChecker {
private final Set problemIndicators;
- private final SubscriptionHealthContextCreator contextCreator = new SubscriptionHealthContextCreator();
@Autowired
public SubscriptionHealthChecker(Set problemIndicators) {
@@ -38,7 +37,7 @@ private boolean isSuspended(Subscription subscription) {
private SubscriptionHealth getActiveSubscriptionHealth(Subscription subscription, TopicMetrics topicMetrics, SubscriptionMetrics subscriptionMetrics) {
try {
- SubscriptionHealthContext healthContext = contextCreator.createContext(subscription, topicMetrics, subscriptionMetrics);
+ SubscriptionHealthContext healthContext = new SubscriptionHealthContext(subscription, topicMetrics, subscriptionMetrics);
Set healthProblems = getHealthProblems(healthContext);
return SubscriptionHealth.of(healthProblems);
} catch (NumberFormatException e) {
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContext.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContext.java
index a94847081e..f3fc96c48c 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContext.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContext.java
@@ -1,27 +1,70 @@
package pl.allegro.tech.hermes.management.domain.subscription.health;
import pl.allegro.tech.hermes.api.Subscription;
+import pl.allegro.tech.hermes.api.SubscriptionMetrics;
+import pl.allegro.tech.hermes.api.TopicMetrics;
+
+import static java.lang.Double.parseDouble;
public final class SubscriptionHealthContext {
private final Subscription subscription;
- private final TopicMetrics topicMetrics;
- private final SubscriptionMetrics subscriptionMetrics;
+ private final double topicRate;
+ private final double subscriptionRate;
+ private final double timeoutsRate;
+ private final double otherErrorsRate;
+ private final double code4xxErrorsRate;
+ private final double code5xxErrorsRate;
+ private final double batchRate;
+ private final long lag;
- public SubscriptionHealthContext(Subscription subscription, TopicMetrics topicMetrics, SubscriptionMetrics subscriptionMetrics) {
+ SubscriptionHealthContext(Subscription subscription, TopicMetrics topicMetrics, SubscriptionMetrics subscriptionMetrics) {
this.subscription = subscription;
- this.topicMetrics = topicMetrics;
- this.subscriptionMetrics = subscriptionMetrics;
+ this.topicRate = parseDouble(topicMetrics.getRate());
+ this.subscriptionRate = parseDouble(subscriptionMetrics.getRate());
+ this.timeoutsRate = parseDouble(subscriptionMetrics.getTimeouts());
+ this.otherErrorsRate = parseDouble(subscriptionMetrics.getOtherErrors());
+ this.code4xxErrorsRate = parseDouble(subscriptionMetrics.getCodes4xx());
+ this.code5xxErrorsRate = parseDouble(subscriptionMetrics.getCodes5xx());
+ this.batchRate = parseDouble(subscriptionMetrics.getBatchRate());
+ this.lag = subscriptionMetrics.getLag();
+ }
+
+ public boolean subscriptionHasRetryOnError() {
+ if (subscription.isBatchSubscription()) {
+ return subscription.getBatchSubscriptionPolicy().isRetryClientErrors();
+ } else {
+ return subscription.getSerialSubscriptionPolicy().isRetryClientErrors();
+ }
+ }
+
+ public double getSubscriptionRateRespectingDeliveryType() {
+ if (subscription.isBatchSubscription()) {
+ return batchRate;
+ }
+ return subscriptionRate;
+ }
+
+ public double getOtherErrorsRate() {
+ return otherErrorsRate;
+ }
+
+ public double getTimeoutsRate() {
+ return timeoutsRate;
+ }
+
+ public double getCode4xxErrorsRate() {
+ return code4xxErrorsRate;
}
- public Subscription getSubscription() {
- return subscription;
+ public double getCode5xxErrorsRate() {
+ return code5xxErrorsRate;
}
- public TopicMetrics getTopicMetrics() {
- return topicMetrics;
+ public long getLag() {
+ return lag;
}
- public SubscriptionMetrics getSubscriptionMetrics() {
- return subscriptionMetrics;
+ public double getTopicRate() {
+ return topicRate;
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContextCreator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContextCreator.java
deleted file mode 100644
index 65abfd71df..0000000000
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionHealthContextCreator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package pl.allegro.tech.hermes.management.domain.subscription.health;
-
-import pl.allegro.tech.hermes.api.Subscription;
-
-import static java.lang.Double.parseDouble;
-
-class SubscriptionHealthContextCreator {
- public SubscriptionHealthContext createContext(Subscription subscription, pl.allegro.tech.hermes.api.TopicMetrics topicMetrics,
- pl.allegro.tech.hermes.api.SubscriptionMetrics subscriptionMetrics) {
- return new SubscriptionHealthContext(
- subscription,
- convertTopicMetrics(topicMetrics),
- convertSubscriptionMetrics(subscriptionMetrics)
- );
- }
-
- private TopicMetrics convertTopicMetrics(pl.allegro.tech.hermes.api.TopicMetrics topicMetrics) {
- double rate = parseDouble(topicMetrics.getRate());
- return new TopicMetrics(rate);
- }
-
- private SubscriptionMetrics convertSubscriptionMetrics(pl.allegro.tech.hermes.api.SubscriptionMetrics subscriptionMetrics) {
- double rate = parseDouble(subscriptionMetrics.getRate());
- double timeoutsRate = parseDouble(subscriptionMetrics.getTimeouts());
- double otherErrorsRate = parseDouble(subscriptionMetrics.getOtherErrors());
- double code4xxErrorsRate = parseDouble(subscriptionMetrics.getCodes4xx());
- double code5xxErrorsRate = parseDouble(subscriptionMetrics.getCodes5xx());
- long lag = subscriptionMetrics.getLag();
- return new SubscriptionMetrics(rate, timeoutsRate, otherErrorsRate, code4xxErrorsRate, code5xxErrorsRate, lag);
- }
-}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionMetrics.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionMetrics.java
deleted file mode 100644
index 295237c116..0000000000
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/SubscriptionMetrics.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package pl.allegro.tech.hermes.management.domain.subscription.health;
-
-public final class SubscriptionMetrics {
- private final double rate;
- private final double timeoutsRate;
- private final double otherErrorsRate;
- private final double code4xxErrorsRate;
- private final double code5xxErrorsRate;
- private final long lag;
-
- public SubscriptionMetrics(double rate, double timeoutsRate, double otherErrorsRate, double code4xxErrorsRate, double code5xxErrorsRate, long lag) {
- this.rate = rate;
- this.timeoutsRate = timeoutsRate;
- this.otherErrorsRate = otherErrorsRate;
- this.code4xxErrorsRate = code4xxErrorsRate;
- this.code5xxErrorsRate = code5xxErrorsRate;
- this.lag = lag;
- }
-
- public double getRate() {
- return rate;
- }
-
- public double getTimeoutsRate() {
- return timeoutsRate;
- }
-
- public double getOtherErrorsRate() {
- return otherErrorsRate;
- }
-
- public double getCode4xxErrorsRate() {
- return code4xxErrorsRate;
- }
-
- public double getCode5xxErrorsRate() {
- return code5xxErrorsRate;
- }
-
- public long getLag() {
- return lag;
- }
-}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/TopicMetrics.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/TopicMetrics.java
deleted file mode 100644
index 9a50839e0b..0000000000
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/TopicMetrics.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package pl.allegro.tech.hermes.management.domain.subscription.health;
-
-public final class TopicMetrics {
- private final double rate;
-
- public TopicMetrics(double rate) {
- this.rate = rate;
- }
-
- public double getRate() {
- return rate;
- }
-}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/DisabledIndicator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/DisabledIndicator.java
new file mode 100644
index 0000000000..6de68151ca
--- /dev/null
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/DisabledIndicator.java
@@ -0,0 +1,15 @@
+package pl.allegro.tech.hermes.management.domain.subscription.health.problem;
+
+import pl.allegro.tech.hermes.api.SubscriptionHealthProblem;
+import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthContext;
+import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
+
+import java.util.Optional;
+
+public class DisabledIndicator implements SubscriptionHealthProblemIndicator {
+
+ @Override
+ public Optional getProblem(SubscriptionHealthContext context) {
+ return Optional.empty();
+ }
+}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/LaggingIndicator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/LaggingIndicator.java
index 957c654a56..27f338e18a 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/LaggingIndicator.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/LaggingIndicator.java
@@ -17,10 +17,9 @@ public LaggingIndicator(int maxLagInSeconds) {
@Override
public Optional getProblem(SubscriptionHealthContext context) {
- long subscriptionLag = context.getSubscriptionMetrics().getLag();
- double topicRate = context.getTopicMetrics().getRate();
- double lagInSeconds = subscriptionLag / topicRate;
- if (lagInSeconds > maxLagInSeconds) {
+ long subscriptionLag = context.getLag();
+ double topicRate = context.getTopicRate();
+ if (topicRate > 0.0 && subscriptionLag > maxLagInSeconds * topicRate) {
return Optional.of(lagging(subscriptionLag));
}
return Optional.empty();
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/MalfunctioningIndicator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/MalfunctioningIndicator.java
index 903902e453..c0b74e85e4 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/MalfunctioningIndicator.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/MalfunctioningIndicator.java
@@ -3,7 +3,6 @@
import pl.allegro.tech.hermes.api.SubscriptionHealthProblem;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthContext;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
-import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionMetrics;
import java.util.Optional;
@@ -20,20 +19,19 @@ public MalfunctioningIndicator(double max5xxErrorsRatio, double minSubscriptionR
@Override
public Optional getProblem(SubscriptionHealthContext context) {
- SubscriptionMetrics subscriptionMetrics = context.getSubscriptionMetrics();
- if (areSubscriptionMetricsReliable(subscriptionMetrics) && isCode5xxErrorsRateHigh(subscriptionMetrics)) {
- return Optional.of(malfunctioning(subscriptionMetrics.getCode5xxErrorsRate()));
+ if (areSubscriptionMetricsReliable(context) && isCode5xxErrorsRateHigh(context)) {
+ return Optional.of(malfunctioning(context.getCode5xxErrorsRate()));
}
return Optional.empty();
}
- private boolean areSubscriptionMetricsReliable(SubscriptionMetrics subscriptionMetrics) {
- return subscriptionMetrics.getRate() > minSubscriptionRateForReliableMetrics;
+ private boolean areSubscriptionMetricsReliable(SubscriptionHealthContext context) {
+ return context.getSubscriptionRateRespectingDeliveryType() > minSubscriptionRateForReliableMetrics;
}
- private boolean isCode5xxErrorsRateHigh(SubscriptionMetrics subscriptionMetrics) {
- double code5xxErrorsRate = subscriptionMetrics.getCode5xxErrorsRate();
- double rate = subscriptionMetrics.getRate();
+ private boolean isCode5xxErrorsRateHigh(SubscriptionHealthContext context) {
+ double code5xxErrorsRate = context.getCode5xxErrorsRate();
+ double rate = context.getSubscriptionRateRespectingDeliveryType();
return code5xxErrorsRate > max5xxErrorsRatio * rate;
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/ReceivingMalformedMessagesIndicator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/ReceivingMalformedMessagesIndicator.java
index d78579a876..e652b940fc 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/ReceivingMalformedMessagesIndicator.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/ReceivingMalformedMessagesIndicator.java
@@ -1,10 +1,8 @@
package pl.allegro.tech.hermes.management.domain.subscription.health.problem;
-import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionHealthProblem;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthContext;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
-import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionMetrics;
import java.util.Optional;
@@ -21,31 +19,21 @@ public ReceivingMalformedMessagesIndicator(double max4xxErrorsRatio, double minS
@Override
public Optional getProblem(SubscriptionHealthContext context) {
- Subscription subscription = context.getSubscription();
- SubscriptionMetrics subscriptionMetrics = context.getSubscriptionMetrics();
- if (hasClientErrorRetry(subscription)
- && areSubscriptionMetricsReliable(subscriptionMetrics)
- && isCode4xxErrorsRateHigh(subscriptionMetrics)) {
- return Optional.of(receivingMalformedMessages(subscriptionMetrics.getCode4xxErrorsRate()));
+ if (context.subscriptionHasRetryOnError()
+ && areSubscriptionMetricsReliable(context)
+ && isCode4xxErrorsRateHigh(context)) {
+ return Optional.of(receivingMalformedMessages(context.getCode4xxErrorsRate()));
}
return Optional.empty();
}
- private boolean hasClientErrorRetry(Subscription subscription) {
- if (subscription.isBatchSubscription()) {
- return subscription.getBatchSubscriptionPolicy().isRetryClientErrors();
- } else {
- return subscription.getSerialSubscriptionPolicy().isRetryClientErrors();
- }
- }
-
- private boolean areSubscriptionMetricsReliable(SubscriptionMetrics subscriptionMetrics) {
- return subscriptionMetrics.getRate() > minSubscriptionRateForReliableMetrics;
+ private boolean areSubscriptionMetricsReliable(SubscriptionHealthContext context) {
+ return context.getSubscriptionRateRespectingDeliveryType() > minSubscriptionRateForReliableMetrics;
}
- private boolean isCode4xxErrorsRateHigh(SubscriptionMetrics subscriptionMetrics) {
- double code4xxErrorsRate = subscriptionMetrics.getCode4xxErrorsRate();
- double rate = subscriptionMetrics.getRate();
+ private boolean isCode4xxErrorsRateHigh(SubscriptionHealthContext context) {
+ double code4xxErrorsRate = context.getCode4xxErrorsRate();
+ double rate = context.getSubscriptionRateRespectingDeliveryType();
return code4xxErrorsRate > max4xxErrorsRatio * rate;
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/SlowIndicator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/SlowIndicator.java
deleted file mode 100644
index 4110bd0bae..0000000000
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/SlowIndicator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package pl.allegro.tech.hermes.management.domain.subscription.health.problem;
-
-import pl.allegro.tech.hermes.api.SubscriptionHealthProblem;
-import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthContext;
-import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
-
-import java.util.Optional;
-
-import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.slow;
-
-public class SlowIndicator implements SubscriptionHealthProblemIndicator {
- private final double minSubscriptionToTopicSpeedRatio;
-
- public SlowIndicator(double minSubscriptionToTopicSpeedRatio) {
- this.minSubscriptionToTopicSpeedRatio = minSubscriptionToTopicSpeedRatio;
- }
-
- @Override
- public Optional getProblem(SubscriptionHealthContext context) {
- double subscriptionRate = context.getSubscriptionMetrics().getRate();
- double topicRate = context.getTopicMetrics().getRate();
- if (subscriptionRate < minSubscriptionToTopicSpeedRatio * topicRate) {
- return Optional.of(slow(subscriptionRate, topicRate));
- }
- return Optional.empty();
- }
-}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/TimingOutIndicator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/TimingOutIndicator.java
index 35302ae644..02925b5037 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/TimingOutIndicator.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/TimingOutIndicator.java
@@ -3,7 +3,6 @@
import pl.allegro.tech.hermes.api.SubscriptionHealthProblem;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthContext;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
-import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionMetrics;
import java.util.Optional;
@@ -20,20 +19,18 @@ public TimingOutIndicator(double maxTimeoutsRatio, double minSubscriptionRateFor
@Override
public Optional getProblem(SubscriptionHealthContext context) {
- SubscriptionMetrics subscriptionMetrics = context.getSubscriptionMetrics();
- if (areSubscriptionMetricsReliable(subscriptionMetrics) && isTimeoutsRateHigh(subscriptionMetrics)) {
- return Optional.of(timingOut(subscriptionMetrics.getTimeoutsRate()));
+ if (areSubscriptionMetricsReliable(context) && isTimeoutsRateHigh(context)) {
+ return Optional.of(timingOut(context.getTimeoutsRate()));
}
return Optional.empty();
}
- private boolean areSubscriptionMetricsReliable(SubscriptionMetrics subscriptionMetrics) {
- return subscriptionMetrics.getRate() > minSubscriptionRateForReliableMetrics;
+ private boolean areSubscriptionMetricsReliable(SubscriptionHealthContext context) {
+ return context.getSubscriptionRateRespectingDeliveryType() > minSubscriptionRateForReliableMetrics;
}
- private boolean isTimeoutsRateHigh(SubscriptionMetrics subscriptionMetrics) {
- double timeoutsRate = subscriptionMetrics.getTimeoutsRate();
- double rate = subscriptionMetrics.getRate();
- return timeoutsRate > maxTimeoutsRatio * rate;
+ private boolean isTimeoutsRateHigh(SubscriptionHealthContext context) {
+ double timeoutsRate = context.getTimeoutsRate();
+ return timeoutsRate > maxTimeoutsRatio * context.getSubscriptionRateRespectingDeliveryType();
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/UnreachableIndicator.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/UnreachableIndicator.java
index 1f3699c3af..53401fea49 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/UnreachableIndicator.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/health/problem/UnreachableIndicator.java
@@ -3,7 +3,6 @@
import pl.allegro.tech.hermes.api.SubscriptionHealthProblem;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthContext;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
-import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionMetrics;
import java.util.Optional;
@@ -20,20 +19,18 @@ public UnreachableIndicator(double maxOtherErrorsRatio, double minSubscriptionRa
@Override
public Optional getProblem(SubscriptionHealthContext context) {
- SubscriptionMetrics subscriptionMetrics = context.getSubscriptionMetrics();
- if (areSubscriptionMetricsReliable(subscriptionMetrics) && isOtherErrorsRateHigh(subscriptionMetrics)) {
- return Optional.of(unreachable(subscriptionMetrics.getOtherErrorsRate()));
+ if (areSubscriptionMetricsReliable(context) && isOtherErrorsRateHigh(context)) {
+ return Optional.of(unreachable(context.getOtherErrorsRate()));
}
return Optional.empty();
}
- private boolean areSubscriptionMetricsReliable(SubscriptionMetrics subscriptionMetrics) {
- return subscriptionMetrics.getRate() > minSubscriptionRateForReliableMetrics;
+ private boolean areSubscriptionMetricsReliable(SubscriptionHealthContext context) {
+ return context.getSubscriptionRateRespectingDeliveryType() > minSubscriptionRateForReliableMetrics;
}
- private boolean isOtherErrorsRateHigh(SubscriptionMetrics subscriptionMetrics) {
- double otherErrorsRate = subscriptionMetrics.getOtherErrorsRate();
- double rate = subscriptionMetrics.getRate();
- return otherErrorsRate > maxOtherErrorsRatio * rate;
+ private boolean isOtherErrorsRateHigh(SubscriptionHealthContext context) {
+ double otherErrorsRate = context.getOtherErrorsRate();
+ return otherErrorsRate > maxOtherErrorsRatio * context.getSubscriptionRateRespectingDeliveryType();
}
}
diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java
index a3bdf77a11..a5e5894ee6 100644
--- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java
+++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepository.java
@@ -33,6 +33,7 @@ public class HybridSubscriptionMetricsRepository implements SubscriptionMetricsR
private static final String SUBSCRIPTION_HTTP_STATUSES_PATTERN = "sumSeries(%s.consumer.*.status.%s.%s.m1_rate)";
private static final String SUBSCRIPTION_ERROR_TIMEOUT_PATTERN = "sumSeries(%s.consumer.*.status.%s.errors.timeout.m1_rate)";
private static final String SUBSCRIPTION_ERROR_OTHER_PATTERN = "sumSeries(%s.consumer.*.status.%s.errors.other.m1_rate)";
+ private static final String SUBSCRIPTION_BATCH_RATE_PATTERN = "sumSeries(%s.consumer.*.meter.%s.batch.m1_rate)";
private final GraphiteClient graphiteClient;
@@ -69,8 +70,10 @@ public SubscriptionMetrics loadMetrics(TopicName topicName, String subscriptionN
String codes2xxPath = metricPathHttpStatuses(name, "2xx");
String codes4xxPath = metricPathHttpStatuses(name, "4xx");
String codes5xxPath = metricPathHttpStatuses(name, "5xx");
+ String batchPath = metricPathBatchRate(name);
- GraphiteMetrics graphiteMetrics = graphiteClient.readMetrics(codes2xxPath, codes4xxPath, codes5xxPath, rateMetric, timeouts, otherErrors);
+ GraphiteMetrics graphiteMetrics = graphiteClient.readMetrics(codes2xxPath, codes4xxPath, codes5xxPath,
+ rateMetric, timeouts, otherErrors, batchPath);
ZookeeperMetrics zookeeperMetrics = readZookeeperMetrics(name);
return SubscriptionMetrics.Builder.subscriptionMetrics()
@@ -85,6 +88,7 @@ public SubscriptionMetrics loadMetrics(TopicName topicName, String subscriptionN
.withOtherErrors(graphiteMetrics.metricValue(otherErrors))
.withLag(lagSource.getLag(topicName, subscriptionName))
.withThroughput(graphiteMetrics.metricValue(throughput))
+ .withBatchRate(graphiteMetrics.metricValue(batchPath))
.build();
}
@@ -141,6 +145,12 @@ private String metricPathOtherErrors(SubscriptionName name) {
);
}
+ private String metricPathBatchRate(SubscriptionName name) {
+ return String.format(SUBSCRIPTION_BATCH_RATE_PATTERN,
+ metricsPaths.prefix(), subscriptionNameToPath(name)
+ );
+ }
+
private String subscriptionNameToPath(SubscriptionName name) {
return String.format(SUBSCRIPTION_PATH,
escapeDots(name.getTopicName().getGroupName()), name.getTopicName().getName(), escapeDots(name.getName())
diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionHealthCheckerTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionHealthCheckerTest.groovy
index 10d985a26d..0e09e265f9 100644
--- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionHealthCheckerTest.groovy
+++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionHealthCheckerTest.groovy
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.management.domain.subscription
+import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.SubscriptionHealth
import pl.allegro.tech.hermes.api.SubscriptionMetrics
import pl.allegro.tech.hermes.api.TopicMetrics
@@ -7,12 +8,12 @@ import pl.allegro.tech.hermes.management.domain.subscription.health.Subscription
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.LaggingIndicator
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.MalfunctioningIndicator
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.ReceivingMalformedMessagesIndicator
-import pl.allegro.tech.hermes.management.domain.subscription.health.problem.SlowIndicator
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.TimingOutIndicator
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.UnreachableIndicator
import spock.lang.Specification
import spock.lang.Subject
+import static java.lang.Integer.MAX_VALUE
import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy
import static pl.allegro.tech.hermes.api.Subscription.State.ACTIVE
import static pl.allegro.tech.hermes.api.Subscription.State.SUSPENDED
@@ -22,22 +23,21 @@ import static pl.allegro.tech.hermes.api.SubscriptionHealth.Status.UNHEALTHY
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.lagging
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.malfunctioning
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.receivingMalformedMessages
-import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.slow
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.timingOut
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.unreachable
import static pl.allegro.tech.hermes.api.SubscriptionMetrics.Builder.subscriptionMetrics
+import static pl.allegro.tech.hermes.api.SubscriptionPolicy.Builder.subscriptionPolicy
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
class SubscriptionHealthCheckerTest extends Specification {
static final MIN_SUBSCRIPTION_RATE_FOR_RELIABLE_METRICS = 2.0
- static final ACTIVE_SUBSCRIPTION = subscription("group.topic", "subscription")
+ static final ACTIVE_SERIAL_SUBSCRIPTION = subscription("group.topic", "subscription")
.withState(ACTIVE)
.build()
@Subject
- def subscriptionHealthChecker = new SubscriptionHealthChecker([
+ def healthChecker = new SubscriptionHealthChecker([
new LaggingIndicator(600),
- new SlowIndicator(0.8),
new UnreachableIndicator(0.5, MIN_SUBSCRIPTION_RATE_FOR_RELIABLE_METRICS),
new TimingOutIndicator(0.1, MIN_SUBSCRIPTION_RATE_FOR_RELIABLE_METRICS),
new MalfunctioningIndicator(0.1, MIN_SUBSCRIPTION_RATE_FOR_RELIABLE_METRICS),
@@ -55,13 +55,14 @@ class SubscriptionHealthCheckerTest extends Specification {
.withTimeouts("1.2345")
.withOtherErrors("1.2345")
.withLag(789)
+ .withBatchRate("0.0")
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth == HEALTHY
+ health == HEALTHY
}
def "should indicate that a subscriber with a lag longer than 10 minutes is lagging"() {
@@ -72,29 +73,28 @@ class SubscriptionHealthCheckerTest extends Specification {
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth.status == UNHEALTHY
- subscriptionHealth.problems == [lagging(60100)] as Set
+ health.status == UNHEALTHY
+ health.problems == [lagging(60100)] as Set
}
- def "should indicate that a subscriber whose speed is smaller than 80% of the topic speed is slow"() {
+ def "should not indicate lagging if production rate is 0"() {
given:
- def topicMetrics = topicMetricsWithRate("100.0")
+ def topicMetrics = topicMetricsWithRate("0.0")
def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
- .withRate("79.0")
+ .withLag(60100)
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth.status == UNHEALTHY
- subscriptionHealth.problems == [slow(79, 100)] as Set
+ health == HEALTHY
}
- def "should indicate that a subscriber with more than 50% 'other' errors is unreachable"() {
+ def "should indicate that a serial subscriber with more than 50% 'other' errors is unreachable"() {
given:
def topicMetrics = topicMetricsWithRate("100.0")
def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
@@ -103,14 +103,31 @@ class SubscriptionHealthCheckerTest extends Specification {
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+
+ then:
+ health.status == UNHEALTHY
+ health.problems == [unreachable(51)] as Set
+ }
+
+ def "should indicate that a batch subscriber with more than 50% 'other' errors is unreachable"() {
+ given:
+ def topicMetrics = topicMetricsWithRate("100.0")
+ def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
+ .withRate("40.0")
+ .withBatchRate("4.0")
+ .withOtherErrors("6.0")
+ .build()
+
+ when:
+ SubscriptionHealth health = healthChecker.checkHealth(batchSubscriptionWithSize(10), topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth.status == UNHEALTHY
- subscriptionHealth.problems == [unreachable(51)] as Set
+ health.status == UNHEALTHY
+ health.problems == [unreachable(6.0)] as Set
}
- def "should indicate that a subscriber with more than 10% timeouts is timing out"() {
+ def "should indicate that a serial subscriber with more than 10% timeouts is timing out"() {
given:
def topicMetrics = topicMetricsWithRate("100.0")
def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
@@ -119,14 +136,31 @@ class SubscriptionHealthCheckerTest extends Specification {
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth.status == UNHEALTHY
- subscriptionHealth.problems == [timingOut(11)] as Set
+ health.status == UNHEALTHY
+ health.problems == [timingOut(11)] as Set
}
- def "should indicate that a subscriber returning more than 10% 5xx errors is malfunctioning"() {
+ def "should indicate that a batch subscriber with more than 10% timeouts is timing out"() {
+ given:
+ def topicMetrics = topicMetricsWithRate("100.0")
+ def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
+ .withRate("80.0")
+ .withBatchRate("8.0")
+ .withTimeouts("2.0")
+ .build()
+
+ when:
+ SubscriptionHealth health = healthChecker.checkHealth(batchSubscriptionWithSize(10), topicMetrics, subscriptionMetrics)
+
+ then:
+ health.status == UNHEALTHY
+ health.problems == [timingOut(2.0)] as Set
+ }
+
+ def "should indicate that a serial subscriber returning more than 10% 5xx errors is malfunctioning"() {
given:
def topicMetrics = topicMetricsWithRate("100.0")
def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
@@ -135,14 +169,53 @@ class SubscriptionHealthCheckerTest extends Specification {
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+
+ then:
+ health.status == UNHEALTHY
+ health.problems == [malfunctioning(11)] as Set
+ }
+
+ def "should indicate that a batch subscriber returning more than 10% 5xx errors is malfunctioning"() {
+ given:
+ def topicMetrics = topicMetricsWithRate("100.0")
+ def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
+ .withRate("80.0")
+ .withBatchRate("8.0")
+ .withCodes5xx("2.0")
+ .build()
+
+ when:
+ SubscriptionHealth health = healthChecker.checkHealth(batchSubscriptionWithSize(10), topicMetrics, subscriptionMetrics)
+
+ then:
+ health.status == UNHEALTHY
+ health.problems == [malfunctioning(2.0)] as Set
+ }
+
+ def "should indicate that a serial subscriber with retry returning more than 10% 4xx errors is receiving malformed events"() {
+ given:
+ def retrySubscriptionPolicy = subscriptionPolicy()
+ .withClientErrorRetry()
+ .build()
+ def subscriptionWithRetry = subscription("group.topic", "subscription")
+ .withSubscriptionPolicy(retrySubscriptionPolicy)
+ .build()
+ def topicMetrics = topicMetricsWithRate("100.0")
+ def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
+ .withRate("100.0")
+ .withCodes4xx("11.0")
+ .build()
+
+ when:
+ SubscriptionHealth health = healthChecker.checkHealth(subscriptionWithRetry, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth.status == UNHEALTHY
- subscriptionHealth.problems == [malfunctioning(11)] as Set
+ health.status == UNHEALTHY
+ health.problems == [receivingMalformedMessages(11.0)] as Set
}
- def "should indicate that a subscriber with client error retry returning more than 10% 4xx errors is receiving malformed events"() {
+ def "should indicate that a batch subscriber with retry returning more than 10% 4xx errors is receiving malformed events"() {
given:
def retrySubscriptionPolicy = batchSubscriptionPolicy()
.withClientErrorRetry(true)
@@ -153,20 +226,21 @@ class SubscriptionHealthCheckerTest extends Specification {
def topicMetrics = topicMetricsWithRate("100.0")
def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
.withRate("100.0")
- .withCodes4xx("11.0")
+ .withBatchRate("8.0")
+ .withCodes4xx("2.0")
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(subscriptionWithRetry, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(subscriptionWithRetry, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth.status == UNHEALTHY
- subscriptionHealth.problems == [receivingMalformedMessages(11)] as Set
+ health.status == UNHEALTHY
+ health.problems == [receivingMalformedMessages(2.0)] as Set
}
- def "should not indicate that a subscriber without client error retry returning more than 10% 4xx errors is receiving malformed events"() {
+ def "should not indicate that a serial subscriber without retry returning more than 10% 4xx errors is receiving malformed events"() {
given:
- def subscriptionWithoutRetry = ACTIVE_SUBSCRIPTION
+ def subscriptionWithoutRetry = ACTIVE_SERIAL_SUBSCRIPTION
def topicMetrics = topicMetricsWithRate("100.0")
def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
.withRate("100.0")
@@ -174,10 +248,32 @@ class SubscriptionHealthCheckerTest extends Specification {
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(subscriptionWithoutRetry, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(subscriptionWithoutRetry, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth == HEALTHY
+ health == HEALTHY
+ }
+
+ def "should not indicate that a batch subscriber without retry returning more than 10% 4xx errors is receiving malformed events"() {
+ given:
+ def noRetrySubscriptionPolicy = batchSubscriptionPolicy()
+ .withClientErrorRetry(false)
+ .build()
+ def subscriptionWithoutRetry = subscription("group.topic", "subscription")
+ .withSubscriptionPolicy(noRetrySubscriptionPolicy)
+ .build()
+ def topicMetrics = topicMetricsWithRate("100.0")
+ def subscriptionMetrics = otherwiseHealthySubscriptionMetrics()
+ .withRate("100.0")
+ .withBatchRate("8.0")
+ .withCodes4xx("2.0")
+ .build()
+
+ when:
+ SubscriptionHealth health = healthChecker.checkHealth(subscriptionWithoutRetry, topicMetrics, subscriptionMetrics)
+
+ then:
+ health == HEALTHY
}
def "should return healthy status for a suspended subscription even if its metrics are not healthy"() {
@@ -197,31 +293,10 @@ class SubscriptionHealthCheckerTest extends Specification {
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(suspendedSubscription, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(suspendedSubscription, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth == HEALTHY
- }
-
- def "should indicate lagging and slowness but not other health problems for a lagging and slow subscriber with rate lower than 2"() {
- given:
- def topicMetrics = topicMetricsWithRate("100.0")
- def subscriptionMetrics = subscriptionMetrics()
- .withRate("1.9")
- .withCodes2xx("0.0")
- .withCodes4xx("0.25")
- .withCodes5xx("0.25")
- .withTimeouts("0.25")
- .withOtherErrors("1.0")
- .withLag(60100)
- .build()
-
- when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
-
- then:
- subscriptionHealth.status == UNHEALTHY
- subscriptionHealth.problems == [lagging(60100), slow(1.9, 100)] as Set
+ health == HEALTHY
}
def "should return healthy status for a healthy subscription when the topic is idle"() {
@@ -230,10 +305,10 @@ class SubscriptionHealthCheckerTest extends Specification {
def subscriptionMetrics = otherwiseHealthySubscriptionMetrics().build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth == HEALTHY
+ health == HEALTHY
}
def "should return 'no data' status and no problems when some of the metrics are unavailable even if others can be calculated"() {
@@ -245,10 +320,10 @@ class SubscriptionHealthCheckerTest extends Specification {
.build()
when:
- SubscriptionHealth subscriptionHealth = subscriptionHealthChecker.checkHealth(ACTIVE_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
+ SubscriptionHealth health = healthChecker.checkHealth(ACTIVE_SERIAL_SUBSCRIPTION, topicMetrics, subscriptionMetrics)
then:
- subscriptionHealth == NO_DATA
+ health == NO_DATA
}
static TopicMetrics topicMetricsWithRate(String rate) {
@@ -266,5 +341,20 @@ class SubscriptionHealthCheckerTest extends Specification {
.withTimeouts("0.0")
.withOtherErrors("0.0")
.withLag(0)
+ .withBatchRate("0.0")
+ }
+
+ private static Subscription batchSubscriptionWithSize(int batchSize) {
+ subscription("group.topic", "subscription")
+ .withState(ACTIVE)
+ .withSubscriptionPolicy(batchSubscriptionPolicy()
+ .withMessageTtl(100)
+ .withRequestTimeout(100)
+ .withMessageBackoff(10)
+ .withBatchSize(batchSize)
+ .withBatchTime(MAX_VALUE)
+ .withBatchVolume(1024)
+ .build())
+ .build()
}
}
diff --git a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepositoryTest.groovy b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepositoryTest.groovy
index 67371fe10f..e28cc03b43 100644
--- a/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepositoryTest.groovy
+++ b/hermes-management/src/test/groovy/pl/allegro/tech/hermes/management/infrastructure/metrics/HybridSubscriptionMetricsRepositoryTest.groovy
@@ -34,7 +34,7 @@ class HybridSubscriptionMetricsRepositoryTest extends Specification {
String timeouts = 'sumSeries(stats.consumer.*.status.group.topic.subscription.errors.timeout.m1_rate)'
String otherErrors = 'sumSeries(stats.consumer.*.status.group.topic.subscription.errors.other.m1_rate)'
- client.readMetrics(_ as String, _ as String, _ as String, rate, timeouts, otherErrors) >> new GraphiteMetrics()
+ client.readMetrics(_ as String, _ as String, _ as String, rate, timeouts, otherErrors, _ as String) >> new GraphiteMetrics()
.addMetricValue(rate, '10').addMetricValue(timeouts, '100').addMetricValue(otherErrors, '1000')
sharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/delivered') >> 100
sharedCounter.getValue('/hermes/groups/group/topics/topic/subscriptions/subscription/metrics/discarded') >> 1
@@ -56,7 +56,7 @@ class HybridSubscriptionMetricsRepositoryTest extends Specification {
def "should read subscription metrics for all http status codes"() {
given:
client.readMetrics(getHttpStatusCodeForFamily(2), getHttpStatusCodeForFamily(4), getHttpStatusCodeForFamily(5),
- _ as String, _ as String, _ as String) >> new GraphiteMetrics()
+ _ as String, _ as String, _ as String, _ as String) >> new GraphiteMetrics()
.addMetricValue(getHttpStatusCodeForFamily(2), '2')
.addMetricValue(getHttpStatusCodeForFamily(4), '4')
.addMetricValue(getHttpStatusCodeForFamily(5), '5')
diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/MetricsTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/MetricsTest.java
index e01098d8bf..07de079392 100644
--- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/MetricsTest.java
+++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/MetricsTest.java
@@ -25,6 +25,7 @@
import static java.lang.Integer.MAX_VALUE;
import static javax.ws.rs.core.Response.Status.CREATED;
import static org.assertj.core.api.Assertions.assertThat;
+import static pl.allegro.tech.hermes.integration.helper.GraphiteEndpoint.subscriptionMetricsStub;
import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy;
import static pl.allegro.tech.hermes.integration.test.HermesAssertions.assertThat;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription;
@@ -74,7 +75,7 @@ public void shouldIncreaseSubscriptionDeliveredMetricsAfterMessageDelivered() {
// given
Topic topic = operations.buildTopic("pl.group", "topic");
operations.createSubscription(topic, "subscription", HTTP_ENDPOINT_URL);
- graphiteEndpoint.returnMetricForSubscription("pl_group", "topic", "subscription", 15);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("pl_group.topic.subscription").withRate(15).build());
remoteService.expectMessages(TestMessage.simple().body());
assertThat(publisher.publish("pl.group.topic", TestMessage.simple().body()).getStatus())
@@ -117,7 +118,7 @@ public void shouldReadSubscriptionDeliveryRate() {
// given
Topic topic = operations.buildTopic("pl.allegro.tech.hermes", "topic");
operations.createSubscription(topic, "pl.allegro.tech.hermes.subscription", HTTP_ENDPOINT_URL);
- graphiteEndpoint.returnMetricForSubscription("pl_allegro_tech_hermes", "topic", "pl_allegro_tech_hermes_subscription", 15);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("pl_allegro_tech_hermes.topic.pl_allegro_tech_hermes_subscription").withRate(15).build());
wait.until(() -> {
// when
diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/GraphiteEndpoint.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/GraphiteEndpoint.java
index 398717cb3c..75f6f725ee 100644
--- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/GraphiteEndpoint.java
+++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/helper/GraphiteEndpoint.java
@@ -1,12 +1,20 @@
package pl.allegro.tech.hermes.integration.helper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import pl.allegro.tech.hermes.integration.env.EnvironmentAware;
+import java.util.ArrayList;
+import java.util.List;
+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
public class GraphiteEndpoint implements EnvironmentAware {
@@ -20,19 +28,9 @@ public class GraphiteEndpoint implements EnvironmentAware {
+ "[[DELIVERY, TIMESTAMP]]}"
+ "]";
- private static final String SUBSCRIPTION_RESPONSE
- = "[{" +
- " \"target\": \"sumSeries(stats.tech.hermes.consumer.*.meter.SUBSCRIPTION.m1_rate)\"," +
- " \"datapoints\": [[RATE, TIMESTAMP]]," +
- " \"tags\": []" +
- "}]";
-
private static final String TOPIC_URL_PATTERN = "/.*sumSeries%28stats.tech.hermes\\." +
"(consumer|producer)\\.%2A\\.meter\\.GROUP\\.TOPIC\\.m1_rate%29.*";
- private static final String SUBSCRIPTION_URL_PATTERN = "/.*sumSeries%28stats.tech.hermes\\." +
- "consumer\\.%2A\\.meter\\.GROUP\\.TOPIC\\.SUBSCRIPTION\\.m1_rate%29.*";
-
private final WireMock graphiteListener;
public GraphiteEndpoint(WireMockServer graphiteMock) {
@@ -52,18 +50,6 @@ public void returnMetricForTopic(String group, String topic, int rate, int deliv
.withBody(response)));
}
- public void returnMetricForSubscription(String group, String topic, String subscription, int rate) {
- String response = SUBSCRIPTION_RESPONSE.replaceAll("SUBSCRIPTION", group + "." + topic + "." + subscription)
- .replaceAll("RATE", Integer.toString(rate))
- .replaceAll("TIMESTAMP", TIMESTAMP);
- String urlPattern = SUBSCRIPTION_URL_PATTERN.replace("GROUP", group).replace("TOPIC", topic).replace("SUBSCRIPTION", subscription);
- graphiteListener.register(get(urlMatching(urlPattern))
- .willReturn(aResponse()
- .withStatus(200)
- .withHeader("Content-Type", "application/json")
- .withBody(response)));
- }
-
public void returnServerErrorForAllTopics() {
graphiteListener.register(get(urlMatching(TOPIC_URL_PATTERN))
.willReturn(aResponse()
@@ -73,4 +59,89 @@ public void returnServerErrorForAllTopics() {
);
}
+ public void returnMetric(SubscriptionMetricsStubDefinition metricsStubDefinition) {
+ graphiteListener.register(get(urlMatching(metricsStubDefinition.toUrlPattern()))
+ .willReturn(aResponse()
+ .withStatus(200)
+ .withHeader("Content-Type", "application/json")
+ .withBody(metricsStubDefinition.toBody())));
+ }
+
+ private static class GraphiteStubResponse {
+ private final String target;
+ private final List> datapoints;
+
+ private GraphiteStubResponse(String target, List> datapoints) {
+ this.target = target;
+ this.datapoints = datapoints;
+ }
+
+ public String getTarget() {
+ return target;
+ }
+
+ public List> getDatapoints() {
+ return datapoints;
+ }
+ }
+
+ private static class SubscriptionMetricsStubDefinition {
+ private final String subscription;
+ private final List responseBody;
+
+ private SubscriptionMetricsStubDefinition(String subscription, List responseBody) {
+ this.subscription = subscription;
+ this.responseBody = responseBody;
+ }
+
+ private String toUrlPattern() {
+ return "/.*sumSeries%28stats.tech.hermes\\.consumer\\.%2A\\.meter\\." + subscription + "\\.m1_rate%29.*";
+ }
+
+ private String toBody() {
+ try {
+ return new ObjectMapper().writeValueAsString(responseBody);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class SubscriptionMetricsStubDefinitionBuilder {
+ private final String subscription;
+ private final List response = new ArrayList<>();
+
+ private SubscriptionMetricsStubDefinitionBuilder(String subscription) {
+ this.subscription = subscription;
+ }
+
+ public SubscriptionMetricsStubDefinitionBuilder withRate(int rate) {
+ String target = "sumSeries(stats.tech.hermes.consumer.*.meter." + subscription + ".m1_rate)";
+ response.add(new GraphiteStubResponse(target, dataPointOf(rate)));
+ return this;
+ }
+
+ public SubscriptionMetricsStubDefinitionBuilder withStatusRate(int httpStatus, int rate) {
+ String statusFamily = httpStatusFamily(httpStatus);
+ String target = "sumSeries(stats.tech.hermes.consumer.*.status." + subscription + "." + statusFamily + ".m1_rate)";
+ response.add(new GraphiteStubResponse(target, dataPointOf(rate)));
+ return this;
+ }
+
+ public SubscriptionMetricsStubDefinition build() {
+ return new SubscriptionMetricsStubDefinition(subscription, response);
+ }
+
+ private String httpStatusFamily(int statusCode) {
+ return format("%dxx", statusCode / 100);
+ }
+
+ private static List> dataPointOf(int rate) {
+ return singletonList(asList(rate, TIMESTAMP));
+ }
+ }
+
+ public static SubscriptionMetricsStubDefinitionBuilder subscriptionMetricsStub(String subscription) {
+ return new SubscriptionMetricsStubDefinitionBuilder(subscription);
+ }
}
diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/ListUnhealthySubscriptionsForOwnerTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/ListUnhealthySubscriptionsForOwnerTest.java
index 959f441531..827b636206 100644
--- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/ListUnhealthySubscriptionsForOwnerTest.java
+++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/ListUnhealthySubscriptionsForOwnerTest.java
@@ -20,7 +20,8 @@
import static javax.ws.rs.core.MediaType.TEXT_PLAIN;
import static pl.allegro.tech.hermes.api.MonitoringDetails.Severity;
-import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.slow;
+import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.malfunctioning;
+import static pl.allegro.tech.hermes.integration.helper.GraphiteEndpoint.subscriptionMetricsStub;
import static pl.allegro.tech.hermes.integration.test.HermesAssertions.assertThat;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription;
@@ -55,16 +56,16 @@ public void shouldReturnOnlyUnhealthySubscriptionOfSingleOwner() {
createSubscriptionForOwner("ownedSubscription3", "Team B");
graphiteEndpoint.returnMetricForTopic("group", "topic", 100, 50);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription1", 100);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription2", 50);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription3", 100);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription1").withRate(100).withStatusRate(500, 0).build());
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription2").withRate(50).withStatusRate(500, 11).build());
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription3").withRate(100).withStatusRate(500, 0).build());
// then
assertThat(listUnhealthySubscriptionsForOwner("Team A")).containsOnly(
- new UnhealthySubscription("ownedSubscription2", "group.topic", Severity.IMPORTANT, ImmutableSet.of(slow(50, 100)))
+ new UnhealthySubscription("ownedSubscription2", "group.topic", Severity.IMPORTANT, ImmutableSet.of(malfunctioning(11)))
);
assertThat(listUnhealthySubscriptionsForOwnerAsPlainText("Team A")).isEqualTo(
- "ownedSubscription2 - Consumption rate (50 RPS) is lower than topic production rate (100 RPS)"
+ "ownedSubscription2 - Consuming service returns a lot of 5xx codes, currently 11 5xx/s"
);
}
@@ -75,15 +76,15 @@ public void shouldReportAllUnhealthySubscriptionsForEmptyOwnerSource() {
createSubscriptionForOwner("ownedSubscription2", "Team A");
graphiteEndpoint.returnMetricForTopic("group", "topic", 100, 50);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription1", 100);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription2", 50);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription1").withRate(100).withStatusRate(500, 0).build());
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription2").withRate(50).withStatusRate(500, 11).build());
// then
assertThat(listAllUnhealthySubscriptions()).containsOnly(
- new UnhealthySubscription("ownedSubscription2", "group.topic", Severity.IMPORTANT, ImmutableSet.of(slow(50, 100)))
+ new UnhealthySubscription("ownedSubscription2", "group.topic", Severity.IMPORTANT, ImmutableSet.of(malfunctioning(11)))
);
assertThat(listAllUnhealthySubscriptionsAsPlainText()).isEqualTo(
- "ownedSubscription2 - Consumption rate (50 RPS) is lower than topic production rate (100 RPS)"
+ "ownedSubscription2 - Consuming service returns a lot of 5xx codes, currently 11 5xx/s"
);
}
@@ -95,20 +96,20 @@ public void shouldReportUnhealthySubscriptionsDisrespectingSeverity() {
createSubscriptionForOwner("ownedSubscription3", "Team A", Severity.NON_IMPORTANT);
graphiteEndpoint.returnMetricForTopic("group", "topic", 100, 50);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription1", 50);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription2", 50);
- graphiteEndpoint.returnMetricForSubscription("group", "topic", "ownedSubscription3", 50);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription1").withRate(50).withStatusRate(500, 11).build());
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription2").withRate(50).withStatusRate(500, 11).build());
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("group.topic.ownedSubscription3").withRate(50).withStatusRate(500, 11).build());
// then
assertThat(listUnhealthySubscriptionsDisrespectingSeverity("Team A")).contains(
- new UnhealthySubscription("ownedSubscription1", "group.topic", Severity.CRITICAL, ImmutableSet.of(slow(50, 100))),
- new UnhealthySubscription("ownedSubscription2", "group.topic", Severity.IMPORTANT, ImmutableSet.of(slow(50, 100))),
- new UnhealthySubscription("ownedSubscription3", "group.topic", Severity.NON_IMPORTANT, ImmutableSet.of(slow(50, 100)))
+ new UnhealthySubscription("ownedSubscription1", "group.topic", Severity.CRITICAL, ImmutableSet.of(malfunctioning(11))),
+ new UnhealthySubscription("ownedSubscription2", "group.topic", Severity.IMPORTANT, ImmutableSet.of(malfunctioning(11))),
+ new UnhealthySubscription("ownedSubscription3", "group.topic", Severity.NON_IMPORTANT, ImmutableSet.of(malfunctioning(11)))
);
assertThat(listUnhealthySubscriptionsDisrespectingSeverityAsPlainText("Team A")).isEqualTo(
- "ownedSubscription1 - Consumption rate (50 RPS) is lower than topic production rate (100 RPS)\r\n" +
- "ownedSubscription2 - Consumption rate (50 RPS) is lower than topic production rate (100 RPS)\r\n" +
- "ownedSubscription3 - Consumption rate (50 RPS) is lower than topic production rate (100 RPS)"
+ "ownedSubscription1 - Consuming service returns a lot of 5xx codes, currently 11 5xx/s\r\n" +
+ "ownedSubscription2 - Consuming service returns a lot of 5xx codes, currently 11 5xx/s\r\n" +
+ "ownedSubscription3 - Consuming service returns a lot of 5xx codes, currently 11 5xx/s"
);
}
diff --git a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java
index f8d7694cd3..fa6b79b41d 100644
--- a/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java
+++ b/integration/src/integration/java/pl/allegro/tech/hermes/integration/management/SubscriptionManagementTest.java
@@ -4,13 +4,13 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import pl.allegro.tech.hermes.api.ContentType;
+import pl.allegro.tech.hermes.api.DeliveryType;
import pl.allegro.tech.hermes.api.EndpointAddress;
import pl.allegro.tech.hermes.api.PatchData;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionHealth;
import pl.allegro.tech.hermes.api.SubscriptionPolicy;
import pl.allegro.tech.hermes.api.Topic;
-import pl.allegro.tech.hermes.api.DeliveryType;
import pl.allegro.tech.hermes.api.TrackingMode;
import pl.allegro.tech.hermes.client.HermesClient;
import pl.allegro.tech.hermes.client.jersey.JerseyHermesSender;
@@ -37,8 +37,9 @@
import static javax.ws.rs.client.ClientBuilder.newClient;
import static pl.allegro.tech.hermes.api.PatchData.patchData;
import static pl.allegro.tech.hermes.api.SubscriptionHealth.Status.UNHEALTHY;
-import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.slow;
+import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.malfunctioning;
import static pl.allegro.tech.hermes.client.HermesClientBuilder.hermesClient;
+import static pl.allegro.tech.hermes.integration.helper.GraphiteEndpoint.subscriptionMetricsStub;
import static pl.allegro.tech.hermes.integration.test.HermesAssertions.assertThat;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription;
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic;
@@ -273,7 +274,7 @@ public void shouldReturnHealthyStatusForAHealthySubscription() {
Topic topic = operations.buildTopic(groupName, topicName);
operations.createSubscription(topic, subscriptionName, HTTP_ENDPOINT_URL);
graphiteEndpoint.returnMetricForTopic(groupName, topicName, 100, 100);
- graphiteEndpoint.returnMetricForSubscription(groupName, topicName, subscriptionName, 100);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("healthHealthy.topic.subscription").withRate(100).build());
// when
SubscriptionHealth subscriptionHealth = management.subscription().getHealth(topic.getQualifiedName(), subscriptionName);
@@ -283,7 +284,7 @@ public void shouldReturnHealthyStatusForAHealthySubscription() {
}
@Test
- public void shouldReturnUnhealthyStatusWithAProblemForASlowSubscription() {
+ public void shouldReturnUnhealthyStatusWithAProblemForMalfunctioningSubscription() {
// given
String groupName = "healthUnhealthy";
String topicName = "topic";
@@ -293,14 +294,14 @@ public void shouldReturnUnhealthyStatusWithAProblemForASlowSubscription() {
Topic topic = operations.buildTopic(groupName, topicName);
operations.createSubscription(topic, subscriptionName, HTTP_ENDPOINT_URL);
graphiteEndpoint.returnMetricForTopic(groupName, topicName, 100, 50);
- graphiteEndpoint.returnMetricForSubscription(groupName, topicName, subscriptionName, 50);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("healthUnhealthy.topic.subscription").withRate(50).withStatusRate(500, 11).build());
// when
SubscriptionHealth subscriptionHealth = management.subscription().getHealth(topic.getQualifiedName(), subscriptionName);
// then
assertThat(subscriptionHealth.getStatus()).isEqualTo(UNHEALTHY);
- assertThat(subscriptionHealth.getProblems()).containsOnly(slow(50, 100));
+ assertThat(subscriptionHealth.getProblems()).containsOnly(malfunctioning(11));
}
@Test
@@ -314,7 +315,7 @@ public void shouldReturnNoDataStatusWhenGraphiteRespondsWithAnError() {
Topic topic = operations.buildTopic(groupName, topicName);
operations.createSubscription(topic, subscriptionName, HTTP_ENDPOINT_URL);
graphiteEndpoint.returnServerErrorForAllTopics();
- graphiteEndpoint.returnMetricForSubscription(groupName, topicName, subscriptionName, 100);
+ graphiteEndpoint.returnMetric(subscriptionMetricsStub("healthUnhealthy.topic.subscription").withRate(100).build());
// when
SubscriptionHealth subscriptionHealth = management.subscription().getHealth(topic.getQualifiedName(), subscriptionName);