Skip to content

Commit

Permalink
Formalize usage stats for analytics (#52966)
Browse files Browse the repository at this point in the history
This moves the usage statistics gathering from the `AnalyticsPlugin`
into an `AnalyicsUsage`, removing the static state. It also checks the
license level when parsing all analytics aggregations. This is how we
were checking them before but we did it in an easy to forget way. This
way is slightly simpler, I think.
  • Loading branch information
nik9000 committed Mar 3, 2020
1 parent 4b33908 commit 999f934
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,25 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.analytics.action.AnalyticsInfoTransportAction;
import org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
Expand All @@ -28,24 +39,22 @@
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregationBuilder;
import org.elasticsearch.xpack.analytics.topmetrics.TopMetricsAggregatorFactory;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.Collections.singletonList;

public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugin, MapperPlugin {

// TODO this should probably become more structured
public static AtomicLong cumulativeCardUsage = new AtomicLong(0);
public static AtomicLong topMetricsUsage = new AtomicLong(0);
private final AnalyticsUsage usage = new AnalyticsUsage();

public AnalyticsPlugin() { }

Expand All @@ -58,7 +67,8 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
CumulativeCardinalityPipelineAggregationBuilder.NAME,
CumulativeCardinalityPipelineAggregationBuilder::new,
CumulativeCardinalityPipelineAggregator::new,
CumulativeCardinalityPipelineAggregationBuilder.PARSER)
usage.track(AnalyticsUsage.Item.CUMULATIVE_CARDINALITY,
checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER)))
);
}

Expand All @@ -68,16 +78,17 @@ public List<AggregationSpec> getAggregations() {
new AggregationSpec(
StringStatsAggregationBuilder.NAME,
StringStatsAggregationBuilder::new,
StringStatsAggregationBuilder.PARSER).addResultReader(InternalStringStats::new),
usage.track(AnalyticsUsage.Item.STRING_STATS, checkLicense(StringStatsAggregationBuilder.PARSER)))
.addResultReader(InternalStringStats::new),
new AggregationSpec(
BoxplotAggregationBuilder.NAME,
BoxplotAggregationBuilder::new,
BoxplotAggregationBuilder.PARSER)
usage.track(AnalyticsUsage.Item.BOXPLOT, checkLicense(BoxplotAggregationBuilder.PARSER)))
.addResultReader(InternalBoxplot::new),
new AggregationSpec(
TopMetricsAggregationBuilder.NAME,
TopMetricsAggregationBuilder::new,
track(TopMetricsAggregationBuilder.PARSER, topMetricsUsage))
usage.track(AnalyticsUsage.Item.TOP_METRICS, checkLicense(TopMetricsAggregationBuilder.PARSER)))
.addResultReader(InternalTopMetrics::new)
);
}
Expand All @@ -100,15 +111,20 @@ public Map<String, Mapper.TypeParser> getMappers() {
return Collections.singletonMap(HistogramFieldMapper.CONTENT_TYPE, new HistogramFieldMapper.TypeParser());
}

/**
* Track successful parsing.
*/
private static <T> ContextParser<String, T> track(ContextParser<String, T> realParser, AtomicLong usage) {
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver) {
return singletonList(new AnalyticsUsage());
}

