Skip to content

Commit

Permalink
initial version of integration with lwc api
Browse files Browse the repository at this point in the history
Still needs a lot of work, but is a minimal base version
that will send data at the same frequency as publishing
to Atlas.

For more details on the server side see:
Netflix/atlas#459.

Future iteration needed for refactoring to honor frequency
in the subscription, performance tuning, etc.
  • Loading branch information
brharrington committed Dec 15, 2016
1 parent a3eb6c2 commit 656334e
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ public void testRecordRunnableException() throws Exception {
Assert.assertEquals(t.totalTime(), 400L);
}

private int square(int v) {
return v * v;
}

@Test
public void testCallable() throws Exception {
Timer t = new DefaultTimer(clock, NoopId.INSTANCE);
int v2 = t.record(() -> square(42));
}

@Test
public void testMeasure() {
Timer t = new DefaultTimer(clock, new DefaultId("foo"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ default Duration step() {
return (v == null) ? Duration.ofMinutes(1) : Duration.parse(v);
}

/**
* Returns true if publishing to Atlas is enabled. Default is true.
*/
default boolean enabled() {
String v = get("atlas.enabled");
return (v == null) ? true : Boolean.valueOf(v);
}

/**
* Returns the number of threads to use with the scheduler. The default is
* 2 threads.
Expand All @@ -52,6 +60,38 @@ default String uri() {
return (v == null) ? "http://localhost:7101/api/v1/publish" : v;
}

/**
* Returns true if streaming to Atlas LWC is enabled. Default is false.
*/
default boolean lwcEnabled() {
String v = get("atlas.lwc.enabled");
return (v == null) ? false : Boolean.valueOf(v);
}

/** Returns the frequency for refreshing config settings from the LWC service. */
default Duration configRefreshFrequency() {
String v = get("atlas.configRefreshFrequency");
return (v == null) ? Duration.ofSeconds(10) : Duration.parse(v);
}

/**
* Returns the URI for the Atlas LWC endpoint to retrieve current subscriptions.
* The default is {@code http://localhost:7101/lwc/api/v1/expressions/local-dev}.
*/
default String configUri() {
String v = get("atlas.config-uri");
return (v == null) ? "http://localhost:7101/lwc/api/v1/expressions/local-dev" : v;
}

/**
* Returns the URI for the Atlas LWC endpoint to evaluate the data for a suscription.
* The default is {@code http://localhost:7101/lwc/api/v1/evaluate}.
*/
default String evalUri() {
String v = get("atlas.eval-uri");
return (v == null) ? "http://localhost:7101/lwc/api/v1/evaluate" : v;
}

/**
* Returns the connection timeout for requests to the backend. The default is
* 1 second.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.spectator.atlas;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
Expand Down Expand Up @@ -49,26 +50,45 @@ public final class AtlasRegistry extends AbstractRegistry {
private static final String CLOCK_SKEW_TIMER = "spectator.atlas.clockSkew";

private final Clock clock;

private final boolean enabled;
private final Duration step;
private final long stepMillis;
private final URI uri;

private final boolean lwcEnabled;
private final Duration configRefreshFrequency;
private final URI configUri;
private final URI evalUri;

private final int connectTimeout;
private final int readTimeout;
private final int batchSize;
private final int numThreads;
private final Map<String, String> commonTags;

private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;

private Scheduler scheduler;

private final ObjectMapper mapper;
private volatile List<Subscription> subscriptions;

/** Create a new instance. */
public AtlasRegistry(Clock clock, AtlasConfig config) {
super(new StepClock(clock, config.step().toMillis()), config);
this.clock = clock;

this.enabled = config.enabled();
this.step = config.step();
this.stepMillis = step.toMillis();
this.uri = URI.create(config.uri());

this.lwcEnabled = config.lwcEnabled();
this.configRefreshFrequency = config.configRefreshFrequency();
this.configUri = URI.create(config.configUri());
this.evalUri = URI.create(config.evalUri());

this.connectTimeout = (int) config.connectTimeout().toMillis();
this.readTimeout = (int) config.readTimeout().toMillis();
this.batchSize = config.batchSize();
Expand All @@ -77,23 +97,38 @@ public AtlasRegistry(Clock clock, AtlasConfig config) {

SimpleModule module = new SimpleModule()
.addSerializer(Measurement.class, new MeasurementSerializer());
this.mapper = new ObjectMapper(new SmileFactory()).registerModule(module);
this.jsonMapper = new ObjectMapper(new JsonFactory()).registerModule(module);
this.smileMapper = new ObjectMapper(new SmileFactory()).registerModule(module);
}

/**
* Start the scheduler to collect metrics data.
*/
public void start() {
if (scheduler == null) {
Scheduler.Options options = new Scheduler.Options()
.withFrequency(Scheduler.Policy.FIXED_RATE_SKIP_IF_LONG, step)
.withInitialDelay(Duration.ofMillis(getInitialDelay(stepMillis)))
.withStopOnFailure(false);

scheduler = new Scheduler(this, "spectator-reg-atlas", numThreads);
scheduler.schedule(options, this::collectData);
logger.info("started collecting metrics every {} reporting to {}", step, uri);
logger.info("common tags: {}", commonTags);
// Setup main collection for publishing to Atlas
if (enabled || lwcEnabled) {
Scheduler.Options options = new Scheduler.Options()
.withFrequency(Scheduler.Policy.FIXED_RATE_SKIP_IF_LONG, step)
.withInitialDelay(Duration.ofMillis(getInitialDelay(stepMillis)))
.withStopOnFailure(false);
scheduler = new Scheduler(this, "spectator-reg-atlas", numThreads);
scheduler.schedule(options, this::collectData);
logger.info("started collecting metrics every {} reporting to {}", step, uri);
logger.info("common tags: {}", commonTags);
} else {
logger.info("publishing is not enabled");
}

// Setup collection for subscriptions
if (lwcEnabled) {
Scheduler.Options options = new Scheduler.Options()
.withFrequency(Scheduler.Policy.FIXED_DELAY, configRefreshFrequency)
.withStopOnFailure(false);
scheduler.schedule(options, this::fetchSubscriptions);
} else {
logger.info("subscriptions are not enabled");
}
} else {
logger.warn("registry already started, ignoring duplicate request");
}
Expand Down Expand Up @@ -135,17 +170,77 @@ public void stop() {
}

private void collectData() {
try {
for (List<Measurement> batch : getBatches()) {
PublishPayload p = new PublishPayload(commonTags, batch);
HttpResponse res = HttpClient.DEFAULT.newRequest("spectator-reg-atlas", uri)
// Send data for any subscriptions
if (lwcEnabled) {
try {
handleSubscriptions();
} catch (Exception e) {
logger.warn("failed to handle subscriptions", e);
}
}

// Publish to Atlas
if (enabled) {
try {
for (List<Measurement> batch : getBatches()) {
PublishPayload p = new PublishPayload(commonTags, batch);
HttpResponse res = HttpClient.DEFAULT.newRequest("spectator-reg-atlas", uri)
.withMethod("POST")
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
.withContent("application/x-jackson-smile", smileMapper.writeValueAsBytes(p))
.send();
Instant date = res.dateHeader("Date");
recordClockSkew((date == null) ? 0L : date.toEpochMilli());
}
} catch (Exception e) {
logger.warn("failed to send metrics", e);
}
}
}

private void handleSubscriptions() {
List<Subscription> subs = subscriptions;
if (!subs.isEmpty()) {
List<TagsValuePair> ms = getMeasurements().stream()
.map(m -> TagsValuePair.from(commonTags, m))
.collect(Collectors.toList());
List<EvalPayload.Metric> metrics = new ArrayList<>();
for (Subscription s : subs) {
DataExpr expr = Parser.parseDataExpr(s.getExpression());
for (TagsValuePair pair : expr.eval(ms)) {
EvalPayload.Metric m = new EvalPayload.Metric(s.getId(), pair.tags(), pair.value());
metrics.add(m);
}
}
try {
String json = jsonMapper.writeValueAsString(new EvalPayload(clock().wallTime(), metrics));
HttpClient.DEFAULT.newRequest("spectator-lwc-eval", evalUri)
.withMethod("POST")
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
.withContent("application/x-jackson-smile", mapper.writeValueAsBytes(p))
.send();
Instant date = res.dateHeader("Date");
recordClockSkew((date == null) ? 0L : date.toEpochMilli());
.withJsonContent(json)
.send()
.decompress();
} catch (Exception e) {
logger.warn("failed to send metrics for subscriptions", e);
}
}
}

private void fetchSubscriptions() {
try {
HttpResponse res = HttpClient.DEFAULT.newRequest("spectator-lwc-subs", configUri)
.withMethod("GET")
.withConnectTimeout(connectTimeout)
.withReadTimeout(readTimeout)
.send()
.decompress();
if (res.status() != 200) {
logger.warn("failed to update subscriptions, received status {}", res.status());
} else {
Subscriptions subs = jsonMapper.readValue(res.entity(), Subscriptions.class);
subscriptions = subs.validated();
}
} catch (Exception e) {
logger.warn("failed to send metrics", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2014-2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.atlas;

import java.util.List;
import java.util.Map;

/**
* Wraps a list of measurements with a set of common tags. The common tags are
* typically used for things like the application and instance id.
*/
class EvalPayload {

private final long timestamp;
private final List<Metric> metrics;

/** Create a new instance. */
EvalPayload(long timestamp, List<Metric> metrics) {
this.timestamp = timestamp;
this.metrics = metrics;
}

/** Return the timestamp for metrics in this payload. */
public long getTimestamp() {
return timestamp;
}

/** Return the metric values for the data for this payload. */
public List<Metric> getMetrics() {
return metrics;
}

/** Metric value. */
static class Metric {
private final String id;
private final Map<String, String> tags;
private final double value;

/** Create a new instance. */
Metric(String id, Map<String, String> tags, double value) {
this.id = id;
this.tags = tags;
this.value = value;
}

/** Id for the expression that this data corresponds with. */
public String getId() {
return id;
}

/** Tags for identifying the metric. */
public Map<String, String> getTags() {
return tags;
}

/** Value for the metric. */
public double getValue() {
return value;
}
}
}

0 comments on commit 656334e

Please sign in to comment.