Skip to content

Commit

Permalink
Refactor tasks to improve APM support (#87917)
Browse files Browse the repository at this point in the history
Part of #84369. Split out from #87696. Rework how some work is executed
by creating child tasks for it, so that when traced by APM, it results
in more meaningful parent and child tasks in the UI. It also improves
how Elasticsearch models the work.
  • Loading branch information
pugnascotia committed Jul 5, 2022
1 parent 2309eb2 commit ca7c21f
Show file tree
Hide file tree
Showing 49 changed files with 640 additions and 225 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.indices.recovery;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.stream.StreamSupport;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

/**
 * Integration test that inspects the task hierarchy created while a peer recovery is in progress.
 * Recovery is held open by {@link EngineTestPlugin}, which blocks the engine's
 * {@code skipTranslogRecovery()} call until the test explicitly releases it.
 */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TaskRecoveryIT extends ESIntegTestCase {

    /**
     * Turns off the mock internal engine; this test supplies its own engine factory through
     * {@link EngineTestPlugin}. NOTE(review): presumably the two factories would conflict - confirm.
     */
    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    /**
     * Adds {@link EngineTestPlugin} to the plugins installed on every node.
     */
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.appendToCopy(super.nodePlugins(), TaskRecoveryIT.EngineTestPlugin.class);
    }

    /**
     * Checks that the parent / child task hierarchy is correct for tasks that are initiated by a recovery task.
     * We use an engine plugin that stalls translog recovery, which gives us the opportunity to inspect the
     * task hierarchy.
     */
    public void testTaskForOngoingRecovery() throws Exception {
        String indexName = "test";
        internalCluster().startMasterOnlyNode();
        String nodeWithPrimary = internalCluster().startDataOnlyNode();

        // Create a single-shard index without replicas, pinned to the first data node.
        assertAcked(
            client().admin()
                .indices()
                .prepareCreate(indexName)
                .setSettings(
                    Settings.builder()
                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                        .put("index.routing.allocation.include._name", nodeWithPrimary)
                )
        );
        try {
            String nodeWithReplica = internalCluster().startDataOnlyNode();

            // Add a replica and allow it onto the second data node, which starts a peer recovery.
            assertAcked(
                client().admin()
                    .indices()
                    .prepareUpdateSettings(indexName)
                    .setSettings(
                        Settings.builder()
                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                            .put("index.routing.allocation.include._name", nodeWithPrimary + "," + nodeWithReplica)
                    )
            );
            // Translog recovery is stalled, so we can inspect the running tasks.
            assertBusy(() -> {
                List<TaskInfo> primaryTasks = client().admin()
                    .cluster()
                    .prepareListTasks(nodeWithPrimary)
                    .setActions(PeerRecoverySourceService.Actions.START_RECOVERY)
                    .get()
                    .getTasks();
                assertThat("Expected a single primary task", primaryTasks.size(), equalTo(1));
                List<TaskInfo> replicaTasks = client().admin()
                    .cluster()
                    .prepareListTasks(nodeWithReplica)
                    .setActions(PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG)
                    .get()
                    .getTasks();
                assertThat("Expected a single replica task", replicaTasks.size(), equalTo(1));
                assertThat(
                    "Replica task's parent task ID was incorrect",
                    replicaTasks.get(0).parentTaskId(),
                    equalTo(primaryTasks.get(0).taskId())
                );
            });
        } finally {
            // Release the EngineTestPlugin, which will allow translog recovery to complete.
            // Each plugin instance owns its own latch, so every node's instance is released. Only
            // EngineTestPlugin instances are selected so that an unrelated EnginePlugin cannot abort
            // this cleanup with a ClassCastException and leave a latch unreleased.
            StreamSupport.stream(internalCluster().getInstances(PluginsService.class).spliterator(), false)
                .flatMap(ps -> ps.filterPlugins(EnginePlugin.class).stream())
                .filter(EngineTestPlugin.class::isInstance)
                .map(EngineTestPlugin.class::cast)
                .forEach(EngineTestPlugin::release);
        }
        ensureGreen(indexName);
    }

    /**
     * An engine plugin that defers translog recovery until the engine is released via {@link #release()}.
     */
    public static class EngineTestPlugin extends Plugin implements EnginePlugin {
        // Held closed until release() is called; engines created by this plugin wait on it.
        private final CountDownLatch latch = new CountDownLatch(1);

        /**
         * Opens the latch so that any engine blocked in {@code skipTranslogRecovery()} can continue.
         */
        public void release() {
            latch.countDown();
        }

        /**
         * Supplies a factory producing an {@link InternalEngine} whose {@code skipTranslogRecovery()}
         * waits for {@link #release()} before delegating to the normal implementation.
         */
        @Override
        public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
            return Optional.of(config -> new InternalEngine(config) {

                @Override
                public void skipTranslogRecovery() {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so callers further up the stack can still observe it.
                        Thread.currentThread().interrupt();
                        throw new AssertionError(e);
                    }
                    super.skipTranslogRecovery();
                }
            });
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.service.BatchSummary;
import org.elasticsearch.tasks.Task;

/**
* Represents a cluster state update computed by the {@link org.elasticsearch.cluster.service.MasterService} for publication to the cluster.
Expand All @@ -24,6 +25,7 @@ public class ClusterStatePublicationEvent {
private final BatchSummary summary;
private final ClusterState oldState;
private final ClusterState newState;
private final Task task;
private final long computationTimeMillis;
private final long publicationStartTimeMillis;
private volatile long publicationContextConstructionElapsedMillis = NOT_SET;
Expand All @@ -35,12 +37,14 @@ public ClusterStatePublicationEvent(
BatchSummary summary,
ClusterState oldState,
ClusterState newState,
Task task,
long computationTimeMillis,
long publicationStartTimeMillis
) {
this.summary = summary;
this.oldState = oldState;
this.newState = newState;
this.task = task;
this.computationTimeMillis = computationTimeMillis;
this.publicationStartTimeMillis = publicationStartTimeMillis;
}
Expand All @@ -57,6 +61,10 @@ public ClusterState getNewState() {
return newState;
}

/**
 * @return the task that was supplied when this publication event was constructed
 */
public Task getTask() {
return task;
}

/**
 * @return the computation time, in milliseconds, that was supplied when this publication event was constructed
 */
public long getComputationTimeMillis() {
return computationTimeMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.TransportException;
Expand Down Expand Up @@ -284,6 +285,7 @@ public class PublicationContext extends AbstractRefCounted {
private final DiscoveryNodes discoveryNodes;
private final ClusterState newState;
private final ClusterState previousState;
private final Task task;
private final boolean sendFullVersion;

// All the values of these maps have one ref for the context (while it's open) and one for each in-flight message.
Expand All @@ -294,6 +296,7 @@ public class PublicationContext extends AbstractRefCounted {
discoveryNodes = clusterStatePublicationEvent.getNewState().nodes();
newState = clusterStatePublicationEvent.getNewState();
previousState = clusterStatePublicationEvent.getOldState();
task = clusterStatePublicationEvent.getTask();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
}

Expand Down Expand Up @@ -421,10 +424,11 @@ private void sendClusterState(
return;
}
try {
transportService.sendRequest(
transportService.sendChildRequest(
destination,
PUBLISH_STATE_ACTION_NAME,
new BytesTransportRequest(bytes, destination.getVersion()),
task,
STATE_REQUEST_OPTIONS,
new ActionListenerResponseHandler<>(
ActionListener.runAfter(listener, bytes::decRef),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;

public class ClusterService extends AbstractLifecycleComponent {
Expand Down Expand Up @@ -55,11 +56,11 @@ public class ClusterService extends AbstractLifecycleComponent {

private RerouteService rerouteService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, TaskManager taskManager) {
this(
settings,
clusterSettings,
new MasterService(settings, clusterSettings, threadPool),
new MasterService(settings, clusterSettings, threadPool, taskManager),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
);
}
Expand Down Expand Up @@ -262,5 +263,4 @@ public <T extends ClusterStateTaskListener> void submitStateUpdateTask(
) {
masterService.submitStateUpdateTask(source, task, config, executor);
}

}

0 comments on commit ca7c21f

Please sign in to comment.