Enable analytics geoip in behavioral analytics. #96624

Merged
merged 26 commits into from Jun 15, 2023

Conversation

afoucret
Contributor

@afoucret afoucret commented Jun 6, 2023

The default pipeline used by behavioral analytics contains a geo ip processor and is installed through an IndexTemplateRegistry.

With the current implementation, this means the GeoIpDownloader runs as soon as the Elasticsearch server starts.
We do not want this because it is wasteful for users who do not use behavioral analytics.

This PR adds an optional flag, geoip_database_lazy_download, to the pipeline _meta to determine whether installing the pipeline should trigger the download:

  "processors": [
      { 
         "geoip": {}
      }
  ],
  "_meta": {
      "geoip_database_lazy_download": true
  }

If the flag is missing or set to false the behavior is unchanged.
If the flag is set to true, the geoip downloader is triggered only when an index exists with the pipeline set as default_pipeline or final_pipeline.
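
The flag semantics described above can be sketched as follows. This is a minimal illustration, not the code from this PR: the class `LazyDownloadFlagSketch`, its method names, and the plain `Map`/`Set` inputs are hypothetical stand-ins for the real cluster state types, and the pipeline is assumed to contain a geoip processor.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Elasticsearch code base.
final class LazyDownloadFlagSketch {

    static final String LAZY_FLAG = "geoip_database_lazy_download";

    /** Returns true only when the pipeline's _meta map carries the flag set to true. */
    static boolean isLazyDownloadPipeline(Map<String, Object> pipelineConfig) {
        Object meta = pipelineConfig.get("_meta");
        if (meta instanceof Map) {
            return Boolean.TRUE.equals(((Map<?, ?>) meta).get(LAZY_FLAG));
        }
        // Missing _meta or missing flag: behavior is unchanged (eager download).
        return false;
    }

    /**
     * Decides whether a geoip pipeline should trigger the database download.
     * pipelinesReferencedByIndices holds the ids used as default_pipeline or final_pipeline.
     */
    static boolean shouldTriggerDownload(
        String pipelineId,
        Map<String, Object> pipelineConfig,
        Set<String> pipelinesReferencedByIndices
    ) {
        if (isLazyDownloadPipeline(pipelineConfig) == false) {
            return true;
        }
        return pipelinesReferencedByIndices.contains(pipelineId);
    }
}
```

With this sketch, a pipeline without the flag always triggers the download, while a flagged pipeline triggers it only once its id appears in the set of referenced pipelines.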

@afoucret afoucret added >non-issue :EnterpriseSearch/Application Enterprise Search Team:Enterprise Search Meta label for Enterprise Search team v8.9.0 labels Jun 6, 2023
@elasticsearchmachine
Collaborator

Pinging @elastic/ent-search-eng (Team:Enterprise Search)

@@ -226,11 +229,28 @@ public void clusterChanged(ClusterChangedEvent event) {
static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
return pipelineDefinitions.stream().anyMatch(pipelineDefinition -> {
if (isLazyDownloadPipeline(pipelineDefinition) && (isPipelineUsed(clusterState, pipelineDefinition.getId()) == false)) {
Contributor Author

ℹ️ For all pipelines that have _meta.geoip_database_lazy_download set to true, the download is triggered only when an index with the pipeline set as default_pipeline or final_pipeline exists.

@@ -207,7 +209,8 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

- if (event.metadataChanged() && event.changedCustomMetadataSet().contains(IngestMetadata.TYPE)) {
+ if (event.metadataChanged()
+ && (event.indicesCreated().isEmpty() == false || event.changedCustomMetadataSet().contains(IngestMetadata.TYPE))) {
Contributor Author

@afoucret afoucret Jun 9, 2023

ℹ️ Now we need to check if an index is created with the pipeline set.
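
The widened listener condition can be sketched in isolation as below. This is an illustrative reduction only: `ClusterChangeTriggerSketch` and its parameters are hypothetical, and the constant `"ingest"` is assumed to mirror the value of `IngestMetadata.TYPE`.

```java
import java.util.List;
import java.util.Set;

// Hypothetical reduction of the clusterChanged condition; not the actual listener code.
final class ClusterChangeTriggerSketch {

    // Assumption: stands in for IngestMetadata.TYPE.
    static final String INGEST_METADATA_TYPE = "ingest";

    /**
     * Returns true when the geoip download decision should be re-evaluated:
     * metadata changed AND (an index was created OR ingest pipelines changed).
     */
    static boolean shouldReevaluate(boolean metadataChanged, List<String> indicesCreated, Set<String> changedCustomMetadata) {
        return metadataChanged
            && (indicesCreated.isEmpty() == false || changedCustomMetadata.contains(INGEST_METADATA_TYPE));
    }
}
```

Before the change only the second branch existed, so creating an index that references a lazy pipeline would not have caused a re-evaluation.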

@@ -65,7 +72,8 @@
}
],
"_meta": {
- "managed": true
+ "managed": true,
+ "geoip_database_lazy_download": true
Contributor Author

ℹ️ Adding geoip_database_lazy_download to our pipeline, so the database is downloaded only when an Analytics Collection is created.

public void testGeoIpDownloader() throws Exception {
assumeTrue("Disabled until PR #95621 is backported to branch " + Version.V_8_8_0, UPGRADE_FROM_VERSION.onOrBefore(Version.V_8_7_0));
Contributor Author

ℹ️ Re-enabling this test for all versions. It should not have been disabled.

@@ -259,6 +259,10 @@
}
}
}
},
"tags": {
Contributor Author

@afoucret afoucret Jun 9, 2023

ℹ️ Adding a tags field to events as it is used by the geoip processor.

@jimczi jimczi added :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP Team:Data Management Meta label for data/management team >feature and removed >non-issue labels Jun 9, 2023
@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@elasticsearchmachine
Collaborator

Hi @afoucret, I've created a changelog YAML for you.

Contributor

@jimczi jimczi left a comment

I like the approach. We shouldn't have to switch ingest pipeline under the hood to avoid the eager download at startup.
I approved for our side so let's wait for @elastic/es-data-management's feedback now.

@afoucret
Contributor Author

afoucret commented Jun 9, 2023

Also requested a review from @eyalkoren because he was involved in the index template registry work.

@afoucret afoucret requested a review from eyalkoren June 9, 2023 10:08
@afoucret
Contributor Author

afoucret commented Jun 9, 2023

@elasticsearchmachine run elasticsearch-ci/doc-check

@afoucret
Contributor Author

afoucret commented Jun 9, 2023

@elasticsearchmachine run elasticsearch-ci/part-1

@afoucret afoucret removed the :EnterpriseSearch/Application Enterprise Search label Jun 9, 2023
@elasticsearchmachine elasticsearchmachine removed the Team:Enterprise Search Meta label for Enterprise Search team label Jun 9, 2023
@masseyke masseyke assigned masseyke and unassigned masseyke Jun 9, 2023
@eyalkoren
Contributor

@afoucret thanks for the trust, though I am really not in a position to review this area as I am still learning it myself 😊
Someone from the @elastic/es-data-management would be a better fit.

@eyalkoren eyalkoren removed their request for review June 11, 2023 13:25
| `properties` | no | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array
| `download_database_on_pipeline_creation` | no | `true` | If `true`, the missing database is downloaded when the pipeline is created. Otherwise, the download is triggered when the pipeline is used as the `default_pipeline` or `final_pipeline` in an index.
Member

Maybe worth mentioning ingest.geoip.downloader.eager.download? Something like:
If `true` (and if `ingest.geoip.downloader.eager.download` is `false`), the missing database is downloaded when the pipeline is created. Else, the download is triggered when the pipeline is used as the `default_pipeline` or `final_pipeline` in an index.

Contributor Author

🙇 Added your change


// Creating a pipeline containing a geoip processor with lazy download enabled.
// The download should not be triggered and the task state should stay null.
putGeoIpPipeline(pipelineId, true);
Member

Shouldn't this be false? We want to not download on pipeline creation for this test right?

Member

And related, we probably ought to have a test that sets the value to true and checks that it does download right? Like

        putGeoIpPipeline(pipelineId, true);
        assertBusy(() -> assertNotNull(getTask().getState()));

Member

And another good test might be to create an index at this point that does not have a geoip processor in its pipeline, to make sure the cluster state change listener doesn't trigger the download when just any index is created.

Contributor Author

I fixed the test and added a few more steps to it as suggested.

Map<String, Object> pipelineMap = pipelineDefinition.getConfigAsMap();
return hasAtLeastOneGeoipProcessor((List<Map<String, Object>>) pipelineMap.get(Pipeline.PROCESSORS_KEY));
return pipelineDefinitions.stream().anyMatch(pipelineConfig -> {
boolean isPipelineUsed = isPipelineUsed(clusterState, pipelineConfig.getId());
Member

Should we check whether it has a geoip processor before potentially looping through all of the indices to see if it's used? I'm a little worried about performance here.

Member

It might be best to collect the pipelines that have geoip processors together first while noting which of those pipelines have geoip processors that all have the "download on index created" setting set to true. If there are any pipelines that don't have that setting present, we can skip reading the indices and start the download. If they all have the setting, then we can check to see if any indices have a default/final pipeline usage that references one of the noted pipelines.

Contributor Author

I made some changes:

  1. We collect all the pipelines with download_database_on_pipeline_creation set to true and return true if the collection is not empty.
  2. We collect all the pipelines with download_database_on_pipeline_creation set to false and return false if the collection is empty.
  3. We loop over the indices to check whether one of the collected pipelines is referenced, only if we did not hit an early return.
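
The three steps above can be sketched as follows. This is a simplified illustration under stated assumptions, not the code merged in this PR: `GeoIpPipeline` and `IndexPipelines` are hypothetical stand-ins for the pipeline configurations and index settings found in the cluster state, and only pipelines that contain a geoip processor are assumed to be passed in.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the early-return strategy; types and names are illustrative.
final class DownloadDecisionSketch {

    /** Stand-in for a pipeline that contains at least one geoip processor. */
    static final class GeoIpPipeline {
        final String id;
        final boolean downloadOnCreation;

        GeoIpPipeline(String id, boolean downloadOnCreation) {
            this.id = id;
            this.downloadOnCreation = downloadOnCreation;
        }
    }

    /** Stand-in for an index's default_pipeline and final_pipeline settings (either may be null). */
    static final class IndexPipelines {
        final String defaultPipeline;
        final String finalPipeline;

        IndexPipelines(String defaultPipeline, String finalPipeline) {
            this.defaultPipeline = defaultPipeline;
            this.finalPipeline = finalPipeline;
        }
    }

    static boolean shouldDownload(List<GeoIpPipeline> geoIpPipelines, List<IndexPipelines> indices) {
        // Step 1: any pipeline that downloads on creation means we download, without reading indices.
        if (geoIpPipelines.stream().anyMatch(p -> p.downloadOnCreation)) {
            return true;
        }
        // Step 2: collect the lazy pipelines; with none, there is nothing to download.
        Set<String> lazyIds = geoIpPipelines.stream()
            .filter(p -> p.downloadOnCreation == false)
            .map(p -> p.id)
            .collect(Collectors.toSet());
        if (lazyIds.isEmpty()) {
            return false;
        }
        // Step 3: only now loop over the indices, looking for a reference to a lazy pipeline.
        return indices.stream().anyMatch(i -> lazyIds.contains(i.defaultPipeline) || lazyIds.contains(i.finalPipeline));
    }
}
```

The index scan, which is the expensive part on large clusters, is reached only when every geoip pipeline is lazy.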

Member

OK just to confirm, I think that the worst case performance is now:
Every time someone adds/modifies/deletes an index or adds/modifies/deletes a pipeline (so fairly often), for each pipeline in the cluster state with a geoip processor with "download_database_on_pipeline_creation" set (probably relatively few), we look through each index in the cluster state (potentially 50k or more) to see if that is the default or final pipeline.
That check will happen more often than I'd like but I think that'll be acceptably fast.

@@ -463,6 +512,9 @@ private void putGeoIpPipeline(String pipelineId) throws IOException {
builder.field("field", "ip");
builder.field("target_field", "ip-city");
builder.field("database_file", "GeoLite2-City.mmdb");
if (downloadDatabaseOnPipelineCreation) {
builder.field("download_database_on_pipeline_creation", true);
Member

I think you want the ability to set this to false for the test right? It defaults to true.

Contributor Author

Good catch.
The condition has been updated to:

if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {

The random boolean randomizes testing between download_database_on_pipeline_creation being missing or set to true.
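
The effect of that condition can be sketched with a plain map instead of the test's XContent builder. This is an assumption-laden illustration: `GeoIpProcessorConfigSketch` is hypothetical, the `BooleanSupplier` stands in for `randomBoolean()`, and writing the parameter's value into the field is assumed rather than confirmed by the diff shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the test helper; a Map replaces the XContent builder.
final class GeoIpProcessorConfigSketch {

    static Map<String, Object> geoIpProcessorConfig(boolean downloadDatabaseOnPipelineCreation, BooleanSupplier random) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("field", "ip");
        config.put("target_field", "ip-city");
        config.put("database_file", "GeoLite2-City.mmdb");
        // false is always written explicitly; true is written only some of the time,
        // so the default (field missing) is exercised as well.
        if (downloadDatabaseOnPipelineCreation == false || random.getAsBoolean()) {
            // Assumption: the value written is the parameter itself.
            config.put("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
        }
        return config;
    }
}
```

Injecting the random source keeps the sketch deterministic when needed, for example `geoIpProcessorConfig(true, () -> false)` yields a config without the field.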

Member

@masseyke masseyke left a comment

I think that the integration test needs to be updated (I think maybe it just wasn't fully updated after the property name changed). I also have some concerns about possible performance problems in the cluster state change listener.

@afoucret afoucret requested a review from masseyke June 13, 2023 16:18
@afoucret
Contributor Author

@masseyke I updated the PR according to your feedback. It would be nice if you could re-review it. Thank you.

private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors) {
return processors != null && processors.stream().anyMatch(GeoIpDownloaderTaskExecutor::hasAtLeastOneGeoipProcessor);
@SuppressWarnings("unchecked")
private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
Member

It would be good to have some javadocs for these methods. Specifically, it would be good to document what the input args are (the code makes sense when you get into it, but it's not intuitive to me what impact downloadDatabaseOnPipelineCreation has on the pipelineConfigurationsWithGeoIpProcessor that are returned from this method from just looking at the method signature).

Member

@masseyke masseyke left a comment

It would be great to see some more javadocs in the new methods in GeoIpDownloaderTaskExecutor, but the approach overall seems good to me and I think you've addressed the worst of the performance problems. Thanks for all the work on this.

Member

@jbaiera jbaiera left a comment

LGTM, left one small nit but otherwise thank you for iterating!

return true;
}

List<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
Member

Could this be a Set since all interactions with it are via contains?

Contributor Author

Good idea. I pushed an update to use a Set.

@afoucret afoucret merged commit dd1d157 into elastic:main Jun 15, 2023
12 checks passed
salvatore-campagna pushed a commit to salvatore-campagna/elasticsearch that referenced this pull request Jun 19, 2023
* When using a managed pipeline GeoIpDownloader is triggered only when an index exists for the pipeline.

* When using a managed pipeline GeoIpDownloader is triggered only when an index exists for the pipeline.

* Adding the geoip processor back

* Adding tags to the events mapping.

* Fix a forbidden API call into tests.

* lint

* Adding an integration tests for managed pipelines.

* lint

* Add a geoip_database_lazy_download param to pipelines and use it instead of managed.

* Fix a edge case: pipeline can be set after index is created.

* lint.

* Update docs/changelog/96624.yaml

* Update 96624.yaml

* Uses a processor setting (download_database_on_pipeline_creation) to decide database download strategy.

* Removing debug instruction.

* Improved documentation.

* Improved the way to check for referenced pipelines.

* Fixing an error in test.

* Improved integration tests.

* Lint.

* Fix failing tests.

* Fix failing tests (2).

* Adding javadoc.

* lint javadoc.

* Using a set instead of a list to store checked pipelines.
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >feature Team:Data Management Meta label for data/management team v8.9.0

6 participants