Skip to content

Commit

Permalink
[7.14] Fix GeoIpProcessor when there's no updated db (#74944) (#74947)
Browse files Browse the repository at this point in the history
* Fix GeoIpProcessor when there's no updated db (#74944)

This change fixes problem with GeoIpProcessor when there's GeoIpTaskState present in the cluster state but there's no database matching the one used by the processor. It can happen when there are some but not all databases already updated.

* fix compilation

Co-authored-by: Przemko Robakowski <przemko.robakowski@elastic.co>
  • Loading branch information
elasticsearchmachine and probakowski committed Jul 6, 2021
1 parent ff2ed38 commit 899110c
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ public GeoIpProcessor create(
return true;
}
GeoIpTaskState state = (GeoIpTaskState) task.getState();
return state.getDatabases().get(databaseFile).isValid(currentState.metadata().settings());
GeoIpTaskState.Metadata metadata = state.getDatabases().get(databaseFile);
// we never remove metadata from cluster state, if metadata is null we deal with built-in database, which is always valid
return metadata == null || metadata.isValid(currentState.metadata().settings());
};
return new GeoIpProcessor(processorTag, description, ipField, supplier, isValid, targetField, properties, ignoreMissing,
firstOnly);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;

Expand Down Expand Up @@ -169,7 +170,7 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1")
.build())
.setOrigin("geoip")
.setOrigin(INGEST_ORIGIN)
.setVersionMetaKey("version")
.setPrimaryIndex(DATABASES_INDEX)
.setNetNew()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@
package org.elasticsearch.ingest.geoip;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.StreamsUtils;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -367,6 +370,26 @@ public void testFallbackUsingDefaultDatabases() throws Exception {
}
}

public void testDefaultDatabaseWithTaskPresent() throws Exception {
PersistentTasksCustomMetadata tasks = PersistentTasksCustomMetadata.builder()
.addTask(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpDownloader.GEOIP_DOWNLOADER, null, null)
.updateTaskState(GeoIpDownloader.GEOIP_DOWNLOADER, GeoIpTaskState.EMPTY)
.build();
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasks))
.build();
when(clusterService.state()).thenReturn(clusterState);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);

Map<String, Object> config = new HashMap<>();
config.put("field", "_field");
String processorTag = randomAlphaOfLength(10);

GeoIpProcessor processor = factory.create(null, processorTag, null, config);

processor.execute(RandomDocumentPicks.randomIngestDocument(random(), org.elasticsearch.core.Map.of("_field", "89.160.20.128")));
}

public void testFallbackUsingDefaultDatabasesWhileIngesting() throws Exception {
copyDatabaseFile(geoipTmpDir, "GeoLite2-City-Test.mmdb");
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseRegistry, clusterService);
Expand Down

0 comments on commit 899110c

Please sign in to comment.