Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable analytics geoip in behavioral analytics. #96624

Merged
merged 26 commits into from Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
823e276
When using a managed pipeline GeoIpDownloader is triggered only when …
afoucret Jun 6, 2023
4bf04ae
When using a managed pipeline GeoIpDownloader is triggered only when …
afoucret Jun 6, 2023
6f0767f
Adding the geoip processor back
afoucret Jun 6, 2023
97009d6
Adding tags to the events mapping.
afoucret Jun 6, 2023
b86bbb9
Fix a forbidden API call into tests.
afoucret Jun 6, 2023
7ee7a2e
lint
afoucret Jun 6, 2023
c8158fc
Adding an integration tests for managed pipelines.
afoucret Jun 9, 2023
051d5d4
lint
afoucret Jun 9, 2023
c257f07
Add a geoip_database_lazy_download param to pipelines and use it inst…
afoucret Jun 9, 2023
9a3a079
Fix a edge case: pipeline can be set after index is created.
afoucret Jun 9, 2023
abb80c0
lint.
afoucret Jun 9, 2023
f44caf6
Update docs/changelog/96624.yaml
afoucret Jun 9, 2023
728c18f
Update 96624.yaml
afoucret Jun 9, 2023
199ccd2
Uses a processor setting (download_database_on_pipeline_creation) to …
afoucret Jun 12, 2023
e4db7aa
Removing debug instruction.
afoucret Jun 12, 2023
ca4e909
Improved documentation.
afoucret Jun 12, 2023
62a6ab8
Improved the way to check for referenced pipelines.
afoucret Jun 13, 2023
ef123f7
Fixing an error in test.
afoucret Jun 13, 2023
c3baf3d
Improved integration tests.
afoucret Jun 13, 2023
6489976
Lint.
afoucret Jun 13, 2023
6ade089
Fix failing tests.
afoucret Jun 15, 2023
90e65fa
Fix failing tests (2).
afoucret Jun 15, 2023
9dc3f81
Adding javadoc.
afoucret Jun 15, 2023
31082be
lint javadoc.
afoucret Jun 15, 2023
22aa720
Using a set instead of a list to store checked pipelines.
afoucret Jun 15, 2023
022a6ec
Merge branch 'elastic:main' into enable-analytics-geip
afoucret Jun 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/96624.yaml
@@ -0,0 +1,5 @@
pr: 96624
summary: Enable analytics geoip in behavioral analytics
area: "Application"
type: feature
issues: []
17 changes: 9 additions & 8 deletions docs/reference/ingest/processors/geoip.asciidoc
Expand Up @@ -14,7 +14,7 @@ CC BY-SA 4.0 license. It automatically downloads these databases if your nodes c

* `ingest.geoip.downloader.eager.download` is set to true
* your cluster has at least one pipeline with a `geoip` processor

{es} automatically downloads updates for these databases from the Elastic GeoIP endpoint:
https://geoip.elastic.co/v1/database. To get download statistics for these
updates, use the <<geoip-stats-api,GeoIP stats API>>.
Expand All @@ -33,13 +33,14 @@ field instead.
.`geoip` options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to get the ip address from for the geographical lookup.
| `target_field` | no | geoip | The field that will hold the geographical information looked up from the MaxMind database.
| `database_file` | no | GeoLite2-City.mmdb | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory.
| `properties` | no | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array
| Name | Required | Default | Description
| `field` | yes | - | The field to get the ip address from for the geographical lookup.
| `target_field` | no | geoip | The field that will hold the geographical information looked up from the MaxMind database.
| `database_file` | no | GeoLite2-City.mmdb | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory.
| `properties` | no | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array
| `download_database_on_pipeline_creation` | no | `true` | If `true` (and if `ingest.geoip.downloader.eager.download` is `false`), the missing database is downloaded when the pipeline is created. Else, the download is triggered by when the pipeline is used as the `default_pipeline` or `final_pipeline` in an index.
|======

*Depends on what is available in `database_file`:
Expand Down
Expand Up @@ -18,11 +18,13 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
Expand Down Expand Up @@ -150,6 +152,7 @@ public void cleanUp() throws Exception {
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/75221")
public void testInvalidTimestamp() throws Exception {
assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
setupDatabasesInConfigDirectory();
putGeoIpPipeline();
updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
assertBusy(() -> {
Expand Down Expand Up @@ -283,6 +286,42 @@ public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception {
}, 2, TimeUnit.MINUTES);
}

public void testDoNotDownloadDatabaseOnPipelineCreation() throws Exception {
assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
String pipelineId = randomIdentifier();

// Removing databases from tmp dir. So we can test the downloader.
deleteDatabasesInConfigDirectory();

// Enabling the downloader.
putGeoIpPipeline("_id", false);
updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
assertBusy(() -> assertNotNull(getTask()));

// Creating a pipeline containing a geo processor with lazy download enable.
// Download should not be triggered and task state should stay null.
putGeoIpPipeline(pipelineId, false);
assertNull(getTask().getState());

// Creating an index which does not reference the pipeline should not trigger the database download.
String indexIdentifier = randomIdentifier();
assertAcked(client().admin().indices().prepareCreate(indexIdentifier).get());
assertNull(getTask().getState());

// Set the pipeline as default_pipeline or final_pipeline for the index.
// This should trigger the database download.
Setting<String> pipelineSetting = randomFrom(IndexSettings.FINAL_PIPELINE, IndexSettings.DEFAULT_PIPELINE);
Settings indexSettings = Settings.builder().put(pipelineSetting.getKey(), pipelineId).build();
assertAcked(client().admin().indices().prepareUpdateSettings(indexIdentifier).setSettings(indexSettings).get());
assertBusy(() -> {
GeoIpTaskState state = getGeoIpTaskState();
assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
}, 2, TimeUnit.MINUTES);

// Remove the created index.
assertAcked(client().admin().indices().prepareDelete(indexIdentifier).get());
}

@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
Expand Down Expand Up @@ -450,6 +489,17 @@ private void putGeoIpPipeline() throws IOException {
* @throws IOException
*/
private void putGeoIpPipeline(String pipelineId) throws IOException {
putGeoIpPipeline(pipelineId, true);
}

/**
* This creates a pipeline named pipelineId with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is
* enabled).
* @param pipelineId The name of the new pipeline with a geoip processor
* @param downloadDatabaseOnPipelineCreation Indicates whether the pipeline creation should trigger database download or not.
* @throws IOException
*/
private void putGeoIpPipeline(String pipelineId, boolean downloadDatabaseOnPipelineCreation) throws IOException {
BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
Expand All @@ -463,6 +513,9 @@ private void putGeoIpPipeline(String pipelineId) throws IOException {
builder.field("field", "ip");
builder.field("target_field", "ip-city");
builder.field("database_file", "GeoLite2-City.mmdb");
if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
}
}
builder.endObject();
}
Expand All @@ -474,6 +527,9 @@ private void putGeoIpPipeline(String pipelineId) throws IOException {
builder.field("field", "ip");
builder.field("target_field", "ip-country");
builder.field("database_file", "GeoLite2-Country.mmdb");
if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
}
}
builder.endObject();
}
Expand All @@ -485,6 +541,9 @@ private void putGeoIpPipeline(String pipelineId) throws IOException {
builder.field("field", "ip");
builder.field("target_field", "ip-asn");
builder.field("database_file", "GeoLite2-ASN.mmdb");
if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
}
}
builder.endObject();
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
Expand All @@ -45,9 +46,11 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation;

