Skip to content
Permalink
Browse files

Merge d0a3c28 into f7ef137

  • Loading branch information
mprimi committed May 11, 2019
2 parents f7ef137 + d0a3c28 commit cf78f0cb1eb15526e2deb4ead5c329b2f41d37d4
@@ -349,6 +349,12 @@ published within the local JVM and available on the Actuator `/metrics` endpoint
|ClusterCheckerTask
|-
|genie.tasks.clusterChecker.health.counter
|Counts the number of times a remote node's health check indicator returned a given status
|count
|ClusterCheckerTask
|host, healthIndicator, healthStatus
|genie.tasks.clusterChecker.lostJobs.rate
|Number of jobs marked as "lost" due to a consistent failure to contact the Genie node hosting them
|count
@@ -406,7 +406,7 @@ determined by Zookeeper or other mechanism via Spring
|genie.tasks.cluster-checker.healthIndicatorsToIgnore
|The health indicator groups from the actuator /health endpoint to ignore when determining whether a node is lost,
as a comma-separated list
|memory,genieMemory,discoveryComposite
|genieMemory,mail,genieAgent
|no

|genie.tasks.cluster-checker.lostThreshold
@@ -43,5 +43,5 @@
private int port = 8080;
private long rate = 300_000L;
private int lostThreshold = 3;
private String healthIndicatorsToIgnore = "memory,genieMemory,discoveryComposite";
private String healthIndicatorsToIgnore = "genieMemory,mail,genieAgent";
}
@@ -29,8 +29,11 @@
import com.netflix.genie.web.services.JobPersistenceService;
import com.netflix.genie.web.services.JobSearchService;
import com.netflix.genie.web.tasks.GenieTaskScheduleType;
import com.netflix.genie.web.util.MetricsConstants;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointProperties;
import org.springframework.boot.actuate.health.Status;
@@ -55,19 +58,23 @@
@Slf4j
public class ClusterCheckerTask extends LeadershipTask {
private static final String PROPERTY_STATUS = "status";
private static final String ERROR_COUNTS_GAUGE_METRIC_NAME = "genie.tasks.clusterChecker.errorCounts.gauge";
private static final String LOST_JOBS_RATE_METRIC_NAME = "genie.tasks.clusterChecker.lostJobs.rate";
private static final String FAILED_TO_UPDATE_RATE_METRIC_NAME = "genie.tasks.clusterChecker.unableToUpdateJob.rate";
private static final String REMOTE_NODE_HEALTH_METRIC_NAME = "genie.tasks.clusterChecker.health.counter";

private final String hostname;
private final ClusterCheckerProperties properties;
private final JobSearchService jobSearchService;
private final JobPersistenceService jobPersistenceService;
private final RestTemplate restTemplate;
private final MeterRegistry registry;
private final String scheme;
private final String healthEndpoint;
private final List<String> healthIndicatorsToIgnore;

private final Map<String, Integer> errorCounts = new HashMap<>();

// TODO: Add metrics
private final Counter lostJobsCounter;
private final Counter unableToUpdateJobCounter;

@@ -96,14 +103,15 @@ public ClusterCheckerTask(
this.jobSearchService = jobSearchService;
this.jobPersistenceService = jobPersistenceService;
this.restTemplate = restTemplate;
this.registry = registry;
this.scheme = this.properties.getScheme() + "://";
this.healthEndpoint = ":" + this.properties.getPort() + webEndpointProperties.getBasePath() + "/health";
this.healthIndicatorsToIgnore = Splitter.on(",").omitEmptyStrings()
.trimResults().splitToList(properties.getHealthIndicatorsToIgnore());
// Keep track of the number of nodes currently unreachable from the master
registry.gauge("genie.tasks.clusterChecker.errorCounts.gauge", this.errorCounts, Map::size);
this.lostJobsCounter = registry.counter("genie.tasks.clusterChecker.lostJobs.rate");
this.unableToUpdateJobCounter = registry.counter("genie.tasks.clusterChecker.unableToUpdateJob.rate");
registry.gauge(ERROR_COUNTS_GAUGE_METRIC_NAME, this.errorCounts, Map::size);
this.lostJobsCounter = registry.counter(LOST_JOBS_RATE_METRIC_NAME);
this.unableToUpdateJobCounter = registry.counter(FAILED_TO_UPDATE_RATE_METRIC_NAME);
}

/**
@@ -193,13 +201,42 @@ private boolean isNodeHealthy(final String host) {
TypeFactory.defaultInstance().constructMapType(Map.class, String.class, Object.class)
);
for (Map.Entry<String, Object> responseEntry : responseMap.entrySet()) {
if (responseEntry.getValue() instanceof Map
&& !this.healthIndicatorsToIgnore.contains(responseEntry.getKey())
&& !Status.UP.getCode().equals(((Map) responseEntry.getValue()).get(PROPERTY_STATUS))) {
result = false;
break;
if (responseEntry.getValue() instanceof Map) {
final Map indicatorMap = (Map) responseEntry.getValue();

final String indicatorName = responseEntry.getKey();
final Object indicatorStatusOrNull = indicatorMap.get(PROPERTY_STATUS);

final Status indicatorStatus;

if (indicatorStatusOrNull instanceof Status) {
indicatorStatus = (Status) indicatorStatusOrNull;
} else if (indicatorStatusOrNull instanceof String) {
indicatorStatus = new Status((String) indicatorStatusOrNull);
} else {
indicatorStatus = Status.UNKNOWN;
}

//Increment counter tagged with target hostname and name of health indicator
final Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
tags.add(Tag.of(MetricsConstants.TagKeys.HOST, host));
tags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_INDICATOR, indicatorName));
tags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_STATUS, indicatorStatus.getCode()));
this.registry.counter(REMOTE_NODE_HEALTH_METRIC_NAME, tags).increment();

if (this.healthIndicatorsToIgnore.contains(indicatorName)) {
log.debug("Ignoring indicator: {}", indicatorName);
} else if (Status.UP.equals(indicatorStatus)) {
log.debug("Indicator {} is UP", indicatorName);
} else {
log.warn("Indicator {} is {} for host {}", indicatorName, indicatorStatus, host);
// Mark host as failed but keep iterating to publish metrics.
result = false;
}
}
}
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
log.error("Failed reading the error response when validating host {}", host, ex);
result = false;
@@ -87,6 +87,21 @@ private MetricsConstants() {
*/
public static final String USER = "user";

/**
* Key to tag a hostname.
*/
public static final String HOST = "host";

/**
* Key to tag a health indicator name.
*/
public static final String HEALTH_INDICATOR = "healthIndicator";

/**
* Key to tag a health indicator status.
*/
public static final String HEALTH_STATUS = "healthStatus";

/**
* Utility class private constructor.
*/
@@ -89,7 +89,7 @@ genie:
port: 8080
rate: 300000
lostThreshold: 3
healthIndicatorsToIgnore: memory,genieMemory,discoveryComposite
healthIndicatorsToIgnore: genieMemory,mail,genieAgent
database-cleanup:
enabled: true
expression: 0 0 0 * * *
@@ -27,6 +27,10 @@
import com.netflix.genie.web.services.JobPersistenceService;
import com.netflix.genie.web.services.JobSearchService;
import com.netflix.genie.web.tasks.GenieTaskScheduleType;
import com.netflix.genie.web.util.MetricsConstants;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.hamcrest.Matchers;
import org.junit.Assert;
@@ -59,6 +63,7 @@
private RestTemplate restTemplate;
private String scheme;
private String healthEndpoint;
private MeterRegistry meterRegistry;

/**
* Setup for the tests.
@@ -72,6 +77,7 @@ public void setup() {
this.jobSearchService = Mockito.mock(JobSearchService.class);
this.jobPersistenceService = Mockito.mock(JobPersistenceService.class);
this.restTemplate = Mockito.mock(RestTemplate.class);
this.meterRegistry = new SimpleMeterRegistry();
final WebEndpointProperties serverProperties = Mockito.mock(WebEndpointProperties.class);
Mockito.when(serverProperties.getBasePath()).thenReturn("/actuator");
this.task = new ClusterCheckerTask(
@@ -81,7 +87,7 @@ public void setup() {
this.jobPersistenceService,
this.restTemplate,
serverProperties,
new SimpleMeterRegistry()
meterRegistry
);

this.scheme = properties.getScheme() + "://";
@@ -131,7 +137,9 @@ public void canRun() throws GenieException {
"",
(
"{\"status\":\"OUT_OF_SERVICE\", \"genie\": { \"status\": \"OUT_OF_SERVICE\"}, "
+ "\"db\": { \"status\": \"OUT_OF_SERVICE\"}}"
+ "\"db\": { \"status\": \"OUT_OF_SERVICE\"},"
+ "\"disk\": { \"status\": \"UP\"}"
+ "}"
).getBytes(StandardCharsets.UTF_8),
StandardCharsets.UTF_8
)
@@ -142,7 +150,9 @@ public void canRun() throws GenieException {
"",
(
"{\"status\":\"OUT_OF_SERVICE\", \"genie\": { \"status\": \"OUT_OF_SERVICE\"}, "
+ "\"db\": { \"status\": \"OUT_OF_SERVICE\"}}"
+ "\"db\": { \"status\": \"OUT_OF_SERVICE\"},"
+ "\"disk\": { \"status\": \"UP\"}"
+ "}"
).getBytes(StandardCharsets.UTF_8),
StandardCharsets.UTF_8
)
@@ -153,7 +163,9 @@ public void canRun() throws GenieException {
"",
(
"{\"status\":\"OUT_OF_SERVICE\", \"genie\": { \"status\": \"OUT_OF_SERVICE\"}, "
+ "\"db\": { \"status\": \"UP\"}}"
+ "\"db\": { \"status\": \"UP\"},"
+ "\"disk\": { \"status\": \"UP\"}"
+ "}"
).getBytes(StandardCharsets.UTF_8),
StandardCharsets.UTF_8
)
@@ -164,7 +176,9 @@ public void canRun() throws GenieException {
"",
(
"{\"status\":\"OUT_OF_SERVICE\", \"genie\": { \"status\": \"OUT_OF_SERVICE\"}, "
+ "\"db\": { \"status\": \"OUT_OF_SERVICE\"}}"
+ "\"db\": { \"status\": \"OUT_OF_SERVICE\"},"
+ "\"disk\": { \"status\": \"UP\"}"
+ "}"
).getBytes(StandardCharsets.UTF_8),
StandardCharsets.UTF_8
)
@@ -250,6 +264,33 @@ public void canRun() throws GenieException {
Mockito.eq(null),
Mockito.eq(null)
);

final Set<Tag> dbIndicatorTags = MetricsUtils.newSuccessTagsSet();
dbIndicatorTags.add(Tag.of(MetricsConstants.TagKeys.HOST, host2));
dbIndicatorTags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_INDICATOR, "db"));
dbIndicatorTags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_STATUS, "OUT_OF_SERVICE"));

Assert.assertThat(
this.meterRegistry.counter(
"genie.tasks.clusterChecker.health.counter",
dbIndicatorTags
).count(),
Matchers.is(3.0)
);

final Set<Tag> diskIndicatorTags = MetricsUtils.newSuccessTagsSet();
diskIndicatorTags.add(Tag.of(MetricsConstants.TagKeys.HOST, host2));
diskIndicatorTags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_INDICATOR, "disk"));
diskIndicatorTags.add(Tag.of(MetricsConstants.TagKeys.HEALTH_STATUS, "UP"));

Assert.assertThat(
this.meterRegistry.counter(
"genie.tasks.clusterChecker.health.counter",
diskIndicatorTags
).count(),
Matchers.is(4.0)
);

}

/**

0 comments on commit cf78f0c

Please sign in to comment.
You can’t perform that action at this time.