Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/138228.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 138228
summary: "Fix: Downsample returns appropriate error when target index gets deleted\
\ unexpectedly."
area: Downsampling
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,19 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc

/**
* This is the cluster state task executor for cluster state update actions.
* Visible for testing
*/
private static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR =
new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState)
throws Exception {
return Tuple.tuple(task.execute(clusterState), null);
}
static final SimpleBatchedExecutor<DownsampleClusterStateUpdateTask, Void> STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
@Override
public Tuple<ClusterState, Void> executeTask(DownsampleClusterStateUpdateTask task, ClusterState clusterState) throws Exception {
return Tuple.tuple(task.execute(clusterState), null);
}

@Override
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
task.listener.onResponse(AcknowledgedResponse.TRUE);
}
};
@Override
public void taskSucceeded(DownsampleClusterStateUpdateTask task, Void unused) {
task.listener.onResponse(AcknowledgedResponse.TRUE);
}
};

@Inject
public TransportDownsampleAction(
Expand Down Expand Up @@ -1106,7 +1105,6 @@ public void onResponse(final AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
listener.onFailure(e);
}

Expand Down Expand Up @@ -1161,6 +1159,11 @@ public ClusterState execute(ClusterState currentState) {
logger.debug("Updating downsample index status for [{}]", downsampleIndexName);
final ProjectMetadata project = currentState.metadata().getProject(projectId);
final IndexMetadata downsampleIndex = project.index(downsampleIndexName);
if (downsampleIndex == null) {
throw new IllegalStateException(
"Failed to update downsample status because [" + downsampleIndexName + "] does not exist"
);
}
if (IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndex.getSettings()) == DownsampleTaskStatus.SUCCESS) {
return currentState;
}
Expand All @@ -1182,7 +1185,6 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime); // Downsampling has already completed in all shards.
actionListener.onFailure(e);
}

