Add ability to store results for long running tasks #17928

Merged
imotov merged 1 commit into elastic:master from imotov:store-results-of-long-running-tasks on May 27, 2016

4 participants
@imotov
Member

commented Apr 22, 2016

The results of the tasks are stored in a special index .results

@nik9000

core/src/main/java/org/elasticsearch/action/support/TransportAction.java Outdated
@@ -246,4 +250,34 @@ public void onFailure(Throwable e) {
listener.onFailure(e);
}
}

private class PersistentActionListener<Response extends ActionResponse> implements ActionListener<Response> {

@nik9000

nik9000 Apr 22, 2016

Contributor

Probably want javadoc for this.

@nik9000

core/src/main/java/org/elasticsearch/action/support/TransportAction.java Outdated
taskManager.persistResult(task, response, listener);
} catch (Throwable e) {
listener.onFailure(e);
return;

@nik9000

nik9000 Apr 22, 2016

Contributor

Return statements on both of these methods seem not that useful?

@@ -222,6 +298,7 @@ public void removeBan(TaskId parentTaskId) {

@Override
public void clusterChanged(ClusterChangedEvent event) {
lastDiscoveryNodes = event.state().getNodes();

@nik9000

nik9000 Apr 22, 2016

Contributor

This seems important! Like we should have had it before?

@nik9000

nik9000 May 23, 2016

Contributor

Answering my own question - no - we didn't need it before. I should read more closely next time.

* Stores the task result
*/
public <Response extends ActionResponse> void persistResult(Task task, Response response, ActionListener<Response> listener) {
DiscoveryNode localNode = lastDiscoveryNodes.getLocalNode();

@nik9000

nik9000 Apr 22, 2016

Contributor

These two methods feel kind of copy and paste-ish. They aren't really, but when you scan them they look similar.....

@nik9000

nik9000 May 23, 2016

Contributor

I still feel like there ought to be a way to make these methods look less copy-and-paste-ish. They just set off my copy-and-paste blindness even though they aren't copied and pasted.

@nik9000

nik9000 May 23, 2016

Contributor

Sorry, yeah, I think I'll just get over it. Ignore this chain of comments.

@nik9000

core/src/main/java/org/elasticsearch/tasks/TaskResult.java Outdated
public TaskResult(TaskInfo taskInfo, Throwable e) throws IOException {
ToXContent.Params params = ToXContent.EMPTY_PARAMS;
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.startObject();

@nik9000

nik9000 Apr 22, 2016

Contributor

Optional: darkon has this style that I like where you start a new block every time you startObject or startArray and it makes these much more readable!
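
A minimal sketch of the block style described above. `MiniJsonBuilder` is a hypothetical stand-in written only for this illustration, not the Elasticsearch `XContentBuilder` API; the point is just that a bare `{ ... }` block after each `startObject` makes the Java indentation mirror the JSON nesting.

```java
// Hypothetical stand-in for a streaming JSON builder (no escaping, strings only).
// It is NOT the Elasticsearch XContentBuilder; it exists to show the layout style.
final class MiniJsonBuilder {
    private final StringBuilder out = new StringBuilder();
    private boolean needsComma = false;

    private void separator() {
        if (needsComma) {
            out.append(',');
        }
    }

    MiniJsonBuilder startObject() {
        separator();
        out.append('{');
        needsComma = false;
        return this;
    }

    MiniJsonBuilder startObject(String name) {
        separator();
        out.append('"').append(name).append("\":{");
        needsComma = false;
        return this;
    }

    MiniJsonBuilder field(String name, String value) {
        separator();
        out.append('"').append(name).append("\":\"").append(value).append('"');
        needsComma = true;
        return this;
    }

    MiniJsonBuilder endObject() {
        out.append('}');
        needsComma = true;
        return this;
    }

    String string() {
        return out.toString();
    }
}

final class BlockStyleDemo {
    // Each startObject is followed by a plain Java block, so nesting is visible at a glance.
    static String render(String action, String errorType) {
        MiniJsonBuilder builder = new MiniJsonBuilder();
        builder.startObject();
        {
            builder.field("action", action);
            builder.startObject("error");
            {
                builder.field("type", errorType);
            }
            builder.endObject();
        }
        builder.endObject();
        return builder.string();
    }

    public static void main(String[] args) {
        System.out.println(render("test_action", "io_exception"));
    }
}
```

The extra braces have no runtime effect; they only make a missing `endObject` easier to spot in review.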

@nik9000

core/src/main/java/org/elasticsearch/tasks/TaskResultsService.java Outdated
*/
public class TaskResultsService extends AbstractLifecycleComponent<TaskResultsService> implements ClusterStateListener {

public static final String TASK_RESULT_INDEX = ".results";

@nik9000

nik9000 Apr 22, 2016

Contributor

I hadn't thought about fixing the destination index - I'd thought that that was something the user would specify when they asked to persist the task. Like ?save_result=destidx/desttype(/destid)?.

@imotov

imotov Apr 22, 2016

Author Member

Yes, that would be nice, but we wouldn't be able to use templates then.... Maybe we should switch to manual index creation instead of using templates? I think it might make things less hacky.

@nik9000

core/src/main/resources/org/elasticsearch/tasks/task-results-index-template.json Outdated
"settings" : {
"number_of_shards" : 1,
"number_of_replicas" : 1,
"index.priority" : 2147483647

@nik9000

nik9000 Apr 22, 2016

Contributor

This seems like a really high priority! I guess it is ok because it should be small?

@nik9000

core/src/main/resources/org/elasticsearch/tasks/task-results-index-template.json Outdated
"order" : 2147483647,
"settings" : {
"number_of_shards" : 1,
"number_of_replicas" : 1,

@nik9000

nik9000 Apr 22, 2016

Contributor

Both of these seem like something that'd be useful to tune in really esoteric use cases. I argue 2 or 3 replicas is a better default though because we expect the index to be small in the usual use case and we expect people to really want to have their results when they need them.

registerTaskManageListeners(TestTaskPlugin.TestTaskAction.NAME); // we need this to get task id of the process

// Start non-blocking test task
TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).setShouldPersistResult(true).setShouldBlock(false).get();

@nik9000

nik9000 Apr 22, 2016

Contributor

Maybe ActionResponse should have the persisted location/TaskId in it?

@imotov

imotov May 19, 2016

Author Member

I am not sure I see the point. If you get ActionResponse - you already have the result. Why would you ever want to re-read exactly the same result from the index?

@nik9000

nik9000 May 19, 2016

Contributor

I made that comment when I was thinking of the feature in a broader, maybe misguided sense. It isn't valid given the conversation we had about this only being a thing with wait_for_completion=false.

@nik9000

Contributor

commented Apr 22, 2016

Left some small things and one big one: I think that users should be able to specify where the task is persisted. I think instead of a template for the index we should document something and say "we think this is how most task persistence should be shaped, but it is up to you". Basically I want the index to be "the user's index" rather than ours. After all, they have to clean it up over time and, hell, maybe they want to persist some types of things into a different index.

@imotov imotov removed the review label Apr 22, 2016

@imotov

Member Author

commented Apr 22, 2016

After some additional discussions with @nik9000, we decided to require users to specify an index name if they want to save the result. In order to accommodate that we will need to create the index and mapping explicitly if they don't exist instead of relying on the index template.

@clintongormley

Member

commented May 7, 2016

> After some additional discussions with @nik9000, we decided to require users to specify an index name if they want to save the result. In order to accommodate that we will need to create the index and mapping explicitly if they don't exist instead of relying on the index template.

I'm not sure I agree with requiring a custom index here. Things should work out of the box. The task mgmt API should be able to fetch the status of finished tasks from the index directly, in which case it needs to know which index to talk to.

Let's talk about this when @imotov is back.

@nik9000

Contributor

commented May 7, 2016

> Let's talk about this when @imotov is back.

Yeah! I'm going to dump my reasoning here for later in case I forget!

My problem with storing task results into a special index is that it feels a bit too magical. Especially if we automatically save the results for all ?wait_for_completion=false stuff into an index. Triple especially if we have the tasks API automatically GET from the index if the task is done. That index will grow unchecked and we'll have to introduce automatic deletes for old task results and and and. I'm worried about it, ok?

To me, forcing the user to write something like ?save=index/type will make them own the index. Sure, we'll create the index for them if it isn't created and we'll apply some sensible default mapping or something, but we create indexes on the fly when users write to them all the time. It'll be the user's index. They named it, they manage it. Stuff only gets saved to it when they ask for it explicitly.

I also like making ?save explicit because you can do it without ?wait_for_completion=false. Like you could ?save .1% of searches. Or you could ?save every reindex just so you have a carbon copy replica of it. It seems safe because it isn't automatic and we're saving before we return the request to the user so they see the slowdown it causes.

@imotov imotov force-pushed the imotov:store-results-of-long-running-tasks branch May 19, 2016

@imotov

Member Author

commented May 20, 2016

@nik9000 I have pushed changes that reflect our recent discussion with @clintongormley. Could you take another look when you have a chance?

@imotov imotov added the review label May 23, 2016

@imotov imotov referenced this pull request May 23, 2016

Closed

Task Management #15117

12 of 12 tasks complete
@nik9000

core/src/main/java/org/elasticsearch/action/support/TransportAction.java Outdated
try {
taskManager.persistResult(task, e, listener);
} catch (Throwable e1) {
listener.onFailure(e1);

@nik9000

nik9000 May 23, 2016

Contributor

s/listener/delegate/? I read this and immediately thought "infinite loop!" because this thing already is a listener. I know it is silly though.
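
A rough sketch of the naming suggestion, assuming a simplified synchronous store. `Listener`, `ResultStore` and `PersistingListener` are hypothetical names invented for this example (the PR's `TaskManager.persistResult` takes the listener itself and is not reproduced here). Calling the wrapped field `delegate` makes it obvious that the wrapper forwards outward rather than calling back into itself.

```java
// Hypothetical minimal listener contract, modelled loosely on a response/failure callback pair.
interface Listener<R> {
    void onResponse(R response);

    void onFailure(Throwable e);
}

// Hypothetical synchronous store; an assumption made to keep the sketch self-contained.
interface ResultStore<R> {
    void persist(R response) throws Exception;
}

// Persists the response first, then forwards the outcome to the wrapped delegate.
final class PersistingListener<R> implements Listener<R> {
    private final ResultStore<R> store;
    private final Listener<R> delegate;

    PersistingListener(ResultStore<R> store, Listener<R> delegate) {
        this.store = store;
        this.delegate = delegate;
    }

    @Override
    public void onResponse(R response) {
        try {
            store.persist(response);
        } catch (Exception e) {
            // Persisting failed: report that to the delegate and do not report success.
            delegate.onFailure(e);
            return;
        }
        // Kept outside the try block so a throwing delegate is not mistaken for a store failure.
        delegate.onResponse(response);
    }

    @Override
    public void onFailure(Throwable e) {
        // This sketch only forwards the failure; it does not try to persist it.
        delegate.onFailure(e);
    }
}
```

In this layout the early `return` is needed because the success path sits after the `try`; that differs from the hunk under review, where the store is handed the listener directly.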

@nik9000

core/src/main/java/org/elasticsearch/tasks/Task.java Outdated
@@ -132,4 +135,16 @@ public Status getStatus() {
}

public interface Status extends ToXContent, NamedWriteable {}

public TaskResult result(DiscoveryNode node, Throwable error) throws IOException {
return new TaskResult(taskInfo(node, false), error);

@nik9000

nik9000 May 23, 2016

Contributor

Why not true? I think it'd be nice to have the detailed info. Actually I pretty much always want it all the time.

@nik9000

core/src/main/java/org/elasticsearch/tasks/Task.java Outdated
if (response instanceof ToXContent) {
return new TaskResult(taskInfo(node, false), (ToXContent) response);
} else {
throw new IllegalStateException("response has to implement ToXContent for persistence");

@nik9000

nik9000 May 23, 2016

Contributor

This just ends up in the log if wait_for_completion=false, right? I mean, I want to make sure it ends up in there at ERROR level or something, because I think it is always a bug, right?

@imotov

imotov May 23, 2016

Author Member

Cannot log here because I don't have access to the logger on the task level, but I will fix logging in the caller to log this exception properly.

@@ -133,6 +133,10 @@ public final void execute(Task task, Request request, ActionListener<Response> l
return;
}

if (task != null && request.getShouldPersistResult()) {
listener = new PersistentActionListener<>(taskManager, task, listener);

@nik9000

nik9000 May 23, 2016

Contributor

I wonder if we can assert here that the response will always implement ToXContent. I think assert action.newResponse() instanceof ToXContent will work?

@imotov

imotov May 23, 2016

Author Member

newResponse is not defined on the TransportAction level, it is only defined for some specific Actions such as TransportMasterNodeAction. So, I don't think I can really implement this assert here.

DiscoveryNode localNode = lastDiscoveryNodes.getLocalNode();
if (localNode == null) {
// too early to persist anything, shouldn't really be here - just pass the error along
listener.onFailure(error);

@nik9000

nik9000 May 23, 2016

Contributor

Maybe log a WARN or something?

@nik9000

core/src/main/java/org/elasticsearch/tasks/TaskManager.java Outdated
try {
taskResult = task.result(localNode, error);
} catch (IOException ex) {
logger.warn("couldn't persist error ", error);

@nik9000

nik9000 May 23, 2016

Contributor

Maybe you should include ex in the WARN here and still pass the original error along? Or stick the original error in addSuppressedException?
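
One way to read the second option, sketched with the standard `Throwable.addSuppressed` method from the JDK. This version attaches the serialization failure to the original error, which is one of the two possible directions. The `serialize` helper is hypothetical and always fails, standing in for whatever builds the stored document; the idea is that the original error stays the primary one and the secondary `IOException` travels with it instead of being lost.

```java
import java.io.IOException;

final class SuppressedDemo {
    // Hypothetical serializer that always fails, used only to trigger the catch branch.
    static String serialize(Throwable error) throws IOException {
        throw new IOException("cannot serialize " + error.getClass().getSimpleName());
    }

    // Returns the throwable that should be passed on to the caller's listener.
    static Throwable errorToReport(Throwable original) {
        try {
            serialize(original);
        } catch (IOException ex) {
            // Keep the original error as the primary failure and attach the secondary one.
            original.addSuppressed(ex);
        }
        return original;
    }

    public static void main(String[] args) {
        Throwable reported = errorToReport(new IllegalStateException("task failed"));
        System.out.println(reported.getMessage());
        for (Throwable suppressed : reported.getSuppressed()) {
            System.out.println("suppressed: " + suppressed.getMessage());
        }
    }
}
```

Logging `ex` at WARN and passing the original error along would work as well; the two options are not mutually exclusive.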

@nik9000

core/src/main/java/org/elasticsearch/tasks/TaskResult.java Outdated
*/
public class TaskResult {

private final XContentType contentType = Requests.INDEX_CONTENT_TYPE;

@nik9000

nik9000 May 23, 2016

Contributor

Is this just a constant? Maybe we don't need the field at all?

@nik9000

core/src/main/java/org/elasticsearch/tasks/TaskResultsService.java Outdated
private Settings taskResultIndexSettings() {
return Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)

@nik9000

nik9000 May 23, 2016

Contributor

"auto_expand_replicas": 2 instead? That way you don't end up with a yellow cluster on one node and you end up with a fairly large amount of paranoia if you have three nodes? I'm ambivalent for the 2 vs 1 replica thing but I like auto_expand_replicas here.

"cancellable": {
"type": "boolean"
},
"id": {

@nik9000

nik9000 May 23, 2016

Contributor

Maybe we don't need the id at all here? It is part of the document's id, right?

@imotov

imotov May 23, 2016

Author Member

id is a part of a standard TaskInfo serialization. I was initially thinking about creating a custom serialization of TaskInfo for persistence and removing id from it, but then I thought that it might be useful here since it's a long field, which can be useful for sorting since it provides a nice, unambiguous ordering of tasks within a node.
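
A small illustration of the ordering argument, under the assumption that the document id is the node id and the numeric task id joined by a colon. `StoredTask` and its fields are hypothetical names for this sketch. Sorting on the string document id is lexicographic, so task 10 lands before task 2, whereas the separate long `id` field sorts numerically.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified view of a stored task; names are illustrative only.
final class StoredTask {
    final String nodeId;
    final long id;

    StoredTask(String nodeId, long id) {
        this.nodeId = nodeId;
        this.id = id;
    }

    // Assumed document id layout: node id, a colon, then the numeric task id.
    String documentId() {
        return nodeId + ":" + id;
    }
}

final class TaskOrderingDemo {
    private static List<Long> ids(List<StoredTask> tasks) {
        List<Long> result = new ArrayList<>();
        for (StoredTask task : tasks) {
            result.add(task.id);
        }
        return result;
    }

    // Lexicographic order on the string document id.
    static List<Long> idsSortedByDocumentId(List<StoredTask> tasks) {
        List<StoredTask> copy = new ArrayList<>(tasks);
        copy.sort(Comparator.comparing(StoredTask::documentId));
        return ids(copy);
    }

    // Numeric order on the long id field.
    static List<Long> idsSortedByNumericId(List<StoredTask> tasks) {
        List<StoredTask> copy = new ArrayList<>(tasks);
        copy.sort(Comparator.comparingLong((StoredTask t) -> t.id));
        return ids(copy);
    }

    public static void main(String[] args) {
        List<StoredTask> tasks = Arrays.asList(
                new StoredTask("n1", 9), new StoredTask("n1", 10), new StoredTask("n1", 2));
        System.out.println("by document id: " + idsSortedByDocumentId(tasks));
        System.out.println("by numeric id:  " + idsSortedByNumericId(tasks));
    }
}
```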

"id": {
"type": "long"
},
"parent_id": {

@nik9000

nik9000 May 23, 2016

Contributor

We're not enabling parent/child on the index so this might be confusing. It is the right name for it, but it still might be confusing.

@imotov

imotov May 23, 2016

Author Member

Same here. This field is just a part of TaskInfo serialization.

@nik9000

core/src/main/java/org/elasticsearch/tasks/TaskResult.java Outdated
taskId = taskInfo.getTaskId();
}

public TaskResult(TaskInfo taskInfo, String string) throws IOException {

@nik9000

nik9000 May 23, 2016

Contributor

Unused; maybe remove it?

@nik9000

Contributor

commented May 23, 2016

I left a few points that are open for discussion but I think it is basically done. Sorry to take so long before reviewing!

@imotov

Member Author

commented May 23, 2016

Pushed changes to address the latest comments and suggestions.

@nik9000

Contributor

commented May 24, 2016

> Pushed changes to address the latest comments and suggestions.

I'll read again soon!

@nik9000

core/src/main/java/org/elasticsearch/tasks/Task.java Outdated

public TaskResult result(DiscoveryNode node, ActionResponse response) throws IOException {
if (response instanceof ToXContent) {
return new TaskResult(taskInfo(node, false), (ToXContent) response);

@nik9000

nik9000 May 24, 2016

Contributor

This one is false but the one above is true. I'd prefer both to be true.

@imotov

imotov May 24, 2016

Author Member

Thanks! That's a typo. It should be true in all cases.

@nik9000

Contributor

commented May 24, 2016

I made a comment about one last thing I'd like to change but otherwise LGTM.

Add ability to store results for long running tasks
The results of the tasks are stored in a special index .results

@imotov imotov force-pushed the imotov:store-results-of-long-running-tasks branch to fb763c1 May 27, 2016

@imotov imotov merged commit fb763c1 into elastic:master May 27, 2016

1 check passed

CLA Commit author is a member of Elasticsearch
@clintongormley

Member

commented Jun 1, 2016

@imotov we need documentation about how to clean out the .results index please

@Mpdreamz

Member

commented Jun 17, 2016

Do we have plans to backport this to 2.x ?

@imotov

Member Author

commented Jun 18, 2016

@Mpdreamz no, not at the moment. Do you have a particular need for this to be backported to 2.x?
