Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HLRC: Add get watch API #35531

Merged
merged 14 commits into from Nov 30, 2018
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.client.watcher.ActivateWatchResponse;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.AckWatchResponse;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.GetWatchResponse;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
Expand Down Expand Up @@ -129,6 +131,34 @@ public void putWatchAsync(PutWatchRequest request, RequestOptions options,
PutWatchResponse::fromXContent, listener, emptySet());
}

/**
* Gets a watch from the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetWatchResponse getWatch(GetWatchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, emptySet());
}

/**
* Asynchronously gets a watch into the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void getWatchAsync(GetWatchRequest request, RequestOptions options,
ActionListener<GetWatchResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, listener, emptySet());
}

/**
* Deactivate an existing watch
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-deactivate-watch.html">
Expand Down
Expand Up @@ -28,12 +28,13 @@
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;

final class WatcherRequestConverters {

Expand Down Expand Up @@ -76,6 +77,16 @@ static Request putWatch(PutWatchRequest putWatchRequest) {
return request;
}


static Request getWatch(GetWatchRequest getWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack", "watcher", "watch")
.addPathPart(getWatchRequest.getId())
.build();

return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request deactivateWatch(DeactivateWatchRequest deactivateWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;

import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;

/**
* The request to get the watch by name (id)
*/
public final class GetWatchRequest implements Validatable {

private final String id;

public GetWatchRequest(String watchId) {
validateId(watchId);
this.id = watchId;
}

private void validateId(String id) {
ValidationException exception = new ValidationException();
if (id == null) {
exception.addValidationError("watch id is missing");
} else if (PutWatchRequest.isValidId(id) == false) {
exception.addValidationError("watch id contains whitespace");
}
if (exception.validationErrors().isEmpty() == false) {
throw exception;
}
}

/**
* @return The name of the watch to retrieve
*/
public String getId() {
return id;
}
}
@@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class GetWatchResponse {
private final String id;
private final long version;
private final WatchStatus status;

private final BytesReference source;
private final XContentType xContentType;

/**
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small comment: it would be nice to add a comment to this constructor explaining that it is for a 'missing watch', as opposed to the one below.

this(id, Versions.NOT_FOUND, null, null, null);
}

public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
}

public String getId() {
return id;
}

public long getVersion() {
return version;
}

public boolean isFound() {
return version != Versions.NOT_FOUND;
}

public WatchStatus getStatus() {
return status;
}

/**
* Returns the {@link XContentType} of the source
*/
public XContentType getContentType() {
return xContentType;
}

/**
* Returns the serialized watch
*/
public BytesReference getSource() {
return source;
}

/**
* Returns the source as a map
*/
public Map<String, Object> getSourceAsMap() {
return source == null ? null : XContentHelper.convertToMap(source, false, getContentType()).v2();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetWatchResponse that = (GetWatchResponse) o;
return version == that.version &&
Objects.equals(id, that.id) &&
Objects.equals(status, that.status) &&
Objects.equals(xContentType, that.xContentType) &&
Objects.equals(source, that.source);
}

@Override
public int hashCode() {
return Objects.hash(id, status, source, version);
}

private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");

private static ConstructingObjectParser<GetWatchResponse, Void> PARSER =
new ConstructingObjectParser<>("get_watch_response", true,
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
} else {
return new GetWatchResponse((String) a[0]);
}
});

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> {
try (XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent())) {
builder.copyCurrentStructure(parser);
return builder;
}
}, WATCH_FIELD);
}

public static GetWatchResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.watcher;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.joda.time.DateTime;
Expand All @@ -44,19 +45,22 @@ public class WatchStatus {
private final DateTime lastMetCondition;
private final long version;
private final Map<String, ActionStatus> actions;
@Nullable private Map<String, String> headers;

public WatchStatus(long version,
State state,
ExecutionState executionState,
DateTime lastChecked,
DateTime lastMetCondition,
Map<String, ActionStatus> actions) {
Map<String, ActionStatus> actions,
Map<String, String> headers) {
this.version = version;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
this.actions = actions;
this.state = state;
this.executionState = executionState;
this.headers = headers;
}

public State state() {
Expand All @@ -79,6 +83,10 @@ public ActionStatus actionStatus(String actionId) {
return actions.get(actionId);
}

public Map<String, ActionStatus> getActions() {
return actions;
}

public long version() {
return version;
}
Expand All @@ -87,6 +95,10 @@ public ExecutionState getExecutionState() {
return executionState;
}

public Map<String, String> getHeaders() {
return headers;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -98,7 +110,8 @@ public boolean equals(Object o) {
Objects.equals(lastMetCondition, that.lastMetCondition) &&
Objects.equals(version, that.version) &&
Objects.equals(executionState, that.executionState) &&
Objects.equals(actions, that.actions);
Objects.equals(actions, that.actions) &&
Objects.equals(headers, that.headers);
}

@Override
Expand All @@ -112,6 +125,7 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
DateTime lastChecked = null;
DateTime lastMetCondition = null;
Map<String, ActionStatus> actions = null;
Map<String, String> headers = null;
long version = -1;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
Expand Down Expand Up @@ -172,13 +186,17 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
throw new ElasticsearchParseException("could not parse watch status. expecting field [{}] to be an object, " +
"found [{}] instead", currentFieldName, token);
}
} else if (Field.HEADERS.match(currentFieldName, parser.getDeprecationHandler())) {
if (token == XContentParser.Token.START_OBJECT) {
headers = parser.mapStrings();
}
} else {
parser.skipChildren();
}
}

actions = actions == null ? emptyMap() : unmodifiableMap(actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions, headers);
}

public static class State {
Expand Down Expand Up @@ -214,6 +232,8 @@ public static State parse(XContentParser parser) throws IOException {
active = parser.booleanValue();
} else if (Field.TIMESTAMP.match(currentFieldName, parser.getDeprecationHandler())) {
timestamp = parseDate(currentFieldName, parser);
} else {
parser.skipChildren();
}
}
return new State(active, timestamp);
Expand All @@ -229,5 +249,6 @@ public interface Field {
ParseField ACTIONS = new ParseField("actions");
ParseField VERSION = new ParseField("version");
ParseField EXECUTION_STATE = new ParseField("execution_state");
ParseField HEADERS = new ParseField("headers");
}
}