Skip to content

Commit

Permalink
Backporting RTF
Browse files Browse the repository at this point in the history
Backporting pull requests opensearch-project#2089 and opensearch-project#3982

Signed-off-by: PritLadani <pritkladani@gmail.com>
  • Loading branch information
PritLadani committed Sep 6, 2022
1 parent 6cabc6a commit 20c43cd
Show file tree
Hide file tree
Showing 47 changed files with 2,432 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class TaskInfo {
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final Map<String, Object> resourceStats = new HashMap<>();

public TaskInfo(TaskId taskId) {
this.taskId = taskId;
Expand Down Expand Up @@ -141,6 +142,14 @@ public Map<String, Object> getStatus() {
return status;
}

void setResourceStats(Map<String, Object> resourceStats) {
this.resourceStats.putAll(resourceStats);
}

public Map<String, Object> getResourceStats() {
return resourceStats;
}

private void noOpParse(Object s) {}

public static final ObjectParser.NamedObjectParser<TaskInfo, Void> PARSER;
Expand All @@ -160,6 +169,7 @@ private void noOpParse(Object s) {}
parser.declareBoolean(TaskInfo::setCancellable, new ParseField("cancellable"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setResourceStats, (p, c) -> p.map(), new ParseField("resource_stats"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}

Expand All @@ -177,7 +187,8 @@ && isCancellable() == taskInfo.isCancellable()
&& Objects.equals(getDescription(), taskInfo.getDescription())
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders());
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getResourceStats(), taskInfo.getResourceStats());
}

@Override
Expand All @@ -192,7 +203,8 @@ public int hashCode() {
isCancellable(),
getParentTaskId(),
status,
getHeaders()
getHeaders(),
getResourceStats()
);
}

Expand Down Expand Up @@ -222,6 +234,8 @@ public String toString() {
+ status
+ ", headers="
+ headers
+ ", resource_stats="
+ resourceStats
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.TaskResourceStats;
import org.opensearch.tasks.TaskResourceUsage;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.test.AbstractXContentTestCase.xContentTester;
Expand All @@ -57,7 +60,7 @@ public void testFromXContent() throws IOException {
)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status"))
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.contains("resource_stats"))
.test();
}

Expand Down Expand Up @@ -94,7 +97,19 @@ static TaskInfo randomTaskInfo() {
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
: Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5));
return new TaskInfo(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, parentTaskId, headers);
return new TaskInfo(
taskId,
type,
action,
description,
status,
startTime,
runningTimeNanos,
cancellable,
parentTaskId,
headers,
randomResourceStats()
);
}

private static TaskId randomTaskId() {
Expand All @@ -114,4 +129,14 @@ private static RawTaskStatus randomRawTaskStatus() {
throw new IllegalStateException(e);
}
}

private static TaskResourceStats randomResourceStats() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
{
for (int i = 0; i < randomInt(5); i++) {
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
randomIntBetween(5, 10),
false,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
Collections.singletonMap("x-header-of", "some-value"),
null
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public void testRethrottleSuccessfulResponse() {
0,
true,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
Expand Down Expand Up @@ -165,7 +166,8 @@ public void testRethrottleWithSomeSucceeded() {
0,
true,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,9 @@ public void onTaskUnregistered(Task task) {}

@Override
public void waitForTaskCompletion(Task task) {}

@Override
public void taskExecutionStarted(Task task, Boolean closeableInvoked) {}
});
}
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
Expand Down Expand Up @@ -659,6 +662,9 @@ public void waitForTaskCompletion(Task task) {
waitForWaitingToStart.countDown();
}

@Override
public void taskExecutionStarted(Task task, Boolean closeableInvoked) {}

@Override
public void onTaskRegistered(Task task) {}

Expand Down Expand Up @@ -901,7 +907,19 @@ public void testNodeNotFoundButTaskFound() throws Exception {
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
resultsService.storeResult(
new TaskResult(
new TaskInfo(new TaskId("fake", 1), "test", "test", "", null, 0, 0, false, TaskId.EMPTY_TASK_ID, Collections.emptyMap()),
new TaskInfo(
new TaskId("fake", 1),
"test",
"test",
"",
null,
0,
0,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
null
),
new RuntimeException("test")
),
new ActionListener<Void>() {
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_1_3_3 = new Version(1030399, org.apache.lucene.util.Version.LUCENE_8_10_1);
public static final Version V_1_3_4 = new Version(1030499, org.apache.lucene.util.Version.LUCENE_8_10_1);
public static final Version V_1_3_5 = new Version(1030599, org.apache.lucene.util.Version.LUCENE_8_10_1);
public static final Version V_2_0_0 = new Version(2000099, org.apache.lucene.util.Version.LUCENE_8_10_1); // TODO: Need to change the
// lucene version to
// LUCENE_9_1_0
public static final Version CURRENT = V_1_3_5;

public static Version readVersion(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -60,8 +61,15 @@ public static long waitForCompletionTimeout(TimeValue timeout) {

private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);

private final TaskResourceTrackingService taskResourceTrackingService;

@Inject
public TransportListTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
public TransportListTasksAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
TaskResourceTrackingService taskResourceTrackingService
) {
super(
ListTasksAction.NAME,
clusterService,
Expand All @@ -72,6 +80,7 @@ public TransportListTasksAction(ClusterService clusterService, TransportService
TaskInfo::new,
ThreadPool.Names.MANAGEMENT
);
this.taskResourceTrackingService = taskResourceTrackingService;
}

@Override
Expand Down Expand Up @@ -101,6 +110,8 @@ protected void processTasks(ListTasksRequest request, Consumer<Task> operation)
}
taskManager.waitForTaskCompletion(task, timeoutNanos);
});
} else {
operation = operation.andThen(taskResourceTrackingService::refreshResourceStats);
}
super.processTasks(request, operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public SearchShardTask(long id, String type, String action, String description,
super(id, type, action, description, parentTaskId, headers);
}

@Override
public boolean supportsResourceTracking() {
return true;
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public final String getDescription() {
return descriptionSupplier.get();
}

@Override
public boolean supportsResourceTracking() {
return true;
}

/**
* Attach a {@link SearchProgressListener} to this task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.ActionResponse;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.tasks.TaskId;
Expand Down Expand Up @@ -88,31 +89,39 @@ public final Task execute(Request request, ActionListener<Response> listener) {
*/
final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
final Task task;

try {
task = taskManager.register("transport", actionName, request);
} catch (TaskCancelledException e) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);

ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task);
try {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(response);
}
}
}

@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(e);
}
}
}
});
});
} finally {
storedContext.close();
}

return task;
}

Expand All @@ -129,25 +138,30 @@ public final Task execute(Request request, TaskListener<Response> listener) {
unregisterChildNode.close();
throw e;
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(task, response);
ThreadContext.StoredContext storedContext = taskManager.taskExecutionStarted(task);
try {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onResponse(task, response);
}
}
}

@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(task, e);
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
} finally {
listener.onFailure(task, e);
}
}
}
});
});
} finally {
storedContext.close();
}
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.opensearch.script.ScriptMetadata;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;

import java.util.ArrayList;
Expand Down Expand Up @@ -394,6 +395,7 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskResourceTrackingService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
Expand Down
Loading

0 comments on commit 20c43cd

Please sign in to comment.