Skip to content

Commit

Permalink
Allow registering compatible handlers (#64423)
Browse files Browse the repository at this point in the history
Adding an infrastructure to allow for registration of Compatible Handlers.
Compatible handlers are RestHandlers used for handling rest request from old version clients ( CURRENT-1 version). They might be registered under an endpoint that was removed or changed in CURRENT version (different path, method or an endpoint completely removed).
But they also can be registered under the same endpoint (same path, method as the RestHandler in CURRENT)
RestHandler's endpoint is at the moment 2dimensional - a method and a path.

This PR adds a 3rd dimension - a version.

Registration:
RestHandler declares a new compatibleWithVersion method, which will be overridden by Compatible Handlers and returning a Version.CURRENT -1. By default the method returns Version.CURRENT
compatibleWithVersion is used when iterating over handlers within RestController#registerHandler. The returned value is used to set a version on MethodHandlers

Lookup:
An interface CompatibleVersion is introduced in order to abstract a logic to calculate a compatible version requested by a user.
It is not implemented in this PR. A simplified, always returning Version.CURRENT implementation is used.
Within RestController, a version is calculated with the use of CompatibleVersion, then the lookup for MethodHandlers is performed (the logic is the same)
Once it is find, an additional lookup for a RestHandler for requested version is made.

The requested version has to be also passed down to XContentBuilder in order to allow for per version serialisation logic

relates #51816
  • Loading branch information
pgomulka committed Nov 16, 2020
1 parent c2864e3 commit 618d8bc
Show file tree
Hide file tree
Showing 14 changed files with 389 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
*/
public final class XContentBuilder implements Closeable, Flushable {

private byte compatibleMajorVersion;

/**
* Create a new {@link XContentBuilder} using the given {@link XContent} content.
* <p>
Expand Down Expand Up @@ -1004,6 +1006,25 @@ public XContentBuilder copyCurrentStructure(XContentParser parser) throws IOExce
return this;
}

/**
* Sets a version used for serialising a response compatible with a previous version.
*/
public XContentBuilder withCompatibleMajorVersion(byte compatibleMajorVersion) {
assert this.compatibleMajorVersion == 0 : "Compatible version has already been set";
if (compatibleMajorVersion == 0) {
throw new IllegalArgumentException("Compatible major version must not be equal to 0");
}
this.compatibleMajorVersion = compatibleMajorVersion;
return this;
}

/**
* Returns a version used for serialising a response compatible with a previous version.
*/
public byte getCompatibleMajorVersion() {
return compatibleMajorVersion;
}

@Override
public void flush() throws IOException {
generator.flush();
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/elasticsearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private static Version fromStringSlow(String version) {
public final byte build;
public final org.apache.lucene.util.Version luceneVersion;
private final String toString;
private final int previousMajorId;

Version(int id, org.apache.lucene.util.Version luceneVersion) {
this.id = id;
Expand All @@ -269,6 +270,7 @@ private static Version fromStringSlow(String version) {
this.build = (byte) (id % 100);
this.luceneVersion = Objects.requireNonNull(luceneVersion);
this.toString = major + "." + minor + "." + revision;
this.previousMajorId = major > 0 ? (major - 1) * 1000000 + 99 : major;
}

public boolean after(Version version) {
Expand Down Expand Up @@ -392,6 +394,22 @@ public boolean isCompatible(Version version) {
return compatible;
}

/**
* Returns the minimum version that can be used for compatible REST API
*/
public Version minimumRestCompatibilityVersion() {
return Version.CURRENT.previousMajor();
}

/**
* Returns a first major version previous to the version stored in this object.
* I.e 8.1.0 will return 7.0.0
*/
public Version previousMajor() {
return Version.fromId(previousMajorId);
}


@SuppressForbidden(reason = "System.out.*")
public static void main(String[] args) {
final String versionOutput = String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.rest.CompatibleVersion;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestHeaderDefinition;
Expand Down Expand Up @@ -419,7 +420,8 @@ public class ActionModule extends AbstractModule {
public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices) {
CircuitBreakerService circuitBreakerService, UsageService usageService, SystemIndices systemIndices,
CompatibleVersion compatibleVersion) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexScopedSettings = indexScopedSettings;
Expand Down Expand Up @@ -451,7 +453,7 @@ public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpr
indicesAliasesRequestRequestValidators = new RequestValidators<>(
actionPlugins.stream().flatMap(p -> p.indicesAliasesRequestValidators().stream()).collect(Collectors.toList()));

restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService, compatibleVersion);
}


Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.CompatibleVersion;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
Expand Down Expand Up @@ -541,7 +542,8 @@ protected Node(final Environment initialEnvironment,

ActionModule actionModule = new ActionModule(settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices);
threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService, systemIndices,
getRestCompatibleFunction());
modules.add(actionModule);

final RestController restController = actionModule.getRestController();
Expand Down Expand Up @@ -716,6 +718,15 @@ protected Node(final Environment initialEnvironment,
}
}

/**
* @return A function that can be used to determine the requested REST compatible version
* package scope for testing
*/
CompatibleVersion getRestCompatibleFunction() {
// TODO PG Until compatible version plugin is implemented, return current version.
return CompatibleVersion.CURRENT_VERSION;
}

protected TransportService newTransportService(Settings settings, Transport transport, ThreadPool threadPool,
TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
Expand Down
35 changes: 35 additions & 0 deletions server/src/main/java/org/elasticsearch/rest/CompatibleVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.rest;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.ParsedMediaType;

/**
* An interface used to specify a function that returns a compatible API version.
* This function abstracts how the version calculation is provided (for instance from plugin).
*/
@FunctionalInterface
public interface CompatibleVersion {
Version get(@Nullable ParsedMediaType acceptHeader, @Nullable ParsedMediaType contentTypeHeader, boolean hasContent);

CompatibleVersion CURRENT_VERSION = (acceptHeader, contentTypeHeader, hasContent) -> Version.CURRENT;
}
30 changes: 21 additions & 9 deletions server/src/main/java/org/elasticsearch/rest/MethodHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,26 @@

package org.elasticsearch.rest;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.Version;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* Encapsulate multiple handlers for the same path, allowing different handlers for different HTTP verbs.
* Encapsulate multiple handlers for the same path, allowing different handlers for different HTTP verbs and versions.
*/
final class MethodHandlers {

private final String path;
private final Map<RestRequest.Method, RestHandler> methodHandlers;
private final Map<RestRequest.Method, Map<Version, RestHandler>> methodHandlers;

MethodHandlers(String path, RestHandler handler, RestRequest.Method... methods) {
this.path = path;
this.methodHandlers = new HashMap<>(methods.length);
for (RestRequest.Method method : methods) {
methodHandlers.put(method, handler);
methodHandlers.computeIfAbsent(method, k -> new HashMap<>())
.put(handler.compatibleWithVersion(), handler);
}
}

Expand All @@ -47,7 +48,8 @@ final class MethodHandlers {
*/
MethodHandlers addMethods(RestHandler handler, RestRequest.Method... methods) {
for (RestRequest.Method method : methods) {
RestHandler existing = methodHandlers.putIfAbsent(method, handler);
RestHandler existing = methodHandlers.computeIfAbsent(method, k -> new HashMap<>())
.putIfAbsent(handler.compatibleWithVersion(), handler);
if (existing != null) {
throw new IllegalArgumentException("Cannot replace existing handler for [" + path + "] for method: " + method);
}
Expand All @@ -56,11 +58,21 @@ MethodHandlers addMethods(RestHandler handler, RestRequest.Method... methods) {
}

/**
* Returns the handler for the given method or {@code null} if none exists.
* Returns the handler for the given method and version.
*
* If a handler for given version do not exist, a handler for Version.CURRENT will be returned.
* The reasoning behind is that in a minor a new API could be added passively, therefore new APIs are compatible
* (as opposed to non-compatible/breaking)
* or {@code null} if none exists.
*/
@Nullable
RestHandler getHandler(RestRequest.Method method) {
return methodHandlers.get(method);
RestHandler getHandler(RestRequest.Method method, Version version) {
Map<Version, RestHandler> versionToHandlers = methodHandlers.get(method);
if (versionToHandlers == null) {
return null; //method not found
}
final RestHandler handler = versionToHandlers.get(version);
return handler == null ? versionToHandlers.get(Version.CURRENT) : handler;

}

/**
Expand Down
39 changes: 29 additions & 10 deletions server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -90,11 +91,14 @@ public class RestController implements HttpServerTransport.Dispatcher {
/** Rest headers that are copied to internal requests made during a rest request. */
private final Set<RestHeaderDefinition> headersToCopy;
private final UsageService usageService;
private CompatibleVersion compatibleVersion;

public RestController(Set<RestHeaderDefinition> headersToCopy, UnaryOperator<RestHandler> handlerWrapper,
NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService) {
NodeClient client, CircuitBreakerService circuitBreakerService, UsageService usageService,
CompatibleVersion compatibleVersion) {
this.headersToCopy = headersToCopy;
this.usageService = usageService;
this.compatibleVersion = compatibleVersion;
if (handlerWrapper == null) {
handlerWrapper = h -> h; // passthrough if no wrapper set
}
Expand Down Expand Up @@ -168,6 +172,10 @@ protected void registerHandler(RestRequest.Method method, String path, RestHandl
}

private void registerHandlerNoWrap(RestRequest.Method method, String path, RestHandler maybeWrappedHandler) {
final Version version = maybeWrappedHandler.compatibleWithVersion();
assert Version.CURRENT.minimumRestCompatibilityVersion() == version || Version.CURRENT == version
: "REST API compatibility is only supported for version " + Version.CURRENT.minimumRestCompatibilityVersion().major;

handlers.insertOrUpdate(path, new MethodHandlers(path, maybeWrappedHandler, method),
(mHandlers, newMHandler) -> mHandlers.addMethods(maybeWrappedHandler, method));
}
Expand Down Expand Up @@ -220,7 +228,8 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
}
}

private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler, Version compatibleVersion)
throws Exception {
final int contentLength = request.contentLength();
if (contentLength > 0) {
final XContentType xContentType = request.getXContentType();
Expand All @@ -242,7 +251,7 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength, compatibleVersion);
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
Expand Down Expand Up @@ -318,6 +327,9 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
final String rawPath = request.rawPath();
final String uri = request.uri();
final RestRequest.Method requestMethod;

Version compatibleVersion = this.compatibleVersion.
get(request.getParsedAccept(), request.getParsedContentType(), request.hasContent());
try {
// Resolves the HTTP method and fails if the method is invalid
requestMethod = request.method();
Expand All @@ -329,14 +341,14 @@ private void tryAllHandlers(final RestRequest request, final RestChannel channel
if (handlers == null) {
handler = null;
} else {
handler = handlers.getHandler(requestMethod);
handler = handlers.getHandler(requestMethod, compatibleVersion);
}
if (handler == null) {
if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
return;
}
} else {
dispatchRequest(request, channel, handler);
dispatchRequest(request, channel, handler, compatibleVersion);
return;
}
}
Expand Down Expand Up @@ -454,33 +466,40 @@ private static final class ResourceHandlingHttpChannel implements RestChannel {
private final RestChannel delegate;
private final CircuitBreakerService circuitBreakerService;
private final int contentLength;
private final Version compatibleVersion;
private final AtomicBoolean closed = new AtomicBoolean();

ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) {
ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength,
Version compatibleVersion) {
this.delegate = delegate;
this.circuitBreakerService = circuitBreakerService;
this.contentLength = contentLength;
this.compatibleVersion = compatibleVersion;
}

@Override
public XContentBuilder newBuilder() throws IOException {
return delegate.newBuilder();
return delegate.newBuilder()
.withCompatibleMajorVersion(compatibleVersion.major);
}

@Override
public XContentBuilder newErrorBuilder() throws IOException {
return delegate.newErrorBuilder();
return delegate.newErrorBuilder()
.withCompatibleMajorVersion(compatibleVersion.major);
}

@Override
public XContentBuilder newBuilder(@Nullable XContentType xContentType, boolean useFiltering) throws IOException {
return delegate.newBuilder(xContentType, useFiltering);
return delegate.newBuilder(xContentType, useFiltering)
.withCompatibleMajorVersion(compatibleVersion.major);
}

@Override
public XContentBuilder newBuilder(XContentType xContentType, XContentType responseContentType, boolean useFiltering)
throws IOException {
return delegate.newBuilder(xContentType, responseContentType, useFiltering);
return delegate.newBuilder(xContentType, responseContentType, useFiltering)
.withCompatibleMajorVersion(compatibleVersion.major);
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/rest/RestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.rest;

import org.elasticsearch.Version;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.MediaType;
import org.elasticsearch.common.xcontent.MediaTypeRegistry;
Expand Down Expand Up @@ -106,6 +107,16 @@ default MediaTypeRegistry<? extends MediaType> validAcceptMediaTypes() {
return XContentType.MEDIA_TYPE_REGISTRY;
}

/**
* Returns a version a handler is compatible with.
* This version is then used to math a handler with a request that specified a version.
* If no version is specified, handler is assumed to be compatible with <code>Version.CURRENT</code>
* @return a version
*/
default Version compatibleWithVersion() {
return Version.CURRENT;
}

class Route {

private final String path;
Expand Down

0 comments on commit 618d8bc

Please sign in to comment.