Skip to content

Commit

Permalink
Add Cumulative Cardinality agg (and Data Science plugin) (#43661)
Browse files Browse the repository at this point in the history
This adds a pipeline aggregation that calculates the cumulative
cardinality of a field.  It does this by iteratively merging in the
HLL sketch from consecutive buckets and emitting the cardinality up
to that point.

This is useful for things like finding the total "new" users that have
visited a website (as opposed to "repeat" visitors).

This is a Basic+ aggregation and adds a new Data Science plugin
to house it and future advanced analytics/data science aggregations.
  • Loading branch information
polyfractal committed Aug 26, 2019
1 parent f4703be commit 273c35f
Show file tree
Hide file tree
Showing 30 changed files with 1,653 additions and 4 deletions.
1 change: 1 addition & 0 deletions distribution/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ task run(type: RunTask) {
setting 'xpack.monitoring.enabled', 'true'
setting 'xpack.sql.enabled', 'true'
setting 'xpack.rollup.enabled', 'true'
setting 'xpack.data-science.enabled', 'true'
keystoreSetting 'bootstrap.password', 'password'
}
}
Expand Down
36 changes: 36 additions & 0 deletions docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,42 @@ buildRestTests.setups['sales'] = '''
{"index":{}}
{"date": "2015/03/01 00:00:00", "price": 175, "promoted": false, "rating": 2, "type": "t-shirt"}'''

// Used by cumulative cardinality aggregation docs
buildRestTests.setups['user_hits'] = '''
- do:
indices.create:
index: user_hits
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
properties:
user_id:
type: keyword
timestamp:
type: date
- do:
bulk:
index: user_hits
refresh: true
body: |
{"index":{}}
{"timestamp": "2019-01-01T13:00:00", "user_id": "1"}
{"index":{}}
{"timestamp": "2019-01-01T13:00:00", "user_id": "2"}
{"index":{}}
{"timestamp": "2019-01-02T13:00:00", "user_id": "1"}
{"index":{}}
{"timestamp": "2019-01-02T13:00:00", "user_id": "3"}
{"index":{}}
{"timestamp": "2019-01-03T13:00:00", "user_id": "1"}
{"index":{}}
{"timestamp": "2019-01-03T13:00:00", "user_id": "2"}
{"index":{}}
{"timestamp": "2019-01-03T13:00:00", "user_id": "4"}'''


