Skip to content

Commit

Permalink
Improved subscription health problem indicators (#972)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrrzysko committed Jan 24, 2019
1 parent 73d0013 commit 0533fbe
Show file tree
Hide file tree
Showing 25 changed files with 486 additions and 338 deletions.
Expand Up @@ -9,14 +9,13 @@
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.LAGGING;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.MALFUNCTIONING;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.RECEIVING_MALFORMED_MESSAGES;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.SLOW;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.TIMING_OUT;
import static pl.allegro.tech.hermes.api.SubscriptionHealthProblem.ProblemCode.UNREACHABLE;

public class SubscriptionHealthProblem {

public enum ProblemCode {
LAGGING, SLOW, UNREACHABLE, TIMING_OUT, MALFUNCTIONING, RECEIVING_MALFORMED_MESSAGES
LAGGING, UNREACHABLE, TIMING_OUT, MALFUNCTIONING, RECEIVING_MALFORMED_MESSAGES
}

private final ProblemCode code;
Expand Down Expand Up @@ -50,13 +49,6 @@ public static SubscriptionHealthProblem receivingMalformedMessages(double code4x
);
}

public static SubscriptionHealthProblem slow(double subscriptionRate, double topicRate) {
return new SubscriptionHealthProblem(
SLOW,
format("Consumption rate (%.0f RPS) is lower than topic production rate (%.0f RPS)", subscriptionRate, topicRate)
);
}

public static SubscriptionHealthProblem timingOut(double timeoutsRate) {
return new SubscriptionHealthProblem(
TIMING_OUT,
Expand Down
Expand Up @@ -16,17 +16,24 @@ public class SubscriptionMetrics {
private Subscription.State state;
private String rate;
private String throughput;
private String batchRate;

private SubscriptionMetrics() {
}

@JsonCreator
public SubscriptionMetrics(@JsonProperty("delivered") long delivered, @JsonProperty("discarded") long discarded,
@JsonProperty("inflight") long inflight, @JsonProperty("timeouts") String timeouts,
@JsonProperty("otherErrors") String otherErrors, @JsonProperty("codes2xx") String codes2xx,
@JsonProperty("codes4xx") String codes4xx, @JsonProperty("codes5xx") String codes5xx,
@JsonProperty("Subscription") Subscription.State state, @JsonProperty("rate") String rate,
@JsonProperty("throughput") String throughput) {
public SubscriptionMetrics(@JsonProperty("delivered") long delivered,
@JsonProperty("discarded") long discarded,
@JsonProperty("inflight") long inflight,
@JsonProperty("timeouts") String timeouts,
@JsonProperty("otherErrors") String otherErrors,
@JsonProperty("codes2xx") String codes2xx,
@JsonProperty("codes4xx") String codes4xx,
@JsonProperty("codes5xx") String codes5xx,
@JsonProperty("Subscription") Subscription.State state,
@JsonProperty("rate") String rate,
@JsonProperty("throughput") String throughput,
@JsonProperty("batchRate") String batchRate) {
this.delivered = delivered;
this.discarded = discarded;
this.inflight = inflight;
Expand All @@ -38,6 +45,7 @@ public SubscriptionMetrics(@JsonProperty("delivered") long delivered, @JsonPrope
this.state = state;
this.rate = rate;
this.throughput = throughput;
this.batchRate = batchRate;
}

public long getDelivered() {
Expand Down Expand Up @@ -88,6 +96,10 @@ public String getThroughput() {
return throughput;
}

public String getBatchRate() {
return batchRate;
}

public static class Builder {
private SubscriptionMetrics subscriptionMetrics;

Expand Down Expand Up @@ -155,6 +167,11 @@ public Builder withThroughput(String throughput) {
return this;
}

public Builder withBatchRate(String batchRate) {
subscriptionMetrics.batchRate = batchRate;
return this;
}

public static Builder subscriptionMetrics() {
return new Builder();
}
Expand Down
Expand Up @@ -16,7 +16,6 @@ health.factory('SubscriptionHealth', ['DiscoveryService', '$resource',
status: health.status,
problems: {
lagging: problemOccurs('LAGGING'),
slow: problemOccurs('SLOW'),
malfunctioning: problemOccurs('MALFUNCTIONING'),
receivingMalformedMessages: problemOccurs('RECEIVING_MALFORMED_MESSAGES'),
timingOut: problemOccurs('TIMING_OUT'),
Expand Down
4 changes: 0 additions & 4 deletions hermes-console/static/partials/subscription.html
Expand Up @@ -40,10 +40,6 @@ <h3 class="panel-title">Subscription health problems</h3>
<b>Subscription lag is growing</b>. Examine output rate and service response codes, looks like it is
not consuming at full speed.
</p>
<p ng-show="health.problems.slow">
<b>Consumption rate is lower than topic production rate</b>. Examine output rate and service response codes.
If everything is well, take a look at maximum rate, maybe it is too low?
</p>
<p ng-show="health.problems.malfunctioning">
<b>Consuming service returns a lot of 5xx codes</b>. Looks like it might be malfunctioning or doesn't know
how to handle messages. Take a look at "Last undelivered message" for more information.
Expand Down
Expand Up @@ -5,46 +5,58 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthProblemIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.DisabledIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.LaggingIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.MalfunctioningIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.ReceivingMalformedMessagesIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.SlowIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.TimingOutIndicator;
import pl.allegro.tech.hermes.management.domain.subscription.health.problem.UnreachableIndicator;

@Configuration
@EnableConfigurationProperties({HealthProperties.class})
public class HealthConfiguration {
private static final DisabledIndicator DISABLED_INDICATOR = new DisabledIndicator();

@Autowired
private HealthProperties healthProperties;

@Bean
public SubscriptionHealthProblemIndicator laggingIndicator() {
return new LaggingIndicator(healthProperties.getMaxLagInSeconds());
}

@Bean
public SubscriptionHealthProblemIndicator slowIndicator() {
return new SlowIndicator(healthProperties.getMinSubscriptionToTopicSpeedRatio());
if (healthProperties.isLaggingIndicatorEnabled()) {
return new LaggingIndicator(healthProperties.getMaxLagInSeconds());
}
return DISABLED_INDICATOR;
}

@Bean
public SubscriptionHealthProblemIndicator unreachableIndicator() {
return new UnreachableIndicator(healthProperties.getMaxOtherErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
if (healthProperties.isUnreachableIndicatorEnabled()) {
return new UnreachableIndicator(healthProperties.getMaxOtherErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
}
return DISABLED_INDICATOR;
}

@Bean
public SubscriptionHealthProblemIndicator timingOutIndicator() {
return new TimingOutIndicator(healthProperties.getMaxTimeoutsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
if (healthProperties.isTimingOutIndicatorEnabled()) {
return new TimingOutIndicator(healthProperties.getMaxTimeoutsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
}
return DISABLED_INDICATOR;
}

@Bean
public SubscriptionHealthProblemIndicator malfunctioningIndicator() {
return new MalfunctioningIndicator(healthProperties.getMax5xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
if (healthProperties.isMalfunctioningIndicatorEnabled()) {
return new MalfunctioningIndicator(healthProperties.getMax5xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
}
return DISABLED_INDICATOR;
}

@Bean
public SubscriptionHealthProblemIndicator receivingMalformedMessagesIndicator() {
return new ReceivingMalformedMessagesIndicator(healthProperties.getMax4xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
if (healthProperties.isReceivingMalformedMessagesIndicatorEnabled()) {
return new ReceivingMalformedMessagesIndicator(healthProperties.getMax4xxErrorsRatio(), healthProperties.getMinSubscriptionRateForReliableMetrics());
}
return DISABLED_INDICATOR;
}
}
Expand Up @@ -5,12 +5,16 @@
@ConfigurationProperties(prefix = "health")
public class HealthProperties {
private int maxLagInSeconds = 600;
private double minSubscriptionToTopicSpeedRatio = 0.8;
private double maxOtherErrorsRatio = 0.5;
private double maxTimeoutsRatio = 0.1;
private double max5xxErrorsRatio = 0.1;
private double max4xxErrorsRatio = 0.1;
private double minSubscriptionRateForReliableMetrics = 2.0;
private boolean laggingIndicatorEnabled = true;
private boolean malfunctioningIndicatorEnabled = true;
private boolean receivingMalformedMessagesIndicatorEnabled = true;
private boolean timingOutIndicatorEnabled = true;
private boolean unreachableIndicatorEnabled = true;

public int getMaxLagInSeconds() {
return maxLagInSeconds;
Expand All @@ -20,14 +24,6 @@ public void setMaxLagInSeconds(int maxLagInSeconds) {
this.maxLagInSeconds = maxLagInSeconds;
}

public double getMinSubscriptionToTopicSpeedRatio() {
return minSubscriptionToTopicSpeedRatio;
}

public void setMinSubscriptionToTopicSpeedRatio(double minSubscriptionToTopicSpeedRatio) {
this.minSubscriptionToTopicSpeedRatio = minSubscriptionToTopicSpeedRatio;
}

public double getMaxOtherErrorsRatio() {
return maxOtherErrorsRatio;
}
Expand Down Expand Up @@ -67,4 +63,44 @@ public double getMinSubscriptionRateForReliableMetrics() {
public void setMinSubscriptionRateForReliableMetrics(double minSubscriptionRateForReliableMetrics) {
this.minSubscriptionRateForReliableMetrics = minSubscriptionRateForReliableMetrics;
}

public boolean isLaggingIndicatorEnabled() {
return laggingIndicatorEnabled;
}

public void setLaggingIndicatorEnabled(boolean laggingIndicatorEnabled) {
this.laggingIndicatorEnabled = laggingIndicatorEnabled;
}

public boolean isMalfunctioningIndicatorEnabled() {
return malfunctioningIndicatorEnabled;
}

public void setMalfunctioningIndicatorEnabled(boolean malfunctioningIndicatorEnabled) {
this.malfunctioningIndicatorEnabled = malfunctioningIndicatorEnabled;
}

public boolean isReceivingMalformedMessagesIndicatorEnabled() {
return receivingMalformedMessagesIndicatorEnabled;
}

public void setReceivingMalformedMessagesIndicatorEnabled(boolean receivingMalformedMessagesIndicatorEnabled) {
this.receivingMalformedMessagesIndicatorEnabled = receivingMalformedMessagesIndicatorEnabled;
}

public boolean isTimingOutIndicatorEnabled() {
return timingOutIndicatorEnabled;
}

public void setTimingOutIndicatorEnabled(boolean timingOutIndicatorEnabled) {
this.timingOutIndicatorEnabled = timingOutIndicatorEnabled;
}

public boolean isUnreachableIndicatorEnabled() {
return unreachableIndicatorEnabled;
}

public void setUnreachableIndicatorEnabled(boolean unreachableIndicatorEnabled) {
this.unreachableIndicatorEnabled = unreachableIndicatorEnabled;
}
}
Expand Up @@ -17,7 +17,6 @@
@Component
public class SubscriptionHealthChecker {
private final Set<SubscriptionHealthProblemIndicator> problemIndicators;
private final SubscriptionHealthContextCreator contextCreator = new SubscriptionHealthContextCreator();

@Autowired
public SubscriptionHealthChecker(Set<SubscriptionHealthProblemIndicator> problemIndicators) {
Expand All @@ -38,7 +37,7 @@ private boolean isSuspended(Subscription subscription) {

private SubscriptionHealth getActiveSubscriptionHealth(Subscription subscription, TopicMetrics topicMetrics, SubscriptionMetrics subscriptionMetrics) {
try {
SubscriptionHealthContext healthContext = contextCreator.createContext(subscription, topicMetrics, subscriptionMetrics);
SubscriptionHealthContext healthContext = new SubscriptionHealthContext(subscription, topicMetrics, subscriptionMetrics);
Set<SubscriptionHealthProblem> healthProblems = getHealthProblems(healthContext);
return SubscriptionHealth.of(healthProblems);
} catch (NumberFormatException e) {
Expand Down
@@ -1,27 +1,70 @@
package pl.allegro.tech.hermes.management.domain.subscription.health;

import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionMetrics;
import pl.allegro.tech.hermes.api.TopicMetrics;

import static java.lang.Double.parseDouble;

public final class SubscriptionHealthContext {
private final Subscription subscription;
private final TopicMetrics topicMetrics;
private final SubscriptionMetrics subscriptionMetrics;
private final double topicRate;
private final double subscriptionRate;
private final double timeoutsRate;
private final double otherErrorsRate;
private final double code4xxErrorsRate;
private final double code5xxErrorsRate;
private final double batchRate;
private final long lag;

public SubscriptionHealthContext(Subscription subscription, TopicMetrics topicMetrics, SubscriptionMetrics subscriptionMetrics) {
SubscriptionHealthContext(Subscription subscription, TopicMetrics topicMetrics, SubscriptionMetrics subscriptionMetrics) {
this.subscription = subscription;
this.topicMetrics = topicMetrics;
this.subscriptionMetrics = subscriptionMetrics;
this.topicRate = parseDouble(topicMetrics.getRate());
this.subscriptionRate = parseDouble(subscriptionMetrics.getRate());
this.timeoutsRate = parseDouble(subscriptionMetrics.getTimeouts());
this.otherErrorsRate = parseDouble(subscriptionMetrics.getOtherErrors());
this.code4xxErrorsRate = parseDouble(subscriptionMetrics.getCodes4xx());
this.code5xxErrorsRate = parseDouble(subscriptionMetrics.getCodes5xx());
this.batchRate = parseDouble(subscriptionMetrics.getBatchRate());
this.lag = subscriptionMetrics.getLag();
}

public boolean subscriptionHasRetryOnError() {
if (subscription.isBatchSubscription()) {
return subscription.getBatchSubscriptionPolicy().isRetryClientErrors();
} else {
return subscription.getSerialSubscriptionPolicy().isRetryClientErrors();
}
}

public double getSubscriptionRateRespectingDeliveryType() {
if (subscription.isBatchSubscription()) {
return batchRate;
}
return subscriptionRate;
}

public double getOtherErrorsRate() {
return otherErrorsRate;
}

public double getTimeoutsRate() {
return timeoutsRate;
}

public double getCode4xxErrorsRate() {
return code4xxErrorsRate;
}

public Subscription getSubscription() {
return subscription;
public double getCode5xxErrorsRate() {
return code5xxErrorsRate;
}

public TopicMetrics getTopicMetrics() {
return topicMetrics;
public long getLag() {
return lag;
}

public SubscriptionMetrics getSubscriptionMetrics() {
return subscriptionMetrics;
public double getTopicRate() {
return topicRate;
}
}

This file was deleted.

0 comments on commit 0533fbe

Please sign in to comment.