Skip to content

Commit

Permalink
stateless: add validation metrics (#1112)
Browse files Browse the repository at this point in the history
When reporting data to the aggregator, check the response and
report validation metrics/logs. This makes it easier to
debug cases where data is rejected due to validation errors.
  • Loading branch information
brharrington committed Jan 30, 2024
1 parent 184f017 commit 937add9
Show file tree
Hide file tree
Showing 8 changed files with 407 additions and 5 deletions.
4 changes: 1 addition & 3 deletions spectator-reg-stateless/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
## Description

**DEPRECATED:** should not be used for any new development as Prana is deprecated.

Registry implementation for reporting data to a [Prana] sidecar.
Registry implementation for reporting data to an aggregator cluster.

[Prana]: https://github.com/Netflix/Prana

Expand Down
1 change: 1 addition & 0 deletions spectator-reg-stateless/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
api project(':spectator-api')
api project(':spectator-ext-ipc')
implementation 'com.fasterxml.jackson.core:jackson-core'
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
}

jar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.netflix.spectator.impl.Scheduler;
import com.netflix.spectator.ipc.http.HttpClient;
import com.netflix.spectator.ipc.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.time.Duration;
Expand All @@ -48,6 +50,8 @@
*/
public final class StatelessRegistry extends AbstractRegistry {

private static final Logger LOGGER = LoggerFactory.getLogger(StatelessRegistry.class);

private final boolean enabled;
private final Duration frequency;
private final long meterTTL;
Expand All @@ -58,6 +62,7 @@ public final class StatelessRegistry extends AbstractRegistry {
private final Map<String, String> commonTags;

private final HttpClient client;
private final ValidationHelper validationHelper;

private Scheduler scheduler;

Expand All @@ -73,6 +78,7 @@ public StatelessRegistry(Clock clock, StatelessConfig config) {
this.batchSize = config.batchSize();
this.commonTags = config.commonTags();
this.client = HttpClient.create(this);
this.validationHelper = new ValidationHelper(LOGGER, this);
}

/**
Expand Down Expand Up @@ -121,10 +127,12 @@ private void collectData() {
.withReadTimeout(readTimeout)
.withContent("application/json", payload)
.compress(Deflater.BEST_SPEED)
.send();
.send()
.decompress();
if (res.status() != 200) {
logger.warn("failed to send metrics, status {}: {}", res.status(), res.entityAsString());
}
validationHelper.recordResults(batch.size(), res);
}
removeExpiredMeters();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.stateless;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.ipc.http.HttpResponse;
import org.slf4j.Logger;

/**
 * Records counters and log messages describing what happened to a batch of measurements
 * that was posted to the aggregator.
 *
 * <p>All counters share the name {@code spectator.measurements}. Successfully delivered
 * measurements are tagged {@code id=sent}; dropped measurements are tagged
 * {@code id=dropped} plus an {@code error} tag of {@code http-error}, {@code validation},
 * or {@code other}.
 */
final class ValidationHelper {

  private final Logger logger;

  private final Counter measurementsSent;
  private final Counter measurementsDroppedInvalid;
  private final Counter measurementsDroppedHttp;
  private final Counter measurementsDroppedOther;

  /**
   * Create a new helper.
   *
   * @param logger
   *     Logger used for reporting dropped measurements.
   * @param registry
   *     Registry used to create the counters.
   */
  ValidationHelper(Logger logger, Registry registry) {
    this.logger = logger;

    Id baseId = registry.createId("spectator.measurements");
    Id droppedId = baseId.withTag("id", "dropped");
    this.measurementsSent = registry.counter(baseId.withTag("id", "sent"));
    this.measurementsDroppedHttp = registry.counter(droppedId.withTag("error", "http-error"));
    this.measurementsDroppedInvalid = registry.counter(droppedId.withTag("error", "validation"));
    this.measurementsDroppedOther = registry.counter(droppedId.withTag("error", "other"));
  }

  /**
   * Increment the counter for measurements dropped because of an HTTP level failure.
   *
   * @param amount
   *     Number of measurements that were dropped.
   */
  void incrementDroppedHttp(int amount) {
    measurementsDroppedHttp.increment(amount);
  }

  /**
   * Report metrics and do basic logging of validation results to help the user with
   * debugging.
   *
   * <ul>
   *   <li>Status 200: the whole batch is counted as sent.</li>
   *   <li>Other status below 500: the body is parsed as a {@link ValidationResponse}. The
   *       reported error count, bounded to the batch size, is counted as dropped due to
   *       validation and the remainder as sent. If the body cannot be parsed, or a non-2xx
   *       response reports no errors, the whole batch is counted as dropped for some other
   *       reason.</li>
   *   <li>Status 500 and above: the whole batch is counted as dropped due to an HTTP
   *       error.</li>
   * </ul>
   *
   * @param numMeasurements
   *     Number of measurements that were in the request.
   * @param res
   *     Response received for the request.
   */
  void recordResults(int numMeasurements, HttpResponse res) {
    final int status = res.status();
    if (status == 200) {
      measurementsSent.increment(numMeasurements);
    } else if (status < 500) {
      // For validation:
      // 202 - partial failure
      // 400 - all failed, could also be some other sort of failure
      final ValidationResponse vres;
      try {
        vres = ValidationResponse.fromJson(res.entity());
      } catch (Exception e) {
        // Likely some other 400 error. Log at trace level in case the cause is really needed.
        logger.trace("failed to parse response", e);
        recordDroppedOther(numMeasurements, status);
        return;
      }

      // Bound the reported count to the batch size so the sent amount cannot go negative
      // and the dropped amount cannot exceed what was actually in the request.
      final int invalid = Math.max(0, Math.min(vres.getErrorCount(), numMeasurements));
      if (invalid == 0 && status >= 300) {
        // The body happened to be parsable JSON, but it does not describe any validation
        // errors. The request was still rejected, so do not report the batch as sent.
        recordDroppedOther(numMeasurements, status);
        return;
      }

      measurementsDroppedInvalid.increment(invalid);
      measurementsSent.increment(numMeasurements - invalid);
      if (invalid > 0) {
        logger.warn("{} measurement(s) dropped due to validation errors: {}",
            invalid, vres.errorSummary());
      }
    } else {
      // Some sort of server side failure
      measurementsDroppedHttp.increment(numMeasurements);
    }
  }

  /** Log and count a batch that was rejected for a reason other than validation. */
  private void recordDroppedOther(int numMeasurements, int status) {
    logger.warn("{} measurement(s) dropped. Http status: {}", numMeasurements, status);
    measurementsDroppedOther.increment(numMeasurements);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.stateless;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

/**
 * Payload returned by the Atlas aggregator endpoint when measurements in a request fail
 * validation. Instances are created from the JSON body using {@link #fromJson(byte[])}.
 */
@SuppressWarnings("PMD.DataClass")
final class ValidationResponse {

  private static final JsonFactory JSON_FACTORY = new JsonFactory();

  private final String type;
  private final int errorCount;
  private final List<String> message; // Singular to match backend response

  /**
   * Create a new instance.
   *
   * @param type
   *     Value of the {@code type} field in the response.
   * @param errorCount
   *     Value of the {@code errorCount} field in the response.
   * @param message
   *     Entries of the {@code message} array in the response, may be null.
   */
  ValidationResponse(String type, int errorCount, List<String> message) {
    this.type = type;
    this.errorCount = errorCount;
    this.message = message;
  }

  /** Returns the {@code type} field of the response. */
  public String getType() {
    return type;
  }

  /** Returns the {@code errorCount} field of the response. */
  public int getErrorCount() {
    return errorCount;
  }

  /** Returns the entries of the {@code message} field of the response. */
  public List<String> getMessage() {
    return message;
  }

  /**
   * Returns all messages joined into a single string, or a fixed placeholder if there
   * are no messages.
   */
  String errorSummary() {
    if (message == null || message.isEmpty()) {
      return "unknown cause";
    }
    return String.join("; ", message);
  }

  /**
   * Parse the JSON body of a response. The {@code type}, {@code errorCount}, and
   * {@code message} fields are extracted; any other field is skipped.
   *
   * @param json
   *     Encoded JSON object.
   * @return
   *     Response with the extracted fields. Fields that are not present keep their
   *     defaults: null type, zero error count, and an empty message list.
   * @throws IOException
   *     If the payload does not start with an object, the {@code message} field is
   *     neither null nor an array, or the parser fails to read the input.
   */
  static ValidationResponse fromJson(byte[] json) throws IOException {
    try (JsonParser parser = JSON_FACTORY.createParser(json)) {
      requireToken(parser.nextToken(), EnumSet.of(JsonToken.START_OBJECT));

      String parsedType = null;
      int parsedCount = 0;
      List<String> parsedMessages = new ArrayList<>();

      while (parser.nextToken() == JsonToken.FIELD_NAME) {
        final String field = parser.getText();
        if ("type".equals(field)) {
          parsedType = parser.nextTextValue();
        } else if ("errorCount".equals(field)) {
          parsedCount = parser.nextIntValue(0);
        } else if ("message".equals(field)) {
          readMessages(parser, parsedMessages);
        } else {
          // Unknown field: step onto its value and skip over it, including nested content.
          parser.nextToken();
          parser.skipChildren();
        }
      }

      return new ValidationResponse(parsedType, parsedCount, parsedMessages);
    }
  }

  /**
   * Read the value of the {@code message} field. An explicit null adds nothing; for an
   * array the text of each token up to the end of the array is appended to {@code sink}.
   */
  private static void readMessages(JsonParser parser, List<String> sink) throws IOException {
    final JsonToken first = parser.nextToken();
    requireToken(first, EnumSet.of(JsonToken.VALUE_NULL, JsonToken.START_ARRAY));
    if (first != JsonToken.START_ARRAY) {
      return;
    }
    while (parser.nextToken() != JsonToken.END_ARRAY) {
      sink.add(parser.getText());
    }
  }

  /** Fail with a parse exception unless {@code actual} is one of the expected tokens. */
  private static void requireToken(JsonToken actual, EnumSet<JsonToken> expected)
      throws IOException {
    if (expected.contains(actual)) {
      return;
    }
    throw new JsonParseException("expected " + expected + ", but found " + actual);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,4 @@ public void batchesExpiration() {
clock.setWallTime(Duration.ofMinutes(15).toMillis() + 1);
Assertions.assertEquals(0, registry.getBatches().size());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.stateless;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.DefaultRegistry;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.ipc.http.HttpResponse;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class ValidationHelperTest {

private static final Logger LOGGER = LoggerFactory.getLogger(ValidationHelperTest.class);
private static final ObjectMapper MAPPER = new ObjectMapper();

private void check(Registry r, long sent, long http, long invalid, long other) {
Id baseId = r.createId("spectator.measurements");
Id droppedId = baseId.withTag("id", "dropped");

Assertions.assertEquals(sent, r.counter(baseId.withTag("id", "sent")).count());
Assertions.assertEquals(http, r.counter(droppedId.withTag("error", "http-error")).count());
Assertions.assertEquals(invalid, r.counter(droppedId.withTag("error", "validation")).count());
Assertions.assertEquals(other, r.counter(droppedId.withTag("error", "other")).count());
}

private HttpResponse httpResponse(int status, ValidationResponse vres) throws IOException {
String json = MAPPER.writeValueAsString(vres);
return new HttpResponse(status, Collections.emptyMap(), json.getBytes(StandardCharsets.UTF_8));
}

@Test
public void incrementDroppedHttp() {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
helper.incrementDroppedHttp(42);
check(registry, 0, 42, 0, 0);
}

@Test
public void ok() {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
helper.recordResults(42, new HttpResponse(200, Collections.emptyMap()));
check(registry, 42, 0, 0, 0);
}

@Test
public void validationErrorPartial() throws IOException {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
ValidationResponse vres = new ValidationResponse(
"error",
3,
Collections.singletonList("foo")
);
helper.recordResults(42, httpResponse(202, vres));
check(registry, 39, 0, 3, 0);
}

@Test
public void validationErrorAll() throws IOException {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
ValidationResponse vres = new ValidationResponse(
"error",
42,
Collections.singletonList("foo")
);
helper.recordResults(42, httpResponse(400, vres));
check(registry, 0, 0, 42, 0);
}

@Test
public void validationErrorNullMessages() throws IOException {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
ValidationResponse vres = new ValidationResponse("error", 42, null);
helper.recordResults(42, httpResponse(400, vres));
check(registry, 0, 0, 42, 0);
}

@Test
public void validationErrorEmptyMessages() throws IOException {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
ValidationResponse vres = new ValidationResponse(
"error",
42,
Collections.emptyList()
);
helper.recordResults(42, httpResponse(400, vres));
check(registry, 0, 0, 42, 0);
}

@Test
public void validationErrorBadJson() throws IOException {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
HttpResponse res = new HttpResponse(400, Collections.emptyMap());
helper.recordResults(42, res);
check(registry, 0, 0, 0, 42);
}

@Test
public void serverError() {
Registry registry = new DefaultRegistry();
ValidationHelper helper = new ValidationHelper(LOGGER, registry);
helper.recordResults(42, new HttpResponse(500, Collections.emptyMap()));
check(registry, 0, 42, 0, 0);
}
}

0 comments on commit 937add9

Please sign in to comment.