Skip to content

Commit

Permalink
address review
Browse files Browse the repository at this point in the history
  • Loading branch information
jimczi committed Nov 28, 2018
1 parent 425e95f commit 22c0c7a
Show file tree
Hide file tree
Showing 14 changed files with 367 additions and 360 deletions.
Expand Up @@ -35,21 +35,19 @@ public class GetWatchResponse {
private final WatchStatus status;

private final BytesReference source;
private final XContentType xContentType;

/**
* Ctor for missing watch
*/
public GetWatchResponse(String id) {
this(id, Versions.NOT_FOUND, null, null, null);
this(id, Versions.NOT_FOUND, null, null);
}

public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source) {
this.id = id;
this.version = version;
this.status = status;
this.source = source;
this.xContentType = xContentType;
}

public String getId() {
Expand All @@ -73,7 +71,7 @@ public BytesReference getSource() {
}

public XContentType getContentType() {
return xContentType;
return XContentType.JSON;
}

@Override
Expand All @@ -84,13 +82,12 @@ public boolean equals(Object o) {
return version == that.version &&
Objects.equals(id, that.id) &&
Objects.equals(status, that.status) &&
Objects.equals(source, that.source) &&
xContentType == that.xContentType;
Objects.equals(source, that.source);
}

@Override
public int hashCode() {
return Objects.hash(id, status, source, xContentType, version);
return Objects.hash(id, status, source, version);
}

private static final ParseField ID_FIELD = new ParseField("_id");
Expand All @@ -104,9 +101,8 @@ public int hashCode() {
a -> {
boolean isFound = (boolean) a[1];
if (isFound) {
XContentBuilder builder = (XContentBuilder) a[4];
BytesReference source = BytesReference.bytes(builder);
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
BytesReference source = (BytesReference) a[4];
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source);
} else {
return new GetWatchResponse((String) a[0]);
}
Expand All @@ -122,7 +118,7 @@ public int hashCode() {
(parser, context) -> {
try (XContentBuilder builder = XContentBuilder.builder(parser.contentType().xContent())) {
builder.copyCurrentStructure(parser);
return builder;
return BytesReference.bytes(builder);
}
}, WATCH_FIELD);
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.watcher;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.joda.time.DateTime;
Expand All @@ -44,19 +45,22 @@ public class WatchStatus {
private final DateTime lastMetCondition;
private final long version;
private final Map<String, ActionStatus> actions;
@Nullable private Map<String, String> headers;

public WatchStatus(long version,
State state,
ExecutionState executionState,
DateTime lastChecked,
DateTime lastMetCondition,
Map<String, ActionStatus> actions) {
Map<String, ActionStatus> actions,
Map<String, String> headers) {
this.version = version;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
this.actions = actions;
this.state = state;
this.executionState = executionState;
this.headers = headers;
}

public State state() {
Expand All @@ -79,7 +83,7 @@ public ActionStatus actionStatus(String actionId) {
return actions.get(actionId);
}

Map<String, ActionStatus> getActions() {
public Map<String, ActionStatus> getActions() {
return actions;
}

Expand Down Expand Up @@ -116,6 +120,7 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
DateTime lastChecked = null;
DateTime lastMetCondition = null;
Map<String, ActionStatus> actions = null;
Map<String, String> headers = null;
long version = -1;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser::getTokenLocation);
Expand Down Expand Up @@ -176,13 +181,17 @@ public static WatchStatus parse(XContentParser parser) throws IOException {
throw new ElasticsearchParseException("could not parse watch status. expecting field [{}] to be an object, " +
"found [{}] instead", currentFieldName, token);
}
} else if (Field.HEADERS.match(currentFieldName, parser.getDeprecationHandler())) {
if (token == XContentParser.Token.START_OBJECT) {
headers = parser.mapStrings();
}
} else {
parser.skipChildren();
}
}

actions = actions == null ? emptyMap() : unmodifiableMap(actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions);
return new WatchStatus(version, state, executionState, lastChecked, lastMetCondition, actions, headers);
}

public static class State {
Expand Down Expand Up @@ -233,5 +242,6 @@ public interface Field {
ParseField ACTIONS = new ParseField("actions");
ParseField VERSION = new ParseField("version");
ParseField EXECUTION_STATE = new ParseField("execution_state");
ParseField HEADERS = new ParseField("headers");
}
}
Expand Up @@ -200,23 +200,26 @@ public void onFailure(Exception e) {
}

{
//tag::x-pack-get-watch-execute
//tag::get-watch-request
GetWatchRequest request = new GetWatchRequest("my_watch_id");
//end::get-watch-request

//tag::ack-watch-execute
GetWatchResponse response = client.watcher().getWatch(request, RequestOptions.DEFAULT);
//end::x-pack-get-watch-execute
//end::get-watch-request

//tag::x-pack-get-watch-response
//tag::get-watch-response
String watchId = response.getId(); // <1>
boolean found = response.isFound(); // <2>
long version = response.getVersion(); // <3>
WatchStatus status = response.getStatus(); // <4>
BytesReference source = response.getSource(); // <5>
//end::x-pack-get-watch-response
//end::get-watch-response
}

{
GetWatchRequest request = new GetWatchRequest("my_other_watch_id");
// tag::x-pack-get-watch-execute-listener
// tag::get-watch-execute-listener
ActionListener<GetWatchResponse> listener = new ActionListener<GetWatchResponse>() {
@Override
public void onResponse(GetWatchResponse response) {
Expand All @@ -228,15 +231,15 @@ public void onFailure(Exception e) {
// <2>
}
};
// end::x-pack-get-watch-execute-listener
// end::get-watch-execute-listener

// Replace the empty listener by a blocking listener in test
final CountDownLatch latch = new CountDownLatch(1);
listener = new LatchedActionListener<>(listener, latch);

// tag::x-pack-get-watch-execute-async
// tag::get-watch-execute-async
client.watcher().getWatchAsync(request, RequestOptions.DEFAULT, listener); // <1>
// end::x-pack-get-watch-execute-async
// end::get-watch-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
}
Expand Down

0 comments on commit 22c0c7a

Please sign in to comment.