Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Persistent Tasks implementations version and feature aware #31045

Merged
merged 26 commits into from
Jun 3, 2018
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e1fbfaf
Introduce client feature tracking
jasontedor May 30, 2018
82ed4d4
Remove irrelevant lines from test
jasontedor Jun 1, 2018
6d529fe
Add comment on use of TreeSet
jasontedor Jun 1, 2018
66f2ea9
Remove unneeded parser in test
jasontedor Jun 1, 2018
25a861c
Remove unneeded fields
jasontedor Jun 1, 2018
0b39ce9
Guard feature de-serialization by version
jasontedor Jun 1, 2018
26071ff
Add Javadocs
jasontedor Jun 1, 2018
8be176c
Randomly add feature to test
jasontedor Jun 1, 2018
1f66829
Change prefix of feature setting
jasontedor Jun 1, 2018
4b87ed8
add version and feature support to persistent tasks
bleskes Jun 1, 2018
aeee8ff
add version and feature support to persistent tasks
bleskes Jun 1, 2018
18a9a6f
add version and feature support to persistent tasks
bleskes Jun 1, 2018
35463b3
unit tests
bleskes Jun 1, 2018
dbc5076
Merge remote-tracking branch 'upstream/master' into persistent_tasks_…
bleskes Jun 1, 2018
1a31b99
Merge remote-tracking branch 'upstream/master' into persistent_tasks_…
bleskes Jun 2, 2018
e6e5413
require params
bleskes Jun 2, 2018
5e6de98
fix tests
bleskes Jun 2, 2018
e481804
Merge branch 'master' into persistent_tasks_features
bleskes Jun 2, 2018
ce99bbe
java docs
bleskes Jun 2, 2018
1f83926
feedback
bleskes Jun 2, 2018
5bc8a0a
min compat version
bleskes Jun 3, 2018
9870fdd
compatibleFutureVersion
bleskes Jun 3, 2018
7b82a5b
inline
bleskes Jun 3, 2018
2aa7a85
v_5_4_0 back
bleskes Jun 3, 2018
ef015fa
V_5.x -> minimum compat
bleskes Jun 3, 2018
307b714
fix testSnapshotDeletionsInProgressSerialization
bleskes Jun 3, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand Down Expand Up @@ -50,6 +49,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -122,7 +122,7 @@ default Optional<String> getRequiredFeature() {
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
static <T extends VersionedNamedWriteable & FeatureAware> boolean shouldSerialize(final StreamOutput out, final T custom) {
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
return false;
}
Expand Down Expand Up @@ -748,13 +748,13 @@ public void writeTo(StreamOutput out) throws IOException {
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@

package org.elasticsearch.cluster;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;

/**
* Diff that also support NamedWriteable interface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could should be modified to VersionedNamedWriteable now.

*/
public interface NamedDiffable<T> extends Diffable<T>, NamedWriteable {
/**
* The minimal version of the recipient this custom object can be sent to
*/
default Version getMinimalSupportedVersion() {
return Version.CURRENT.minimumIndexCompatibilityVersion();
}
public interface NamedDiffable<T> extends Diffable<T>, VersionedNamedWriteable {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -382,6 +383,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.V_5_0_0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unfortunate that this has to be littered everywhere and anyway it will become a lie when we remove the 5.x constants from the codebase on some future cleanup. Why not use Version#minimumCompatibilityVersion as a default implementation on the interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt that people should really think about these things and not just rely on defaults. Another aspect is that this change currently means it is littered everywhere in this PR but when people introduce a new VersionedNamedWritable the have to set something can't use a default. When 5.x is phased out we can replace it with Version#minimumCompatibilityVersion (I can do so now if you want) but I think it should be done on a case by case basis. Defaults are dangerous here imo.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. Let's go with Version#minimumCompatibilityVersion everywhere now though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced all occurrences of V5_0_0. Let me know if you want the other 5.x replaced as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's get those here too.

}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Custom.class, TYPE, in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment about littering.

return Version.V_5_0_0;
}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(Custom.class, TYPE, in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.ParseField;
Expand All @@ -34,8 +35,6 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -44,7 +43,6 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* A collection of tombstones for explicitly marking indices as deleted in the cluster state.
Expand Down Expand Up @@ -97,6 +95,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment about littering.

return Version.V_5_0_0;
}

@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.API_AND_GATEWAY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,13 +786,13 @@ public void writeTo(StreamOutput out) throws IOException {
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
if (FeatureAware.shouldSerialize(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
Expand Down Expand Up @@ -103,6 +104,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment about littering.

return Version.V_5_0_0;
}

public RepositoriesMetaData(StreamInput in) throws IOException {
RepositoryMetaData[] repository = new RepositoryMetaData[in.readVInt()];
for (int i = 0; i < repository.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.io.stream;

import org.elasticsearch.Version;

/**
* A {@link NamedWriteable} that has a minimum version associated with it.
*/
public interface VersionedNamedWriteable extends NamedWriteable {

/**
* Returns the name of the writeable object
*/
String getWriteableName();

/**
* The minimal version of the recipient this object can be sent to
*/
Version getMinimalSupportedVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
Expand Down Expand Up @@ -69,6 +70,11 @@ public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment about littering.

return Version.V_5_0_0;
}

public Map<String, PipelineConfiguration> getPipelines() {
return pipelines;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public NodePersistentTasksExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public <Params extends PersistentTaskParams> void executeTask(@Nullable Params params,
public <Params extends PersistentTaskParams> void executeTask(Params params,
@Nullable Task.Status status,
AllocatedPersistentTask task,
PersistentTasksExecutor<Params> executor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

package org.elasticsearch.persistent;

import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.xcontent.ToXContentObject;

/**
* Parameters used to start persistent task
*/
public interface PersistentTaskParams extends NamedWriteable, ToXContentObject {
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject, ClusterState.FeatureAware {

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
Expand Down Expand Up @@ -65,7 +64,7 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
* @param taskParams the task's parameters
* @param listener the listener that will be called when task is started
*/
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, @Nullable Params taskParams,
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String taskName, Params taskParams,
ActionListener<PersistentTask<?>> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
Expand Down Expand Up @@ -225,7 +224,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
* @return a new {@link Assignment}
*/
private <Params extends PersistentTaskParams> Assignment createAssignment(final String taskName,
final @Nullable Params taskParams,
final Params taskParams,
final ClusterState currentState) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -190,7 +190,7 @@ public long getNumberOfTasksOnNode(String nodeId, String taskName) {

@Override
public Version getMinimalSupportedVersion() {
return Version.V_5_4_0;
return Version.V_6_3_0;
}

@Override
Expand Down Expand Up @@ -264,7 +264,6 @@ public static class PersistentTask<P extends PersistentTaskParams> implements Wr
private final String id;
private final long allocationId;
private final String taskName;
@Nullable
private final P params;
@Nullable
private final Status status;
Expand Down Expand Up @@ -314,7 +313,11 @@ public PersistentTask(StreamInput in) throws IOException {
id = in.readString();
allocationId = in.readLong();
taskName = in.readString();
params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
params = (P) in.readNamedWriteable(PersistentTaskParams.class);
} else {
params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class);
}
status = in.readOptionalNamedWriteable(Task.Status.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
Expand All @@ -325,7 +328,11 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeLong(allocationId);
out.writeString(taskName);
out.writeOptionalNamedWriteable(params);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeNamedWriteable(params);
} else {
out.writeOptionalNamedWriteable(params);
}
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
Expand Down Expand Up @@ -500,7 +507,10 @@ public PersistentTasksCustomMetaData(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(lastAllocationId);
out.writeMap(tasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
Map<String, PersistentTask<?>> filteredTasks = tasks.values().stream()
.filter(t -> ClusterState.FeatureAware.shouldSerialize(out, t.getParams()))
.collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
out.writeMap(filteredTasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
}

public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.util.Map;
import java.util.function.Predicate;
Expand Down Expand Up @@ -118,7 +118,7 @@ protected String getDescription(PersistentTask<Params> taskInProgress) {
* NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
* indicate that the persistent task has finished.
*/
protected abstract void nodeOperation(AllocatedPersistentTask task, @Nullable Params params, @Nullable Task.Status status);
protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status);

public String getExecutor() {
return executor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
Expand Down Expand Up @@ -69,7 +68,7 @@ public PersistentTasksService(Settings settings, ClusterService clusterService,
*/
public <Params extends PersistentTaskParams> void sendStartRequest(final String taskId,
final String taskName,
final @Nullable Params taskParams,
final Params taskParams,
final ActionListener<PersistentTask<Params>> listener) {
@SuppressWarnings("unchecked")
final ActionListener<PersistentTask<?>> wrappedListener =
Expand Down
Loading