Skip to content

Commit

Permalink
Handle .geoip_databases being an alias or a concrete index (#85792)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed May 3, 2022
1 parent a30ab86 commit cfe1f11
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 48 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/85792.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 85792
summary: Handle `.geoip_databases` being an alias or a concrete index
area: Ingest
type: bug
issues:
- 85756
95 changes: 95 additions & 0 deletions modules/ingest-geoip/qa/full-cluster-restart/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask

apply plugin: 'elasticsearch.internal-testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.internal-test-artifact'
apply plugin: 'elasticsearch.bwc-test'

def useFixture = providers.environmentVariable("geoip_use_service")
.map { s -> Boolean.parseBoolean(s) == false }
.getOrElse(true)

def fixtureAddress = {
assert useFixture: 'closure should not be used without a fixture'
int ephemeralPort = tasks.getByPath(":test:fixtures:geoip-fixture:postProcessFixture").ext."test.fixtures.geoip-fixture-restart.tcp.80"
assert ephemeralPort > 0
return "http://127.0.0.1:${ephemeralPort}/"
}

if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(':test:fixtures:geoip-fixture', 'geoip-fixture-restart')
}

tasks.withType(Test).configureEach {
if (useFixture) {
nonInputProperties.systemProperty "geoip_endpoint", "${-> fixtureAddress()}"
}
}


