Skip to content

Commit

Permalink
[Behavioral Analytics] Add a final_pipeline to event data streams. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
afoucret committed Apr 18, 2023
1 parent 8512249 commit e03378f
Show file tree
Hide file tree
Showing 18 changed files with 774 additions and 61 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/95198.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 95198
summary: "[Behavioral Analytics] Add a `final_pipeline` to event data streams"
area: Application
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"description": "Built-in ingest pipeline applied by default as final pipeline to behavioral analytics event data streams.",
"processors": [
{
"uri_parts": {
"field": "page.url",
"target_field": "page.url",
"ignore_missing": true
}
},
{
"uri_parts": {
"field": "page.referrer",
"target_field": "page.referrer",
"ignore_missing": true
}
},
{
"foreach": {
"field": "search.results.items",
"ignore_missing": true,
"processor": {
"uri_parts": {
"field": "_ingest._value.page.url",
"target_field": "_ingest._value.page.url",
"ignore_missing": true
}
}
}
}
],
"_meta": {
"managed": true
},
"version": ${xpack.entsearch.analytics.template.version}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,84 @@
"page": {
"properties": {
"url": {
"ignore_above": 1024,
"type": "keyword"
"properties": {
"domain": {
"ignore_above": 1024,
"type": "keyword"
},
"extension": {
"ignore_above": 1024,
"type": "keyword"
},
"fragment": {
"ignore_above": 1024,
"type": "keyword"
},
"original": {
"fields": {
"text": {
"type": "match_only_text"
}
},
"type": "wildcard"
},
"path": {
"type": "wildcard"
},
"port": {
"type": "long"
},
"query": {
"ignore_above": 1024,
"type": "keyword"
},
"scheme": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"title": {
"ignore_above": 1024,
"type": "keyword"
},
"referrer": {
"ignore_above": 1024,
"type": "keyword"
"properties": {
"domain": {
"ignore_above": 1024,
"type": "keyword"
},
"extension": {
"ignore_above": 1024,
"type": "keyword"
},
"fragment": {
"ignore_above": 1024,
"type": "keyword"
},
"original": {
"fields": {
"text": {
"type": "match_only_text"
}
},
"type": "wildcard"
},
"path": {
"type": "wildcard"
},
"port": {
"type": "long"
},
"query": {
"ignore_above": 1024,
"type": "keyword"
},
"scheme": {
"ignore_above": 1024,
"type": "keyword"
}
}
}
}
},
Expand Down Expand Up @@ -133,8 +201,42 @@
"page": {
"properties": {
"url": {
"type": "keyword",
"ignore_above": 1024
"properties": {
"domain": {
"ignore_above": 1024,
"type": "keyword"
},
"extension": {
"ignore_above": 1024,
"type": "keyword"
},
"fragment": {
"ignore_above": 1024,
"type": "keyword"
},
"original": {
"fields": {
"text": {
"type": "match_only_text"
}
},
"type": "wildcard"
},
"path": {
"type": "wildcard"
},
"port": {
"type": "long"
},
"query": {
"ignore_above": 1024,
"type": "keyword"
},
"scheme": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"title": {
"type": "keyword",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"refresh_interval": "1s",
"number_of_shards": 1,
"number_of_replicas": 0,
"auto_expand_replicas": "0-1"
"auto_expand_replicas": "0-1",
"final_pipeline": "behavioral_analytics-events-final_pipeline"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.application.analytics.AnalyticsIngestPipelineRegistry;
import org.elasticsearch.xpack.application.analytics.AnalyticsTemplateRegistry;
import org.elasticsearch.xpack.application.analytics.action.DeleteAnalyticsCollectionAction;
import org.elasticsearch.xpack.application.analytics.action.GetAnalyticsCollectionAction;
Expand Down Expand Up @@ -171,7 +172,13 @@ public Collection<Object> createComponents(
);
analyticsTemplateRegistry.initialize();

return Arrays.asList(analyticsTemplateRegistry);
final AnalyticsIngestPipelineRegistry analyticsPipelineRegistry = new AnalyticsIngestPipelineRegistry(
clusterService,
threadPool,
client
);

return Arrays.asList(analyticsTemplateRegistry, analyticsPipelineRegistry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX;

/**
* The {@link AnalyticsCollection} model.
*/
Expand Down Expand Up @@ -68,7 +70,7 @@ public String getName() {
* @return Event data stream name.
*/
public String getEventDataStream() {
return AnalyticsTemplateRegistry.EVENT_DATA_STREAM_INDEX_PREFIX + name;
return EVENT_DATA_STREAM_INDEX_PREFIX + name;
}

/**
Expand Down Expand Up @@ -114,13 +116,13 @@ public static AnalyticsCollection fromXContent(String resourceName, XContentPars
}

public static AnalyticsCollection fromDataStreamName(String dataStreamName) {
if (dataStreamName.startsWith(AnalyticsTemplateRegistry.EVENT_DATA_STREAM_INDEX_PREFIX) == false) {
if (dataStreamName.startsWith(EVENT_DATA_STREAM_INDEX_PREFIX) == false) {
throw new IllegalArgumentException(
"Data stream name (" + dataStreamName + " must start with " + AnalyticsTemplateRegistry.EVENT_DATA_STREAM_INDEX_PREFIX
"Data stream name (" + dataStreamName + " must start with " + EVENT_DATA_STREAM_INDEX_PREFIX
);
}

return new AnalyticsCollection(dataStreamName.replaceFirst(AnalyticsTemplateRegistry.EVENT_DATA_STREAM_INDEX_PREFIX, ""));
return new AnalyticsCollection(dataStreamName.replaceFirst(EVENT_DATA_STREAM_INDEX_PREFIX, ""));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;

import static java.util.function.Predicate.not;
import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PATTERN;

/**
* A service that allows the resolution of {@link AnalyticsCollection} by name.
Expand Down Expand Up @@ -82,7 +83,7 @@ public List<AnalyticsCollection> collections(ClusterState state, String... expre
List<String> dataStreams = indexNameExpressionResolver.dataStreamNames(
state,
IndicesOptions.lenientExpandOpen(),
AnalyticsTemplateRegistry.EVENT_DATA_STREAM_INDEX_PATTERN
EVENT_DATA_STREAM_INDEX_PATTERN
);

Map<String, AnalyticsCollection> collections = dataStreams.stream()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.application.analytics;

/**
 * Constants shared by the behavioral analytics classes.
 * <p>
 * This is a pure constant holder: it is {@code final} and has a private constructor,
 * so it can neither be instantiated nor extended.
 */
public final class AnalyticsConstants {

    /** Not instantiable: this class only exposes static constants. */
    private AnalyticsConstants() {}

    // Data stream related constants.

    /** Dataset segment used when building event data stream names. */
    public static final String EVENT_DATA_STREAM_DATASET = "events";

    /** Type segment used when building event data stream names. */
    public static final String EVENT_DATA_STREAM_TYPE = "behavioral_analytics";

    /** Prefix of event data stream names: {@code <type>-<dataset>-}, i.e. {@code behavioral_analytics-events-}. */
    public static final String EVENT_DATA_STREAM_INDEX_PREFIX = EVENT_DATA_STREAM_TYPE + "-" + EVENT_DATA_STREAM_DATASET + "-";

    /** Wildcard pattern built from {@link #EVENT_DATA_STREAM_INDEX_PREFIX} by appending {@code *}. */
    public static final String EVENT_DATA_STREAM_INDEX_PATTERN = EVENT_DATA_STREAM_INDEX_PREFIX + "*";

    // Resource config.

    /** Root path (with trailing slash) used as a prefix when building analytics resource file paths. */
    public static final String ROOT_RESOURCE_PATH = "/org/elasticsearch/xpack/entsearch/analytics/";

    /** The variable to be replaced with the template version number. */
    public static final String TEMPLATE_VERSION_VARIABLE = "xpack.entsearch.analytics.template.version";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.application.analytics;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.application.utils.ingest.PipelineRegistry;
import org.elasticsearch.xpack.application.utils.ingest.PipelineTemplateConfiguration;

import java.util.Collections;
import java.util.List;

import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.EVENT_DATA_STREAM_INDEX_PREFIX;
import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.ROOT_RESOURCE_PATH;
import static org.elasticsearch.xpack.application.analytics.AnalyticsConstants.TEMPLATE_VERSION_VARIABLE;
import static org.elasticsearch.xpack.application.analytics.AnalyticsTemplateRegistry.REGISTRY_VERSION;
import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN;

/**
 * {@link PipelineRegistry} implementation that declares the ingest pipelines used by behavioral analytics.
 * <p>
 * A single pipeline is declared at the moment; see {@link #EVENT_DATA_STREAM_INGEST_PIPELINE_NAME}.
 */
public class AnalyticsIngestPipelineRegistry extends PipelineRegistry {

    /** Pipeline name: the event data stream prefix followed by {@code final_pipeline}. */
    static final String EVENT_DATA_STREAM_INGEST_PIPELINE_NAME = EVENT_DATA_STREAM_INDEX_PREFIX + "final_pipeline";

    /** Path of the JSON definition: the analytics resource root, then the pipeline name, then {@code .json}. */
    private static final String EVENT_DATA_STREAM_INGEST_PIPELINE_RESOURCE = ROOT_RESOURCE_PATH
        + EVENT_DATA_STREAM_INGEST_PIPELINE_NAME
        + ".json";

    /**
     * Configuration of the event data stream pipeline. It shares {@code REGISTRY_VERSION} with
     * {@link AnalyticsTemplateRegistry} and passes the template version variable name along with it.
     */
    private static final PipelineTemplateConfiguration EVENT_DATA_STREAM_INGEST_PIPELINE_CONFIG = new PipelineTemplateConfiguration(
        EVENT_DATA_STREAM_INGEST_PIPELINE_NAME,
        EVENT_DATA_STREAM_INGEST_PIPELINE_RESOURCE,
        REGISTRY_VERSION,
        TEMPLATE_VERSION_VARIABLE
    );

    /** All pipeline configurations declared by this registry (currently exactly one). */
    static final List<PipelineTemplateConfiguration> INGEST_PIPELINES = Collections.singletonList(
        EVENT_DATA_STREAM_INGEST_PIPELINE_CONFIG
    );

    /**
     * Creates the registry; all arguments are handed to the {@link PipelineRegistry} constructor unchanged.
     *
     * @param clusterService cluster service passed to the parent registry
     * @param threadPool     thread pool passed to the parent registry
     * @param client         client passed to the parent registry
     */
    public AnalyticsIngestPipelineRegistry(ClusterService clusterService, ThreadPool threadPool, Client client) {
        super(clusterService, threadPool, client);
    }

    /**
     * @return the pipeline configurations held in {@link #INGEST_PIPELINES}
     */
    @Override
    protected List<PipelineTemplateConfiguration> getIngestPipelineConfigs() {
        return INGEST_PIPELINES;
    }

    /**
     * @return the enterprise search origin, {@code ENT_SEARCH_ORIGIN}
     */
    @Override
    protected String getOrigin() {
        return ENT_SEARCH_ORIGIN;
    }
}

0 comments on commit e03378f

Please sign in to comment.