Skip to content

Commit

Permalink
Don't download geoip databases if geoip system index is blocked. (#86842
Browse files Browse the repository at this point in the history
) (#86893)

For example in the case that the a cluster is running out of disk
space and indices reject writes.

This can otherwise load to unnecessary error logs being printed and
just add to more instability. Instead, the GeoIpDownloader should
just try to download the files the next it runs.
  • Loading branch information
martijnvg committed May 18, 2022
1 parent ad4d7bf commit 3585a6a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 6 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/86842.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86842
summary: Don't download geoip databases if geoip system index is blocked
area: Ingest
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.settings.Setting;
Expand Down Expand Up @@ -126,6 +128,18 @@ public void setPollInterval(TimeValue pollInterval) {

// visible for testing
void updateDatabases() throws IOException {
var clusterState = clusterService.state();
var geoipIndex = clusterState.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
if (geoipIndex != null) {
if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
throw new ElasticsearchException("not all primary shards of [" + DATABASES_INDEX + "] index are active");
}
var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
if (blockException != null) {
throw blockException;
}
}

logger.debug("updating geoip databases");
List<Map<String, Object>> response = fetchDatabasesOverview();
for (Map<String, Object> res : response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,11 @@ private String mockSearches(String databaseName, int firstChunk, int lastChunk)
return MessageDigests.toHexString(md.digest());
}

private static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata) {
static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata) {
return createClusterState(tasksCustomMetadata, false);
}

static ClusterState createClusterState(PersistentTasksCustomMetadata tasksCustomMetadata, boolean noStartedShards) {
boolean aliasGeoipDatabase = randomBoolean();
String indexName = aliasGeoipDatabase
? GeoIpDownloader.DATABASES_INDEX + "-" + randomAlphaOfLength(5)
Expand All @@ -376,9 +380,11 @@ private static ClusterState createClusterState(PersistentTasksCustomMetadata tas
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")
);
String nodeId = ESTestCase.randomAlphaOfLength(8);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0)).addShard(
shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize()).moveToStarted()
).build();
shardRouting = shardRouting.initialize(nodeId, null, shardRouting.getExpectedShardSize());
if (noStartedShards == false) {
shardRouting = shardRouting.moveToStarted();
}
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(new ShardId(index, 0)).addShard(shardRouting).build();
return ClusterState.builder(new ClusterName("name"))
.metadata(Metadata.builder().putCustom(TYPE, tasksCustomMetadata).put(idxMeta))
.nodes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.ingest.geoip;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -22,13 +23,16 @@
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
Expand All @@ -52,10 +56,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import static org.elasticsearch.ingest.geoip.DatabaseNodeServiceTests.createClusterState;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.ENDPOINT_SETTING;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.MAX_CHUNK_SIZE;
import static org.elasticsearch.tasks.TaskId.EMPTY_TASK_ID;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class GeoIpDownloaderTests extends ESTestCase {
Expand All @@ -77,7 +84,7 @@ public void setup() {
Set.of(GeoIpDownloader.ENDPOINT_SETTING, GeoIpDownloader.POLL_INTERVAL_SETTING, GeoIpDownloaderTaskExecutor.ENABLED_SETTING)
)
);
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
when(clusterService.state()).thenReturn(state);
client = new MockClient(threadPool);
geoIpDownloader = new GeoIpDownloader(
Expand Down Expand Up @@ -455,6 +462,38 @@ void processDatabase(Map<String, Object> databaseInfo) {
assertFalse(it.hasNext());
}

public void testUpdateDatabasesWriteBlock() {
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()));
var geoIpIndex = state.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX).getWriteIndex().getName();
state = ClusterState.builder(state)
.blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.build();
when(clusterService.state()).thenReturn(state);
var e = expectThrows(ClusterBlockException.class, () -> geoIpDownloader.updateDatabases());
assertThat(
e.getMessage(),
equalTo(
"index ["
+ geoIpIndex
+ "] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, "
+ "index has read-only-allow-delete block];"
)
);
verifyNoInteractions(httpClient);
}

public void testUpdateDatabasesIndexNotReady() {
ClusterState state = createClusterState(new PersistentTasksCustomMetadata(1L, Map.of()), true);
var geoIpIndex = state.getMetadata().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX).getWriteIndex().getName();
state = ClusterState.builder(state)
.blocks(new ClusterBlocks.Builder().addIndexBlock(geoIpIndex, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.build();
when(clusterService.state()).thenReturn(state);
var e = expectThrows(ElasticsearchException.class, () -> geoIpDownloader.updateDatabases());
assertThat(e.getMessage(), equalTo("not all primary shards of [.geoip_databases] index are active"));
verifyNoInteractions(httpClient);
}

private static class MockClient extends NoOpClient {

private final Map<ActionType<?>, BiConsumer<? extends ActionRequest, ? extends ActionListener<?>>> handlers = new HashMap<>();
Expand Down

0 comments on commit 3585a6a

Please sign in to comment.