// Dummy bank account data used by getting-started.asciidoc
buildRestTests.setups['bank'] = '''
- do:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-cumulative-cardinality-aggregation]]
=== Cumulative Cardinality Aggregation

A parent pipeline aggregation which calculates the Cumulative Cardinality in a parent histogram (or date_histogram)
aggregation. The specified metric must be a cardinality aggregation and the enclosing histogram
must have `min_doc_count` set to `0` (default for `histogram` aggregations).

The `cumulative_cardinality` agg is useful for finding "total new items", like the number of new visitors to your
website each day. A regular cardinality aggregation will tell you how many unique visitors came each day, but doesn't
differentiate between "new" or "repeat" visitors. The Cumulative Cardinality aggregation can be used to determine
how many of each day's unique visitors are "new".

==== Syntax

A `cumulative_cardinality` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"cumulative_cardinality": {
"buckets_path": "my_cardinality_agg"
}
}
--------------------------------------------------
// NOTCONSOLE

[[cumulative-cardinality-params]]
.`cumulative_cardinality` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the cardinality aggregation we wish to find the cumulative cardinality for (see <<buckets-path-syntax>> for more
details) |Required |
|`format` |format to apply to the output value of this aggregation |Optional |`null`
|===

The following snippet calculates the cumulative cardinality of the total daily `users`:

[source,js]
--------------------------------------------------
GET /user_hits/_search
{
"size": 0,
"aggs" : {
"users_per_day" : {
"date_histogram" : {
"field" : "timestamp",
"calendar_interval" : "day"
},
"aggs": {
"distinct_users": {
"cardinality": {
"field": "user_id"
}
},
"total_new_users": {
"cumulative_cardinality": {
"buckets_path": "distinct_users" <1>
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:user_hits]

<1> `buckets_path` instructs this aggregation to use the output of the `distinct_users` aggregation for the cumulative cardinality

And the following may be the response:

[source,js]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"users_per_day": {
"buckets": [
{
"key_as_string": "2019-01-01T00:00:00.000Z",
"key": 1546300800000,
"doc_count": 2,
"distinct_users": {
"value": 2
},
"total_new_users": {
"value": 2
}
},
{
"key_as_string": "2019-01-02T00:00:00.000Z",
"key": 1546387200000,
"doc_count": 2,
"distinct_users": {
"value": 2
},
"total_new_users": {
"value": 3
}
},
{
"key_as_string": "2019-01-03T00:00:00.000Z",
"key": 1546473600000,
"doc_count": 3,
"distinct_users": {
"value": 3
},
"total_new_users": {
"value": 4
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]


Note how the second day, `2019-01-02`, has two distinct users but the `total_new_users` metric generated by the
cumulative pipeline agg only increments to three. This means that only one of the two users that day were
new, the other had already been seen in the previous day. This happens again on the third day, where only
one of three users is completely new.

==== Incremental cumulative cardinality

The `cumulative_cardinality` agg will show you the total, distinct count since the beginning of the time period
being queried. Sometimes, however, it is useful to see the "incremental" count. Meaning, how many new users
are added each day, rather than the total cumulative count.

This can be accomplished by adding a `derivative` aggregation to our query:

[source,js]
--------------------------------------------------
GET /user_hits/_search
{
"size": 0,
"aggs" : {
"users_per_day" : {
"date_histogram" : {
"field" : "timestamp",
"calendar_interval" : "day"
},
"aggs": {
"distinct_users": {
"cardinality": {
"field": "user_id"
}
},
"total_new_users": {
"cumulative_cardinality": {
"buckets_path": "distinct_users"
}
},
"incremental_new_users": {
"derivative": {
"buckets_path": "total_new_users"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:user_hits]


And the following may be the response:

[source,js]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"users_per_day": {
"buckets": [
{
"key_as_string": "2019-01-01T00:00:00.000Z",
"key": 1546300800000,
"doc_count": 2,
"distinct_users": {
"value": 2
},
"total_new_users": {
"value": 2
}
},
{
"key_as_string": "2019-01-02T00:00:00.000Z",
"key": 1546387200000,
"doc_count": 2,
"distinct_users": {
"value": 2
},
"total_new_users": {
"value": 3
},
"incremental_new_users": {
"value": 1.0
}
},
{
"key_as_string": "2019-01-03T00:00:00.000Z",
"key": 1546473600000,
"doc_count": 3,
"distinct_users": {
"value": 3
},
"total_new_users": {
"value": 4
},
"incremental_new_users": {
"value": 1.0
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
4 changes: 4 additions & 0 deletions docs/reference/rest-api/info.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ Example response:
"available" : true,
"enabled" : true
},
"data_science" : {
"available" : true,
"enabled" : true
},
"flattened" : {
"available" : true,
"enabled" : true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public long getValue() {
return counts == null ? 0 : counts.cardinality(0);
}

HyperLogLogPlusPlus getCounts() {
public HyperLogLogPlusPlus getCounts() {
return counts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public class XPackLicenseState {
"Creating and Starting rollup jobs will no longer be allowed.",
"Stopping/Deleting existing jobs, RollupCaps API and RollupSearch continue to function."
});
messages.put(XPackField.DATA_SCIENCE, new String[] {
"Aggregations provided by Data Science plugin are no longer usable."
});
EXPIRATION_MESSAGES = Collections.unmodifiableMap(messages);
}

Expand Down Expand Up @@ -744,6 +747,15 @@ public boolean isSpatialAllowed() {
return localStatus.active;
}

/**
* Datascience is always available as long as there is a valid license
*
* @return true if the license is active
*/
public synchronized boolean isDataScienceAllowed() {
return status.active;
}

public synchronized boolean isTrialLicense() {
return status.mode == OperationMode.TRIAL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.dataframe.transforms.SyncConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
import org.elasticsearch.xpack.core.datascience.DataScienceFeatureSetUsage;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.flattened.FlattenedFeatureSetUsage;
import org.elasticsearch.xpack.core.frozen.FrozenIndicesFeatureSetUsage;
Expand Down Expand Up @@ -509,7 +510,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
// Frozen indices
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.FROZEN_INDICES, FrozenIndicesFeatureSetUsage::new),
// Spatial
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new)
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.SPATIAL, SpatialFeatureSetUsage::new),
// data science
new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_SCIENCE, DataScienceFeatureSetUsage::new)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public final class XPackField {
public static final String FROZEN_INDICES = "frozen_indices";
/** Name constant for spatial features. */
public static final String SPATIAL = "spatial";
/** Name constant for the data science plugin. */
public static final String DATA_SCIENCE = "data_science";

private XPackField() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ private XPackSettings() {
/** Setting for enabling or disabling vectors. Defaults to true. */
public static final Setting<Boolean> VECTORS_ENABLED = Setting.boolSetting("xpack.vectors.enabled", true, Setting.Property.NodeScope);

/** Setting for enabling or disabling data science plugin. Defaults to true. */
public static final Setting<Boolean> DATA_SCIENCE_ENABLED = Setting.boolSetting("xpack.datascience.enabled",
true, Setting.Property.NodeScope);

/*
* SSL settings. These are the settings that are specifically registered for SSL. Many are private as we do not explicitly use them
* but instead parse based on a prefix (eg *.ssl.*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ public class XPackInfoFeatureAction extends ActionType<XPackInfoFeatureResponse>
public static final XPackInfoFeatureAction VOTING_ONLY = new XPackInfoFeatureAction(XPackField.VOTING_ONLY);
public static final XPackInfoFeatureAction FROZEN_INDICES = new XPackInfoFeatureAction(XPackField.FROZEN_INDICES);
public static final XPackInfoFeatureAction SPATIAL = new XPackInfoFeatureAction(XPackField.SPATIAL);
public static final XPackInfoFeatureAction DATA_SCIENCE = new XPackInfoFeatureAction(XPackField.DATA_SCIENCE);

public static final List<XPackInfoFeatureAction> ALL = Arrays.asList(
SECURITY, MONITORING, WATCHER, GRAPH, MACHINE_LEARNING, LOGSTASH, SQL, ROLLUP, INDEX_LIFECYCLE, CCR, DATA_FRAME, FLATTENED,
VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL
VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL, DATA_SCIENCE
);

private XPackInfoFeatureAction(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ public class XPackUsageFeatureAction extends ActionType<XPackUsageFeatureRespons
public static final XPackUsageFeatureAction VOTING_ONLY = new XPackUsageFeatureAction(XPackField.VOTING_ONLY);
public static final XPackUsageFeatureAction FROZEN_INDICES = new XPackUsageFeatureAction(XPackField.FROZEN_INDICES);
public static final XPackUsageFeatureAction SPATIAL = new XPackUsageFeatureAction(XPackField.SPATIAL);
public static final XPackUsageFeatureAction DATA_SCIENCE = new XPackUsageFeatureAction(XPackField.DATA_SCIENCE);

public static final List<XPackUsageFeatureAction> ALL = Arrays.asList(
SECURITY, MONITORING, WATCHER, GRAPH, MACHINE_LEARNING, LOGSTASH, SQL, ROLLUP, INDEX_LIFECYCLE, CCR, DATA_FRAME, FLATTENED,
VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL
VECTORS, VOTING_ONLY, FROZEN_INDICES, SPATIAL, DATA_SCIENCE
);

private XPackUsageFeatureAction(String name) {
Expand Down
Loading

0 comments on commit 273c35f

Please sign in to comment.