Skip to content

Commit

Permalink
HLRC: Add get watch API (#35531)
Browse files Browse the repository at this point in the history
This changes adds the support for the get watch API in the high level rest client.
  • Loading branch information
jimczi committed Nov 30, 2018
1 parent fa3d365 commit 5c7b2c5
Show file tree
Hide file tree
Showing 15 changed files with 676 additions and 15 deletions.
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.client.watcher.ActivateWatchResponse;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.AckWatchResponse;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.GetWatchResponse;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
Expand Down Expand Up @@ -129,6 +131,34 @@ public void putWatchAsync(PutWatchRequest request, RequestOptions options,
PutWatchResponse::fromXContent, listener, emptySet());
}

/**
* Gets a watch from the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public GetWatchResponse getWatch(GetWatchRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, emptySet());
}

/**
* Asynchronously gets a watch into the cluster
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-get-watch.html">
* the docs</a> for more.
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void getWatchAsync(GetWatchRequest request, RequestOptions options,
ActionListener<GetWatchResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request, WatcherRequestConverters::getWatch, options,
GetWatchResponse::fromXContent, listener, emptySet());
}

/**
* Deactivate an existing watch
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-api-deactivate-watch.html">
Expand Down
Expand Up @@ -28,12 +28,13 @@
import org.elasticsearch.client.watcher.DeactivateWatchRequest;
import org.elasticsearch.client.watcher.ActivateWatchRequest;
import org.elasticsearch.client.watcher.AckWatchRequest;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.GetWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;
import org.elasticsearch.client.watcher.StartWatchServiceRequest;
import org.elasticsearch.client.watcher.StopWatchServiceRequest;
import org.elasticsearch.client.watcher.WatcherStatsRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.client.watcher.DeleteWatchRequest;
import org.elasticsearch.client.watcher.PutWatchRequest;

final class WatcherRequestConverters {

Expand Down Expand Up @@ -76,6 +77,16 @@ static Request putWatch(PutWatchRequest putWatchRequest) {
return request;
}


static Request getWatch(GetWatchRequest getWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack", "watcher", "watch")
.addPathPart(getWatchRequest.getId())
.build();

return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request deactivateWatch(DeactivateWatchRequest deactivateWatchRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_xpack")
Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;

import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;

/**
* The request to get the watch by name (id)
*/
public final class GetWatchRequest implements Validatable {

private final String id;

public GetWatchRequest(String watchId) {
validateId(watchId);
this.id = watchId;
}

private void validateId(String id) {
ValidationException exception = new ValidationException();
if (id == null) {
exception.addValidationError("watch id is missing");
} else if (PutWatchRequest.isValidId(id) == false) {
exception.addValidationError("watch id contains whitespace");
}
if (exception.validationErrors().isEmpty() == false) {
throw exception;
}
}

/**
* @return The name of the watch to retrieve
*/
public String getId() {
return id;
}
}
@@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.watcher;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class GetWatchResponse {
private final String id;
private final long version;
private final WatchStatus status;

private final BytesReference source;
private final XContentType xContentType;

/**
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
this(id, Versions.NOT_FOUND, null, null, null);
}

public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
}

public String getId() {
return id;
}

public long getVersion() {
return version;
}

public boolean isFound() {
return version != Versions.NOT_FOUND;
}

public WatchStatus getStatus() {
return status;
}

/**
* Returns the {@link XContentType} of the source
*/
public XContentType getContentType() {
return xContentType;
}

/**
* Returns the serialized watch
*/
public BytesReference getSource() {
return source;
}

/**
* Returns the source as a map
*/
public Map<String, Object> getSourceAsMap() {
return source == null ? null : XContentHelper.convertToMap(source, false, getContentType()).v2();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetWatchResponse that = (GetWatchResponse) o;
return version == that.version &&
Objects.equals(id, that.id) &&
Objects.equals(status, that.status) &&
Objects.equals(xContentType, that.xContentType) &&
Objects.equals(source, that.source);
}

@Override
public int hashCode() {
return Objects.hash(id, status, source, version);
}

private static final ParseField ID_FIELD = new ParseField("_id");
private static final ParseField FOUND_FIELD = new ParseField("found");
private static final ParseField VERSION_FIELD = new ParseField("_version");
private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField WATCH_FIELD = new ParseField("watch");

private static ConstructingObjectParser<GetWatchResponse, Void> PARSER =
new ConstructingObjectParser<>("get_watch_response", true,
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
} else {
return new GetWatchResponse((String) a[0]);
}
});

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(parser, context) -> {
try (XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent())) {
builder.copyCurrentStructure(parser);
return builder;
}
}, WATCH_FIELD);
}

public static GetWatchResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
}
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.watcher;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.joda.time.DateTime;
Expand All @@ -44,19 +45,22 @@ public class WatchStatus {
private final DateTime lastMetCondition;
private final long version;
private final Map<String, ActionStatus> actions;
@Nullable private Map<String, String> headers;

public WatchStatus(long version,
State state,
ExecutionState executionState,
DateTime lastChecked,
DateTime lastMetCondition,
Map<String, ActionStatus> actions) {
Map<String, ActionStatus> actions,
Map<String, String> headers) {
this.version = version;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
this.actions = actions;
this.state = state;
this.executionState = executionState;
this.headers = headers;
}

public State state() {
Expand All @@ -79,6 +83,10 @@ public ActionStatus actionStatus(String actionId) {
return actions.get(actionId);
}

public Map<String, ActionStatus> getActions() {
return actions;
}

public long version() {
return version;
}
Expand All @@ -87,6 +95,10 @@ public ExecutionState getExecutionState() {
return executionState;
}

public Map<String, String> getHeaders() {
return headers;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -98,7 +110,8 @@ public boolean equals(Object o) {
Objects.equals(lastMetCondition, that.lastMetCondition) &&
Objects.equals(version, that.version) &&
Objects.equals(executionState, that.executionState) &&
Objects.equals(actions, that.actions);
Objects.equals(actions, that.actions) &&
Objects.equals(headers, that.headers);
}

@Override
Expand All @@ -112,6 +125,7 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
DateTime lastChecked = null;
DateTime lastMetCondition = null;
Map<String, ActionStatus> actions = null;
Map<String, String> headers = null;
long version = -1;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
Expand Down Expand Up @@ -172,13 +186,17 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
throw new ElasticsearchParseException("could not parse watch status. expecting field [{}] to be an object, " +
"found [{}] instead", currentFieldName, token);
}
} else if (Field.HEADERS.match(currentFieldName, parser.getDeprecationHandler())) {
if (token == XContentParser.Token.START_OBJECT) {
headers = parser.mapStrings();
}
} else {
parser.skipChildren();
}
}

actions = actions == null ? emptyMap() : unmodifiableMap(actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions, headers);
}

public static class State {
Expand Down Expand Up @@ -214,6 +232,8 @@ public static State parse(XContentParser parser) throws IOException {
active = parser.booleanValue();
} else if (Field.TIMESTAMP.match(currentFieldName, parser.getDeprecationHandler())) {
timestamp = parseDate(currentFieldName, parser);
} else {
parser.skipChildren();
}
}
return new State(active, timestamp);
Expand All @@ -229,5 +249,6 @@ public interface Field {
ParseField ACTIONS = new ParseField("actions");
ParseField VERSION = new ParseField("version");
ParseField EXECUTION_STATE = new ParseField("execution_state");
ParseField HEADERS = new ParseField("headers");
}
}

0 comments on commit 5c7b2c5

Please sign in to comment.