Skip to content

Commit

Permalink
Behavioral Analytics event ingest tuning (#95405)
Browse files Browse the repository at this point in the history
  • Loading branch information
afoucret committed Apr 21, 2023
1 parent 8993ddc commit c501fe7
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 129 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/95405.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 95405
summary: Behavioral Analytics event ingest tuning
area: Application
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"actions": {
"rollover": {
"max_age": "30d",
"max_size": "3GB"
"max_primary_shard_size": "50gb"
}
}
},
Expand All @@ -19,6 +19,10 @@
}
}
},
"cold": {
"min_age": "30d",
"actions": {}
},
"delete": {
"min_age": "180d",
"actions":{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
{
"description": "Built-in ingest pipeline applied by default as final pipeline to behavioral analytics event data streams.",
"processors": [
{
"set": {
"field": "_routing",
"copy_from": "session.id"
}
},
{
"uri_parts": {
"field": "page.url",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,12 @@
"type": "keyword"
},
"original": {
"fields": {
"text": {
"type": "match_only_text"
}
},
"type": "wildcard"
"ignore_above": 1024,
"type": "keyword"
},
"path": {
"type": "wildcard"
"ignore_above": 1024,
"type": "keyword"
},
"port": {
"type": "long"
Expand Down Expand Up @@ -118,15 +115,12 @@
"type": "keyword"
},
"original": {
"fields": {
"text": {
"type": "match_only_text"
}
},
"type": "wildcard"
"ignore_above": 1024,
"type": "keyword"
},
"path": {
"type": "wildcard"
"ignore_above": 1024,
"type": "keyword"
},
"port": {
"type": "long"
Expand Down Expand Up @@ -184,67 +178,8 @@
"type": "integer"
},
"items": {
"properties": {
"document": {
"properties": {

"index": {
"type": "keyword",
"ignore_above": 1024
},
"id": {
"type": "keyword",
"ignore_above": 1024
}
}
},
"page": {
"properties": {
"url": {
"properties": {
"domain": {
"ignore_above": 1024,
"type": "keyword"
},
"extension": {
"ignore_above": 1024,
"type": "keyword"
},
"fragment": {
"ignore_above": 1024,
"type": "keyword"
},
"original": {
"fields": {
"text": {
"type": "match_only_text"
}
},
"type": "wildcard"
},
"path": {
"type": "wildcard"
},
"port": {
"type": "long"
},
"query": {
"ignore_above": 1024,
"type": "keyword"
},
"scheme": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"title": {
"type": "keyword",
"ignore_above": 1024
}
}
}
}
"type": "object",
"enabled": false
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
"number_of_shards": 1,
"number_of_replicas": 0,
"auto_expand_replicas": "0-1",
"final_pipeline": "behavioral_analytics-events-final_pipeline"
"final_pipeline": "behavioral_analytics-events-final_pipeline",
"sort": {
"field": ["session.id", "@timestamp"],
"order": ["asc", "asc"]
}
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"index_patterns": ["${event_data_stream.index_pattern}"],
"data_stream": {},
"data_stream": {
"allow_custom_routing": true
},
"priority": 100,
"composed_of": [
"behavioral_analytics-events-settings",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.elasticsearch.xpack.application.analytics.action.TransportGetAnalyticsCollectionAction;
import org.elasticsearch.xpack.application.analytics.action.TransportPostAnalyticsEventAction;
import org.elasticsearch.xpack.application.analytics.action.TransportPutAnalyticsCollectionAction;
import org.elasticsearch.xpack.application.analytics.ingest.BulkProcessorConfig;
import org.elasticsearch.xpack.application.analytics.ingest.AnalyticsEventIngestConfig;
import org.elasticsearch.xpack.application.search.SearchApplicationIndexService;
import org.elasticsearch.xpack.application.search.action.DeleteSearchApplicationAction;
import org.elasticsearch.xpack.application.search.action.GetSearchApplicationAction;
Expand Down Expand Up @@ -199,9 +199,10 @@ public String getFeatureDescription() {
@Override
public List<Setting<?>> getSettings() {
return List.of(
BulkProcessorConfig.MAX_NUMBER_OF_EVENTS_PER_BULK_SETTING,
BulkProcessorConfig.FLUSH_DELAY_SETTING,
BulkProcessorConfig.MAX_NUMBER_OF_RETRIES_SETTING
AnalyticsEventIngestConfig.MAX_NUMBER_OF_EVENTS_PER_BULK_SETTING,
AnalyticsEventIngestConfig.FLUSH_DELAY_SETTING,
AnalyticsEventIngestConfig.MAX_NUMBER_OF_RETRIES_SETTING,
AnalyticsEventIngestConfig.MAX_BYTES_IN_FLIGHT_SETTING
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.application.analytics.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor2;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -18,6 +19,7 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
Expand All @@ -27,19 +29,22 @@
import org.elasticsearch.xpack.application.analytics.event.AnalyticsEventFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN;

public class AnalyticsEventEmitter extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(AnalyticsEvent.class);
private static final Logger logger = LogManager.getLogger(AnalyticsEventEmitter.class);

private final Client client;

private final BulkProcessor2 bulkProcessor;

private final AnalyticsEventFactory eventFactory;

private final AtomicBoolean dropEvent = new AtomicBoolean(false);

@Inject
public AnalyticsEventEmitter(Client client, BulkProcessorFactory bulkProcessorFactory) {
this(client, bulkProcessorFactory, AnalyticsEventFactory.INSTANCE);
Expand Down Expand Up @@ -67,6 +72,10 @@ public void emitEvent(

bulkProcessor.add(eventIndexRequest);

if (dropEvent.compareAndSet(true, false)) {
logger.warn("Bulk processor has been flushed. Accepting new events again.");
}

if (request.isDebug()) {
listener.onResponse(new PostAnalyticsEventAction.DebugResponse(true, event));
} else {
Expand All @@ -75,8 +84,13 @@ public void emitEvent(
} catch (IOException e) {
listener.onFailure(new ElasticsearchException("Unable to parse the event.", e));
} catch (EsRejectedExecutionException e) {
listener.onFailure(new ElasticsearchException("Unable to add the event to the bulk."));
logger.error("Unable to add the event to the bulk.", e);
listener.onFailure(
new ElasticsearchStatusException("Unable to add the event: too many requests.", RestStatus.TOO_MANY_REQUESTS)
);

if (dropEvent.compareAndSet(false, true)) {
logger.warn("Bulk processor is full. Start dropping events.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;

Expand All @@ -20,30 +21,46 @@
* - max_events_per_bulk: the maximum number of events that can be added to the bulk before flushing the bulk (default: 1000)
* - max_number_of_retries: the maximum number of retries when bulk execution fails (default: 3)
*/
public class BulkProcessorConfig {
private static final String SETTING_ROOT_PATH = "xpack.applications.behavioral_analytics.ingest.bulk_processor";
public class AnalyticsEventIngestConfig {
private static final String SETTING_ROOT_PATH = "xpack.applications.behavioral_analytics.ingest";

private static final TimeValue DEFAULT_FLUSH_DELAY = TimeValue.timeValueSeconds(10);
private static final TimeValue MIN_FLUSH_DELAY = TimeValue.timeValueSeconds(1);
private static final TimeValue MAX_FLUSH_DELAY = TimeValue.timeValueSeconds(60);
public static final Setting<TimeValue> FLUSH_DELAY_SETTING = Setting.timeSetting(
Strings.format("%s.%s", SETTING_ROOT_PATH, "flush_delay"),
TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(1),
TimeValue.timeValueSeconds(60),
Strings.format("%s.%s", SETTING_ROOT_PATH, "bulk_processor.flush_delay"),
DEFAULT_FLUSH_DELAY,
MIN_FLUSH_DELAY,
MAX_FLUSH_DELAY,
Setting.Property.NodeScope
);

private static final int DEFAULT_BULK_SIZE = 500;
private static final int MIN_BULK_SIZE = 1;
private static final int MAX_BULK_SIZE = 1000;
public static final Setting<Integer> MAX_NUMBER_OF_EVENTS_PER_BULK_SETTING = Setting.intSetting(
Strings.format("%s.%s", SETTING_ROOT_PATH, "max_events_per_bulk"),
1000,
1,
10000,
Strings.format("%s.%s", SETTING_ROOT_PATH, "bulk_processor.max_events_per_bulk"),
DEFAULT_BULK_SIZE,
MIN_BULK_SIZE,
MAX_BULK_SIZE,
Setting.Property.NodeScope
);

private static final int DEFAULT_MAX_NUMBER_OF_RETRIES = 1;
private static final int MIN_MAX_NUMBER_OF_RETRIES = 0;
private static final int MAX_MAX_NUMBER_OF_RETRIES = 5;
public static final Setting<Integer> MAX_NUMBER_OF_RETRIES_SETTING = Setting.intSetting(
Strings.format("%s.%s", SETTING_ROOT_PATH, "max_number_of_retries"),
3,
0,
5,
Strings.format("%s.%s", SETTING_ROOT_PATH, "bulk_processor.max_number_of_retries"),
DEFAULT_MAX_NUMBER_OF_RETRIES,
MIN_MAX_NUMBER_OF_RETRIES,
MAX_MAX_NUMBER_OF_RETRIES,
Setting.Property.NodeScope
);

private static final String DEFAULT_MAX_BYTES_IN_FLIGHT = "5%";
public static final Setting<ByteSizeValue> MAX_BYTES_IN_FLIGHT_SETTING = Setting.memorySizeSetting(
Strings.format("%s.%s", SETTING_ROOT_PATH, "bulk_processor.max_bytes_in_flight"),
DEFAULT_MAX_BYTES_IN_FLIGHT,
Setting.Property.NodeScope
);

Expand All @@ -53,11 +70,14 @@ public class BulkProcessorConfig {

private final int maxNumberOfEventsPerBulk;

private final ByteSizeValue maxBytesInFlight;

@Inject
public BulkProcessorConfig(Settings settings) {
public AnalyticsEventIngestConfig(Settings settings) {
this.flushDelay = FLUSH_DELAY_SETTING.get(settings);
this.maxNumberOfRetries = MAX_NUMBER_OF_RETRIES_SETTING.get(settings);
this.maxNumberOfEventsPerBulk = MAX_NUMBER_OF_EVENTS_PER_BULK_SETTING.get(settings);
this.maxBytesInFlight = MAX_BYTES_IN_FLIGHT_SETTING.get(settings);
}

public TimeValue flushDelay() {
Expand All @@ -71,4 +91,8 @@ public int maxNumberOfRetries() {
public int maxNumberOfEventsPerBulk() {
return maxNumberOfEventsPerBulk;
}

public ByteSizeValue maxBytesInFlight() {
return maxBytesInFlight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;

Expand All @@ -28,19 +26,19 @@
public class BulkProcessorFactory {
private static final Logger logger = LogManager.getLogger(AnalyticsEventEmitter.class);

private final BulkProcessorConfig config;
private final AnalyticsEventIngestConfig config;

@Inject
public BulkProcessorFactory(BulkProcessorConfig config) {
public BulkProcessorFactory(AnalyticsEventIngestConfig config) {
this.config = config;
}

public BulkProcessor2 create(Client client) {
return BulkProcessor2.builder(client::bulk, new BulkProcessorListener(), client.threadPool())
.setMaxNumberOfRetries(config.maxNumberOfRetries())
.setBulkActions(config.maxNumberOfEventsPerBulk())
.setBulkSize(new ByteSizeValue(-1, ByteSizeUnit.BYTES))
.setFlushInterval(config.flushDelay())
.setMaxBytesInFlight(config.maxBytesInFlight())
.build();
}

Expand Down

0 comments on commit c501fe7

Please sign in to comment.