/**
* Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node.
Expand Down Expand Up @@ -207,7 +210,14 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

if (event.metadataChanged() && event.changedCustomMetadataSet().contains(IngestMetadata.TYPE)) {
if (event.metadataChanged() == false) {
return;
}

boolean hasIndicesChanges = event.previousState().metadata().indices().equals(event.state().metadata().indices()) == false;
boolean hasIngestPipelineChanges = event.changedCustomMetadataSet().contains(IngestMetadata.TYPE);

if (hasIngestPipelineChanges || hasIndicesChanges) {
boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state());
if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
atLeastOneGeoipProcessor = true;
Expand All @@ -222,41 +232,112 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

@SuppressWarnings("unchecked")
static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
return pipelineDefinitions.stream().anyMatch(pipelineDefinition -> {
Map<String, Object> pipelineMap = pipelineDefinition.getConfigAsMap();
return hasAtLeastOneGeoipProcessor((List<Map<String, Object>>) pipelineMap.get(Pipeline.PROCESSORS_KEY));
if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
return true;
}

List<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be a Set since all interactions with it are via contains?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I did pushed an update to use a Set

.map(PipelineConfiguration::getId)
.collect(Collectors.toList());

if (checkReferencedPipelines.isEmpty()) {
return false;
}

return clusterState.getMetadata().indices().values().stream().anyMatch(indexMetadata -> {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
});
}

private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors) {
return processors != null && processors.stream().anyMatch(GeoIpDownloaderTaskExecutor::hasAtLeastOneGeoipProcessor);
/**
* Retrieve list of pipelines that have at least one geoip processor.
* @param clusterState Cluster state.
* @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
* matching the param.
* @return A list of {@link PipelineConfiguration} matching criteria.
*/
@SuppressWarnings("unchecked")
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have some javadocs for these methods. Specifically it would be good documenting what the input args are (the code makes sense when you get into it, but it's not intuitive to me what impact downloadDatabaseOnPipelineCreation has on the pipelineConfigurationsWithGeoIpProcessor that are returned from this method from just looking at the method signature)..

ClusterState clusterState,
boolean downloadDatabaseOnPipelineCreation
) {
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
return pipelineDefinitions.stream().filter(pipelineConfig -> {
List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
}).collect(Collectors.toList());
}

private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor) {
return processor != null
&& (processor.containsKey(GeoIpProcessor.TYPE)
|| isProcessorWithOnFailureGeoIpProcessor(processor)
|| isForeachProcessorWithGeoipProcessor(processor));
/**
* Check if a list of processor contains at least a geoip processor.
* @param processors List of processors.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @return true if a geoip processor is found in the processor list.
*/
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
}

/**
* Check if a processor config is a geoip processor or contains at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
if (processor == null) {
return false;
}

if (processor.containsKey(GeoIpProcessor.TYPE)) {
Map<String, Object> processorConfig = (Map<String, Object>) processor.get(GeoIpProcessor.TYPE);
return downloadDatabaseOnPipelineCreation(processorConfig) == downloadDatabaseOnPipelineCreation;
}

return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
|| isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
}

/**
* Check if a processor config is has an on_failure clause containing at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean isProcessorWithOnFailureGeoIpProcessor(Map<String, Object> processor) {
private static boolean isProcessorWithOnFailureGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation
) {
return processor != null
&& processor.values()
.stream()
.anyMatch(
value -> value instanceof Map
&& hasAtLeastOneGeoipProcessor(((Map<String, List<Map<String, Object>>>) value).get("on_failure"))
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
)
);
}

/**
* Check if a processor is a foreach processor containing at least a geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor) {
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
return processor.containsKey("foreach")
&& hasAtLeastOneGeoipProcessor(((Map<String, Map<String, Object>>) processor.get("foreach")).get("processor"));
&& hasAtLeastOneGeoipProcessor(
((Map<String, Map<String, Object>>) processor.get("foreach")).get("processor"),
downloadDatabaseOnPipelineCreation
);
}

private void startTask(Runnable onFailure) {
Expand Down