Expand Down Expand Up @@ -1255,8 +1257,8 @@ public void onResponse(final AcknowledgedResponse response) {

@Override
public void onFailure(Exception e) {
recordSuccessMetrics(startTime);
logger.debug("Downsampling measured successfully", e);
recordFailureMetrics(startTime);
logger.debug("Downsampling failure measured successfully", e);
this.actionListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -56,14 +57,14 @@
import org.elasticsearch.xpack.core.downsample.DownsampleShardIndexerStatus;
import org.elasticsearch.xpack.core.downsample.DownsampleShardPersistentTaskState;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -173,12 +174,11 @@ public void setUp() throws Exception {
doAnswer(mockBroadcastResponse).when(indicesAdminClient).refresh(any(), any());
doAnswer(mockBroadcastResponse).when(indicesAdminClient).flush(any(), any());

// Mocks for updating downsampling metadata
doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
updateTask.listener.onResponse(randomBoolean() ? AcknowledgedResponse.TRUE : AcknowledgedResponse.FALSE);
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());

// Mocks for mapping retrieval & merging
when(indicesService.createIndexMapperServiceForValidation(any())).thenReturn(mapperService);
Expand Down Expand Up @@ -219,11 +219,6 @@ public void testDownsampling() {

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);

doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
updateTask.listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(taskQueue).submitTask(startsWith("create-downsample-index"), any(), any());
Answer<Void> mockPersistentTask = invocation -> {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
Expand All @@ -243,6 +238,7 @@ public void testDownsampling() {
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(indicesAdminClient).updateSettings(any(), any());
assertSuccessfulUpdateDownsampleStatus(clusterState);

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand Down Expand Up @@ -273,6 +269,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() {
.build();

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);
assertSuccessfulUpdateDownsampleStatus(clusterState);

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand All @@ -291,7 +288,7 @@ public void testDownsamplingWithShortCircuitAfterCreation() {
verifyIndexFinalisation();
}

public void testDownsamplingWithShortCircuitDuringCreation() throws IOException {
public void testDownsamplingWithShortCircuitDuringCreation() {
var projectMetadata = ProjectMetadata.builder(projectId)
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
.build();
Expand All @@ -315,6 +312,7 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException
)
.build()
);
assertSuccessfulUpdateDownsampleStatus(clusterService.state());

PlainActionFuture<AcknowledgedResponse> listener = new PlainActionFuture<>();
action.masterOperation(
Expand All @@ -333,6 +331,76 @@ public void testDownsamplingWithShortCircuitDuringCreation() throws IOException
verifyIndexFinalisation();
}

public void testDownsamplingWhenTargetIndexGetsDeleted() {
var projectMetadata = ProjectMetadata.builder(projectId)
.put(createSourceIndexMetadata(sourceIndex, primaryShards, replicaShards))
.build();

var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.putProjectMetadata(projectMetadata)
.blocks(ClusterBlocks.builder().addIndexBlock(projectId, sourceIndex, IndexMetadata.INDEX_WRITE_BLOCK))
.build();

when(projectResolver.getProjectMetadata(any(ClusterState.class))).thenReturn(projectMetadata);

Answer<Void> mockPersistentTask = invocation -> {
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = invocation.getArgument(4);
PersistentTasksCustomMetadata.PersistentTask<?> task1 = mock(PersistentTasksCustomMetadata.PersistentTask.class);
when(task1.getId()).thenReturn(randomAlphaOfLength(10));
DownsampleShardPersistentTaskState runningTaskState = new DownsampleShardPersistentTaskState(
DownsampleShardIndexerStatus.COMPLETED,
null
);
when(task1.getState()).thenReturn(runningTaskState);
listener.onResponse(task1);
return null;
};
doAnswer(mockPersistentTask).when(persistentTaskService).sendStartRequest(anyString(), anyString(), any(), any(), any());
doAnswer(mockPersistentTask).when(persistentTaskService).waitForPersistentTaskCondition(any(), anyString(), any(), any(), any());
doAnswer(invocation -> {
var listener = invocation.getArgument(1, TransportDownsampleAction.UpdateDownsampleIndexSettingsActionListener.class);
listener.onResponse(AcknowledgedResponse.TRUE);
return null;
}).when(indicesAdminClient).updateSettings(any(), any());

doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
ClusterStateTaskExecutorUtils.executeHandlingResults(
clusterState,
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
List.of(updateTask),
task1 -> {},
TransportDownsampleAction.DownsampleClusterStateUpdateTask::onFailure
);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
IllegalStateException error = safeAwaitFailure(
IllegalStateException.class,
AcknowledgedResponse.class,
listener -> action.masterOperation(
task,
new DownsampleAction.Request(
ESTestCase.TEST_REQUEST_TIMEOUT,
sourceIndex,
targetIndex,
TimeValue.ONE_HOUR,
new DownsampleConfig(new DateHistogramInterval("5m"), randomSamplingMethod())
),
clusterState,
listener
)
);
assertThat(
error.getMessage(),
Matchers.startsWith("Failed to update downsample status because [" + targetIndex + "] does not exist")
);
verify(downsampleMetrics, never()).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.FAILED));
verify(indicesAdminClient).refresh(any(), any());
verify(indicesAdminClient, never()).flush(any(), any());
verify(indicesAdminClient, never()).forceMerge(any(), any());
}

private void verifyIndexFinalisation() {
verify(downsampleMetrics).recordOperation(anyLong(), eq(DownsampleMetrics.ActionStatus.SUCCESS));
verify(indicesAdminClient).refresh(any(), any());
Expand Down Expand Up @@ -476,4 +544,21 @@ public void testGetSupportedMetrics() {
assertThat(supported.defaultMetric(), is("max"));
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
}

private void assertSuccessfulUpdateDownsampleStatus(ClusterState clusterState) {
var projectMetadata = ProjectMetadata.builder(clusterState.metadata().getProject(projectId))
.put(createSourceIndexMetadata(targetIndex, primaryShards, replicaShards))
.build();

var updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadata).build();
doAnswer(invocation -> {
var updateTask = invocation.getArgument(1, TransportDownsampleAction.DownsampleClusterStateUpdateTask.class);
ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
updatedClusterState,
TransportDownsampleAction.STATE_UPDATE_TASK_EXECUTOR,
List.of(updateTask)
);
return null;
}).when(taskQueue).submitTask(startsWith("update-downsample-metadata"), any(), any());
}
}