private static <T> ContextParser<String, T> checkLicense(ContextParser<String, T> realParser) {
return (parser, name) -> {
T value = realParser.parse(parser, name);
// Intentionally doesn't count unless the parser returns cleanly.
usage.addAndGet(1);
return value;
if (getLicenseState().isDataScienceAllowed() == false) {
throw LicenseUtils.newComplianceException(XPackField.ANALYTICS);
}
return realParser.parse(parser, name);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Tracks usage of the Analytics aggregations.
*/
public class AnalyticsUsage {
/**
* Items to track.
*/
public enum Item {
BOXPLOT,
CUMULATIVE_CARDINALITY,
STRING_STATS,
TOP_METRICS;
}

private final Map<Item, AtomicLong> trackers = new EnumMap<>(Item.class);

public AnalyticsUsage() {
for (Item item: Item.values()) {
trackers.put(item, new AtomicLong(0));
}
}

/**
* Track successful parsing.
*/
public <C, T> ContextParser<C, T> track(Item item, ContextParser<C, T> realParser) {
AtomicLong usage = trackers.get(item);
return (parser, context) -> {
T value = realParser.parse(parser, context);
// Intentionally doesn't count unless the parser returns cleanly.
usage.incrementAndGet();
return value;
};
}

public AnalyticsStatsAction.NodeResponse stats(DiscoveryNode node) {
return new AnalyticsStatsAction.NodeResponse(node,
trackers.get(Item.BOXPLOT).get(),
trackers.get(Item.CUMULATIVE_CARDINALITY).get(),
trackers.get(Item.STRING_STATS).get(),
trackers.get(Item.TOP_METRICS).get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.analytics.AnalyticsUsage;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;

import java.io.IOException;
import java.util.List;

public class TransportAnalyticsStatsAction extends TransportNodesAction<AnalyticsStatsAction.Request, AnalyticsStatsAction.Response,
AnalyticsStatsAction.NodeRequest, AnalyticsStatsAction.NodeResponse> {

AnalyticsStatsAction.NodeRequest, AnalyticsStatsAction.NodeResponse> {
private final AnalyticsUsage usage;

@Inject
public TransportAnalyticsStatsAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters) {
ThreadPool threadPool, ActionFilters actionFilters, AnalyticsUsage usage) {
super(AnalyticsStatsAction.NAME, threadPool, clusterService, transportService, actionFilters,
AnalyticsStatsAction.Request::new, AnalyticsStatsAction.NodeRequest::new, ThreadPool.Names.MANAGEMENT,
AnalyticsStatsAction.NodeResponse.class);
this.usage = usage;
}

@Override
Expand All @@ -51,10 +52,7 @@ protected AnalyticsStatsAction.NodeResponse newNodeResponse(StreamInput in) thro

@Override
protected AnalyticsStatsAction.NodeResponse nodeOperation(AnalyticsStatsAction.NodeRequest request, Task task) {
AnalyticsStatsAction.NodeResponse statsResponse = new AnalyticsStatsAction.NodeResponse(clusterService.localNode());
statsResponse.setCumulativeCardinalityUsage(AnalyticsPlugin.cumulativeCardUsage.get());
statsResponse.setTopMetricsUsage(AnalyticsPlugin.topMetricsUsage.get());
return statsResponse;
return usage.stats(clusterService.localNode());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.elasticsearch.xpack.core.XPackField;

import java.io.IOException;
import java.util.Collection;
Expand All @@ -35,14 +32,6 @@ public class CumulativeCardinalityPipelineAggregationBuilder

public static final ConstructingObjectParser<CumulativeCardinalityPipelineAggregationBuilder, String> PARSER =
new ConstructingObjectParser<>(NAME, false, (args, name) -> {
if (AnalyticsPlugin.getLicenseState().isDataScienceAllowed() == false) {
throw LicenseUtils.newComplianceException(XPackField.ANALYTICS);
}

// Increment usage here since it is a good boundary between internal and external, and should correlate 1:1 with
// usage and not internal instantiations
AnalyticsPlugin.cumulativeCardUsage.incrementAndGet();

return new CumulativeCardinalityPipelineAggregationBuilder(name, (String) args[0]);
});
static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,65 +13,81 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.analytics.AnalyticsUsage;
import org.elasticsearch.xpack.core.analytics.action.AnalyticsStatsAction;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TransportAnalyticsStatsActionTests extends ESTestCase {

private TransportAnalyticsStatsAction action;

@Before
public void setupTransportAction() {
public TransportAnalyticsStatsAction action(AnalyticsUsage usage) {
TransportService transportService = mock(TransportService.class);
ThreadPool threadPool = mock(ThreadPool.class);

ClusterService clusterService = mock(ClusterService.class);
DiscoveryNode discoveryNode = new DiscoveryNode("nodeId", buildNewFakeTransportAddress(), Version.CURRENT);
when(clusterService.localNode()).thenReturn(discoveryNode);

ClusterName clusterName = new ClusterName("cluster_name");
when(clusterService.getClusterName()).thenReturn(clusterName);


ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(clusterState);

return new TransportAnalyticsStatsAction(transportService, clusterService, threadPool,
new ActionFilters(Collections.emptySet()), usage);
}

action = new TransportAnalyticsStatsAction(transportService, clusterService, threadPool, new
ActionFilters(Collections.emptySet()));
public void test() throws IOException {
AnalyticsUsage.Item item = randomFrom(AnalyticsUsage.Item.values());
AnalyticsUsage realUsage = new AnalyticsUsage();
AnalyticsUsage emptyUsage = new AnalyticsUsage();
ContextParser<Void, Void> parser = realUsage.track(item, (p, c) -> c);
ObjectPath unused = run(realUsage, emptyUsage);
assertThat(unused.evaluate("stats.0." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
assertThat(unused.evaluate("stats.1." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
int count = between(1, 10000);
for (int i = 0; i < count; i++) {
assertNull(parser.parse(null, null));
}
ObjectPath used = run(realUsage, emptyUsage);
assertThat(used.evaluate("stats.0." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(count));
assertThat(used.evaluate("stats.1." + item.name().toLowerCase(Locale.ROOT) + "_usage"), equalTo(0));
}

public void testCumulativeCardStats() throws Exception {
private ObjectPath run(AnalyticsUsage... nodeUsages) throws IOException {
AnalyticsStatsAction.Request request = new AnalyticsStatsAction.Request();
AnalyticsStatsAction.NodeResponse nodeResponse1 = action.nodeOperation(new AnalyticsStatsAction.NodeRequest(request), null);
AnalyticsStatsAction.NodeResponse nodeResponse2 = action.nodeOperation(new AnalyticsStatsAction.NodeRequest(request), null);

AnalyticsStatsAction.Response response = action.newResponse(request,
Arrays.asList(nodeResponse1, nodeResponse2), Collections.emptyList());
List<AnalyticsStatsAction.NodeResponse> nodeResponses = Arrays.stream(nodeUsages)
.map(usage -> action(usage).nodeOperation(new AnalyticsStatsAction.NodeRequest(request), null))
.collect(toList());
AnalyticsStatsAction.Response response = new AnalyticsStatsAction.Response(
new ClusterName("cluster_name"), nodeResponses, emptyList());

try (XContentBuilder builder = jsonBuilder()) {
builder.startObject();
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();

ObjectPath objectPath = ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
assertThat(objectPath.evaluate("stats.0.cumulative_cardinality_usage"), equalTo(0));
assertThat(objectPath.evaluate("stats.1.cumulative_cardinality_usage"), equalTo(0));
return ObjectPath.createFromXContent(JsonXContent.jsonXContent, BytesReference.bytes(builder));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@

package org.elasticsearch.xpack.analytics.boxplot;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.analytics.AnalyticsPlugin;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.hasSize;

public class BoxplotAggregationBuilderTests extends AbstractSerializingTestCase<BoxplotAggregationBuilder> {
Expand All @@ -31,8 +30,10 @@ public void setupName() {

@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.singletonList(new AnalyticsPlugin()));
return new NamedXContentRegistry(searchModule.getNamedXContents());
return new NamedXContentRegistry(singletonList(new NamedXContentRegistry.Entry(
BaseAggregationBuilder.class,
new ParseField(BoxplotAggregationBuilder.NAME),
(p, n) -> BoxplotAggregationBuilder.PARSER.apply(p, (String) n))));
}

@Override
Expand Down

0 comments on commit 999f934

Please sign in to comment.