Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce client feature tracking #31020

Merged
merged 11 commits into from Jun 1, 2018
Expand Up @@ -104,6 +104,8 @@ else if (readableBytes >= TcpHeader.HEADER_SIZE) {
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(in);
}
// now we decode the features
in.readStringArray();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we have a version protection here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I pushed 0b39ce9.

// now we can decode the action name
sb.append(", action: ").append(in.readString());
}
Expand Down
Expand Up @@ -98,6 +98,8 @@ public abstract class TransportClient extends AbstractClient {
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);

public static final String TRANSPORT_CLIENT_FEATURE = "transport_client";

private static PluginsService newPluginService(final Settings settings, Collection<Class<? extends Plugin>> plugins) {
final Settings.Builder settingsBuilder = Settings.builder()
.put(TcpTransport.PING_SCHEDULE.getKey(), "5s") // enable by default the transport schedule ping interval
Expand Down Expand Up @@ -130,8 +132,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
+ "." + "transport_client", true).build();
final Settings settings =
Settings.builder()
.put(defaultSettings)
.put(pluginsService.updatedSettings())
.put(TcpTransport.FEATURE_PREFIX + "." + TRANSPORT_CLIENT_FEATURE, true)
.build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
Expand Down
66 changes: 61 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -61,6 +62,7 @@
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
Expand Down Expand Up @@ -90,7 +92,51 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
/**
* An interface that implementors use when a class requires a client to maybe have a feature.
*/
public interface FeatureAware {

/**
* An optional feature that is required for the client to have.
*
* @return an empty optional if no feature is required otherwise a string representing the required feature
*/
default Optional<String> getRequiredFeature() {
return Optional.empty();
}

/**
* Tests whether or not the custom should be serialized. The criteria are:
* <ul>
* <li>the output stream must be at least the minimum supported version of the custom</li>
* <li>the output stream must have the feature required by the custom (if any) or not be a transport client</li>
* </ul>
* <p>
* That is, we only serialize customs to clients than can understand the custom based on the version of the client and the features
* that the client has. For transport clients we can be lenient in requiring a feature in which case we do not send the custom but
* for connected nodes we always require that the node has the required feature.
*
* @param out the output stream
* @param custom the custom to serialize
* @param <T> the type of the custom
* @return true if the custom should be serialized and false otherwise
*/
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
return false;
}
if (custom.getRequiredFeature().isPresent()) {
final String requiredFeature = custom.getRequiredFeature().get();
// if it is a transport client we are lenient yet for a connected node it must have the required feature
return out.hasFeature(requiredFeature) || out.hasFeature(TransportClient.TRANSPORT_CLIENT_FEATURE) == false;
}
return true;
}

}

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {

/**
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
Expand All @@ -99,6 +145,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
default boolean isPrivate() {
return false;
}

}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
Expand Down Expand Up @@ -244,6 +291,15 @@ public String toString() {
sb.append("isa_ids ").append(indexMetaData.inSyncAllocationIds(shard)).append("\n");
}
}
if (metaData.customs().isEmpty() == false) {
sb.append("metadata customs:\n");
for (final ObjectObjectCursor<String, MetaData.Custom> cursor : metaData.customs()) {
final String type = cursor.key;
final MetaData.Custom custom = cursor.value;
sb.append(TAB).append(type).append(": ").append(custom);
}
sb.append("\n");
}
sb.append(blocks());
sb.append(nodes());
sb.append(routingTable());
Expand Down Expand Up @@ -691,14 +747,14 @@ public void writeTo(StreamOutput out) throws IOException {
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Expand Up @@ -24,6 +24,8 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterState.FeatureAware;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.cluster.DiffableUtils;
Expand Down Expand Up @@ -117,9 +119,10 @@ public enum XContentContext {
*/
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {

EnumSet<XContentContext> context();

}

public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
Expand Down Expand Up @@ -782,14 +785,14 @@ public void writeTo(StreamOutput out) throws IOException {
}
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
numberOfCustoms++;
}
}
out.writeVInt(numberOfCustoms);
for (ObjectCursor<Custom> cursor : customs.values()) {
if (out.getVersion().onOrAfter(cursor.value.getMinimalSupportedVersion())) {
for (final ObjectCursor<Custom> cursor : customs.values()) {
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
out.writeNamedWriteable(cursor.value);
}
}
Expand Down
Expand Up @@ -58,10 +58,12 @@
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -98,6 +100,7 @@ public abstract class StreamOutput extends OutputStream {
}

private Version version = Version.CURRENT;
private Set<String> features = Collections.emptySet();

/**
* The version of the node on the other side of this stream.
Expand All @@ -113,6 +116,14 @@ public void setVersion(Version version) {
this.version = version;
}

public boolean hasFeature(final String feature) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a java docs with some explanation of what the features are (or a link to where it's explained).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 26071ff.

return this.features.contains(feature);
}

public void setFeatures(final Set<String> features) {
this.features = Collections.unmodifiableSet(new HashSet<>(features));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert it's currently empty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrapped this into 26071ff.

}

public long position() throws IOException {
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -379,6 +379,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING,
EsExecutors.PROCESSORS_SETTING,
ThreadContext.DEFAULT_HEADERS_SETTING,
TcpTransport.DEFAULT_FEATURES_SETTING,
Loggers.LOG_DEFAULT_LEVEL_SETTING,
Loggers.LOG_LEVEL_SETTING,
NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING,
Expand Down
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -189,7 +190,7 @@ public long getNumberOfTasksOnNode(String nodeId, String taskName) {

@Override
public Version getMinimalSupportedVersion() {
return Version.V_5_4_0;
return Version.V_6_3_0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot change this. This would mean that a mixed 6.2 x-pack / 6.3 x-pack cluster might drop its persistent tasks on the floor.
Instead I suggest to add another method to custom that says something like featureLessSince which returns an optional version (default is Optional.empty()).
We can then override this method for PersistentTasksCustomMetaData to return Version.V_6_3_0.
Finally we'll make shouldSerializeCustom aware of this new featureLessSince method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed with @bleskes. Only revert this to return Version.V_5_4_0;, the rest should be covered by other PRs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a18f166.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can change this - 6.2 with xpack will have problems.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a18f166.

}

@Override
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Expand Up @@ -56,6 +56,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

/**
Expand All @@ -79,6 +80,17 @@
*/
public abstract class Plugin implements Closeable {

/**
* A feature exposed by the plugin. This should be used if a plugin exposes {@link org.elasticsearch.cluster.ClusterState.Custom} or
* {@link MetaData.Custom}; see also {@link org.elasticsearch.cluster.ClusterState.FeatureAware}.
*
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
* customs
*/
protected Optional<String> getFeature() {
return Optional.empty();
}

/**
* Node level guice modules.
*/
Expand Down
25 changes: 23 additions & 2 deletions server/src/main/java/org/elasticsearch/plugins/PluginsService.java
Expand Up @@ -41,8 +41,10 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.transport.TcpTransport;

import java.io.IOException;
import java.lang.reflect.Constructor;
Expand All @@ -57,16 +59,17 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.io.FileSystemUtils.isAccessibleDirectory;

Expand Down Expand Up @@ -196,6 +199,7 @@ private static void logPluginInfo(final List<PluginInfo> pluginInfos, final Stri

public Settings updatedSettings() {
Map<String, String> foundSettings = new HashMap<>();
final Map<String, String> features = new TreeMap<>();
final Settings.Builder builder = Settings.builder();
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
Settings settings = plugin.v2().additionalSettings();
Expand All @@ -207,6 +211,23 @@ public Settings updatedSettings() {
}
}
builder.put(settings);
final Optional<String> maybeFeature = plugin.v2().getFeature();
if (maybeFeature.isPresent()) {
final String feature = maybeFeature.get();
if (features.containsKey(feature)) {
final String message = String.format(
Locale.ROOT,
"duplicate feature [%s] in plugin [%s], already added in [%s]",
feature,
plugin.v1().getName(),
features.get(feature));
throw new IllegalArgumentException(message);
}
features.put(feature, plugin.v1().getName());
}
}
for (final String feature : features.keySet()) {
builder.put(TcpTransport.FEATURE_PREFIX + "." + feature, true);
}
return builder.put(this.settings).build();
}
Expand Down