BuildParams.bwcVersions.withWireCompatible(v -> v.before("8.0.0")) { bwcVersion, baseName ->
def baseCluster = testClusters.register(baseName) {
testDistribution = "DEFAULT"
if (bwcVersion.before(BuildParams.bwcVersions.minimumWireCompatibleVersion)) {
// When testing older versions we have to first upgrade to 7.last
versions = [bwcVersion.toString(), BuildParams.bwcVersions.minimumWireCompatibleVersion.toString(), project.version]
} else {
versions = [bwcVersion.toString(), project.version]
}
numberOfNodes = 2
// some tests rely on the translog not being flushed
setting 'indices.memory.shard_inactive_time', '60m'
setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
setting 'xpack.security.enabled', 'false'
if (useFixture) {
setting 'ingest.geoip.downloader.endpoint', { "${-> fixtureAddress()}" }
}
requiresFeature 'es.index_mode_feature_flag_registered', Version.fromString("8.0.0")
}
tasks.register("${baseName}#oldClusterTest", StandaloneRestIntegTestTask) {
useCluster baseCluster
mustRunAfter("precommit")
doFirst {
delete("${buildDir}/cluster/shared/repo/${baseName}")
}
systemProperty 'tests.is_old_cluster', 'true'
}
tasks.register("${baseName}#upgradedClusterTest", StandaloneRestIntegTestTask) {
useCluster baseCluster
dependsOn "${baseName}#oldClusterTest"
doFirst {
baseCluster.get().goToNextVersion()
if (bwcVersion.before(BuildParams.bwcVersions.minimumWireCompatibleVersion)) {
// When doing a full cluster restart of older versions we actually have to upgrade twice. First to 7.last, then to the current version.
baseCluster.get().goToNextVersion()
}
}
systemProperty 'tests.is_old_cluster', 'false'
}
String oldVersion = bwcVersion.toString().minus("-SNAPSHOT")
tasks.matching { it.name.startsWith(baseName) && it.name.endsWith("ClusterTest") }.configureEach {
it.systemProperty 'tests.old_cluster_version', oldVersion
it.systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
it.nonInputProperties.systemProperty('tests.rest.cluster', baseCluster.map(c -> c.allHttpSocketURI.join(",")))
it.nonInputProperties.systemProperty('tests.clustername', baseName)
}
tasks.register(bwcTaskName(bwcVersion)) {
dependsOn tasks.named("${baseName}#upgradedClusterTest")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.ingest.geoip;

import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.upgrades.AbstractFullClusterRestartTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.contains;

public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {

public void testGeoIpSystemFeaturesMigration() throws Exception {
if (isRunningAgainstOldCluster()) {
Request enableDownloader = new Request("PUT", "/_cluster/settings");
enableDownloader.setJsonEntity("""
{"persistent": {"ingest.geoip.downloader.enabled": true}}
""");
assertOK(client().performRequest(enableDownloader));

Request putPipeline = new Request("PUT", "/_ingest/pipeline/geoip");
putPipeline.setJsonEntity("""
{
"description": "Add geoip info",
"processors": [{
"geoip": {
"field": "ip",
"target_field": "geo",
"database_file": "GeoLite2-Country.mmdb"
}
}]
}
""");
assertOK(client().performRequest(putPipeline));

// wait for the geo databases to all be loaded
assertBusy(() -> testDatabasesLoaded(), 30, TimeUnit.SECONDS);

// the geoip index should be created
assertBusy(() -> testCatIndices(".geoip_databases"));
assertBusy(() -> testIndexGeoDoc());
} else {
Request migrateSystemFeatures = new Request("POST", "/_migration/system_features");
assertOK(client().performRequest(migrateSystemFeatures));

assertBusy(() -> testCatIndices(".geoip_databases-reindexed-for-8", "my-index-00001"));
assertBusy(() -> testIndexGeoDoc());

Request disableDownloader = new Request("PUT", "/_cluster/settings");
disableDownloader.setJsonEntity("""
{"persistent": {"ingest.geoip.downloader.enabled": false}}
""");
assertOK(client().performRequest(disableDownloader));

// the geoip index should be deleted
assertBusy(() -> testCatIndices("my-index-00001"));

Request enableDownloader = new Request("PUT", "/_cluster/settings");
enableDownloader.setJsonEntity("""
{"persistent": {"ingest.geoip.downloader.enabled": true}}
""");
assertOK(client().performRequest(enableDownloader));

// wait for the geo databases to all be loaded
assertBusy(() -> testDatabasesLoaded(), 30, TimeUnit.SECONDS);

// the geoip index should be recreated
assertBusy(() -> testCatIndices(".geoip_databases", "my-index-00001"));
assertBusy(() -> testIndexGeoDoc());
}
}

private void testDatabasesLoaded() throws IOException {
Request getTaskState = new Request("GET", "/_cluster/state");
ObjectPath state = ObjectPath.createFromResponse(client().performRequest(getTaskState));

Map<String, Object> databases = null;
try {
databases = state.evaluate("metadata.persistent_tasks.tasks.0.task.geoip-downloader.state.databases");
} catch (Exception e) {
// ObjectPath doesn't like the 0 above if the list of tasks is empty, and it throws rather than returning null,
// catch that and throw an AssertionError instead (which assertBusy will handle)
fail();
}
assertNotNull(databases);

for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
Object database = databases.get(name);
assertNotNull(database);
assertNotNull(ObjectPath.evaluate(database, "md5"));
}
}

private void testCatIndices(String... indexNames) throws IOException {
Request catIndices = new Request("GET", "_cat/indices/*?s=index&h=index&expand_wildcards=all");
String response = EntityUtils.toString(client().performRequest(catIndices).getEntity());
List<String> indices = List.of(response.trim().split("\\s+"));
assertThat(indices, contains(indexNames));
}

private void testIndexGeoDoc() throws IOException {
Request putDoc = new Request("PUT", "/my-index-00001/_doc/my_id?pipeline=geoip");
putDoc.setJsonEntity("""
{"ip": "89.160.20.128"}
""");
assertOK(client().performRequest(putDoc));

Request getDoc = new Request("GET", "/my-index-00001/_doc/my_id");
ObjectPath doc = ObjectPath.createFromResponse(client().performRequest(getDoc));
assertNull(doc.evaluate("_source.tags"));
assertEquals("Sweden", doc.evaluate("_source.geo.country_name"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand Down Expand Up @@ -191,9 +193,16 @@ void checkDatabases(ClusterState state) {
return;
}

IndexRoutingTable databasesIndexRT = state.getRoutingTable().index(GeoIpDownloader.DATABASES_INDEX);
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
IndexAbstraction databasesAbstraction = state.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
if (databasesAbstraction == null) {
return;
} else {
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
Index databasesIndex = databasesAbstraction.getWriteIndex();
IndexRoutingTable databasesIndexRT = state.getRoutingTable().index(databasesIndex);
if (databasesIndexRT == null || databasesIndexRT.allPrimaryShardsActive() == false) {
return;
}
}

PersistentTasksCustomMetadata.PersistentTask<?> task = PersistentTasksCustomMetadata.getTaskWithId(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskState;
Expand Down Expand Up @@ -172,18 +174,19 @@ private void stopTask(Runnable onFailure) {
}
}
);
persistentTasksService.sendRemoveRequest(
GEOIP_DOWNLOADER,
ActionListener.runAfter(
listener,
() -> client.admin().indices().prepareDelete(DATABASES_INDEX).execute(ActionListener.wrap(rr -> {}, e -> {
persistentTasksService.sendRemoveRequest(GEOIP_DOWNLOADER, ActionListener.runAfter(listener, () -> {
IndexAbstraction databasesAbstraction = clusterService.state().metadata().getIndicesLookup().get(DATABASES_INDEX);
if (databasesAbstraction != null) {
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
Index databasesIndex = databasesAbstraction.getWriteIndex();
client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> {
Throwable t = e instanceof RemoteTransportException ? e.getCause() : e;
if (t instanceof ResourceNotFoundException == false) {
logger.warn("failed to remove " + DATABASES_INDEX, e);
logger.warn("failed to remove " + databasesIndex, e);
}
}))
)
);
}));
}
}));
}

public GeoIpDownloader getCurrentTask() {
Expand Down

0 comments on commit cfe1f11

Please sign in to comment.