Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the index type from internal watcher indexes #39761

Merged
merged 3 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@
public final class TriggeredWatchStoreField {

public static final String INDEX_NAME = ".triggered_watches";
public static final String DOC_TYPE = "doc";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class Watch implements ToXContentObject {

public static final String INCLUDE_STATUS_KEY = "include_status";
public static final String INDEX = ".watches";
public static final String DOC_TYPE = "doc";

private final String id;
private final Trigger trigger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"index.priority": 900
},
"mappings": {
"doc": {
"_doc": {
"dynamic" : "strict",
"properties": {
"trigger_event": {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"index.format": 6
},
"mappings": {
"doc": {
"_doc": {
"_meta": {
"watcher-history-version": "${xpack.watcher.template.version}"
},
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/core/src/main/resources/watches.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"index.priority": 800
},
"mappings": {
"doc": {
"_doc": {
"dynamic" : "strict",
"properties": {
"status": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void setConfiguration(Configuration configuration) {
*/
@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
if (isWatchDocument(shardId.getIndexName(), operation.type())) {
if (isWatchDocument(shardId.getIndexName())) {
ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
Expand Down Expand Up @@ -150,7 +150,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (isWatchDocument(shardId.getIndexName(), index.type())) {
if (isWatchDocument(shardId.getIndexName())) {
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
triggerService.remove(index.id());
}
Expand All @@ -166,7 +166,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
*/
@Override
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
if (isWatchDocument(shardId.getIndexName(), delete.type())) {
if (isWatchDocument(shardId.getIndexName())) {
triggerService.remove(delete.id());
}

Expand All @@ -177,11 +177,10 @@ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
* Check if a supplied index and document matches the current configuration for watcher
*
* @param index The index to check for
* @param docType The document type
* @return true if this is a watch in the active watcher index, false otherwise
*/
private boolean isWatchDocument(String index, String docType) {
return configuration.isIndexAndActive(index) && docType.equals(Watch.DOC_TYPE);
private boolean isWatchDocument(String index) {
return configuration.isIndexAndActive(index);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ public static EmailAction.Builder emailAction(EmailTemplate email) {
return EmailAction.builder(email);
}

/**
* Types are deprecated and should not be used. use {@link #indexAction(String)}
*/
@Deprecated
public static IndexAction.Builder indexAction(String index, String type) {
return IndexAction.builder(index, type);
}

public static IndexAction.Builder indexAction(String index) {
return IndexAction.builder(index);
}

public static JiraAction.Builder jiraAction(String account, MapBuilder<String, Object> fields) {
return jiraAction(account, fields.immutableMap());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class IndexAction implements Action {

public static final String TYPE = "index";

@Nullable final String docType;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes to this file (IndexAction) were missed in #37594. I added it here to help us find what to remove when we completely get rid of types. Apologizes for conflating the two issues.

@Nullable @Deprecated final String docType;
@Nullable final String index;
@Nullable final String docId;
@Nullable final String executionTimeField;
Expand All @@ -40,6 +40,15 @@ public class IndexAction implements Action {
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(IndexAction.class));
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in a watcher index action is deprecated.";

public IndexAction(@Nullable String index, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
this(index, null, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
/**
* Document types are deprecated, use constructor without docType
*/
@Deprecated
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
Expand Down Expand Up @@ -188,10 +197,18 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}

/**
* Document types are deprecated, use {@link #builder(java.lang.String)}
*/
@Deprecated
public static Builder builder(String index, String docType) {
return new Builder(index, docType);
}

public static Builder builder(String index) {
return new Builder(index);
}

public static class Result extends Action.Result {

private final XContentSource response;
Expand Down Expand Up @@ -278,11 +295,20 @@ public static class Builder implements Action.Builder<IndexAction> {
ZoneId dynamicNameTimeZone;
RefreshPolicy refreshPolicy;

/**
* Document types are deprecated and should not be used. Use: {@link Builder#Builder(java.lang.String)}
*/
@Deprecated
private Builder(String index, String docType) {
this.index = index;
this.docType = docType;
}

private Builder(String index) {
this.index = index;
this.docType = null;
}

public Builder setDocId(String docId) {
this.docId = docId;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void updateWatchStatus(Watch watch) throws IOException {
.field(WatchField.STATUS.getPreferredName(), watch.status(), params)
.endObject();

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, watch.id());
updateRequest.doc(source);
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
Expand Down Expand Up @@ -501,7 +501,7 @@ public void executeTriggeredWatches(Collection<TriggeredWatch> triggeredWatches)
*/
private GetResponse getWatch(String id) {
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, id).preference(Preference.LOCAL.type()).realtime(true);
GetRequest getRequest = new GetRequest(Watch.INDEX, id).preference(Preference.LOCAL.type()).realtime(true);
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
client.get(getRequest, future);
return future.actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws I
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches) throws IOException {
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE,
triggeredWatch.id().value());
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME).id(triggeredWatch.id().value());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
Expand All @@ -115,7 +114,7 @@ private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatche
* @param wid The ID os the triggered watch id
*/
public void delete(Wid wid) {
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value());
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, wid.value());
bulkProcessor.add(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

public class HistoryStore {

public static final String DOC_TYPE = "doc";

private static final Logger logger = LogManager.getLogger(HistoryStore.class);

private final BulkProcessor bulkProcessor;
Expand All @@ -47,7 +45,7 @@ public void put(WatchRecord watchRecord) throws Exception {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);

IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
IndexRequest request = new IndexRequest(index).id(watchRecord.id().value()).source(builder);
request.opType(IndexRequest.OpType.CREATE);
bulkProcessor.add(request);
} catch (IOException ioe) {
Expand All @@ -64,7 +62,7 @@ public void forcePut(WatchRecord watchRecord) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);

IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
IndexRequest request = new IndexRequest(index).id(watchRecord.id().value()).source(builder);
bulkProcessor.add(request);
} catch (IOException ioe) {
final WatchRecord wr = watchRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected void doExecute(AckWatchRequest request, ActionListener<AckWatchRespons
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
RestStatus.CONFLICT, request.getWatchId()));
} else {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand All @@ -99,7 +99,7 @@ protected void doExecute(AckWatchRequest request, ActionListener<AckWatchRespons
return;
}

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution
updateRequest.setIfSeqNo(getResponse.getSeqNo());
updateRequest.setIfPrimaryTerm(getResponse.getPrimaryTerm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public TransportActivateWatchAction(TransportService transportService, ActionFil
protected void doExecute(ActivateWatchRequest request, ActionListener<ActivateWatchResponse> listener) {
try {
ZonedDateTime now = clock.instant().atZone(ZoneOffset.UTC);
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getWatchId());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = activateWatchBuilder(request.isActivate(), now);
updateRequest.doc(builder);
Expand All @@ -72,7 +72,7 @@ protected void doExecute(ActivateWatchRequest request, ActionListener<ActivateWa

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(updateResponse -> {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public TransportDeleteWatchAction(TransportService transportService, ActionFilte

@Override
protected void doExecute(Task task, DeleteWatchRequest request, ActionListener<DeleteWatchResponse> listener) {
DeleteRequest deleteRequest = new DeleteRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
DeleteRequest deleteRequest = new DeleteRequest(Watch.INDEX, request.getId());
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, deleteRequest,
ActionListener.<DeleteResponse>wrap(deleteResponse -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public TransportExecuteWatchAction(TransportService transportService, ThreadPool
@Override
protected void doExecute(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener) {
if (request.getId() != null) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public TransportGetWatchAction(TransportService transportService, ActionFilters

@Override
protected void doExecute(GetWatchRequest request, ActionListener<GetWatchResponse> listener) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected void doExecute(PutWatchRequest request, ActionListener<PutWatchRespons
watch.toXContent(builder, DEFAULT_PARAMS);

if (isUpdate) {
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getId());
if (request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
updateRequest.setIfSeqNo(request.getIfSeqNo());
updateRequest.setIfPrimaryTerm(request.getIfPrimaryTerm());
Expand All @@ -112,7 +112,7 @@ protected void doExecute(PutWatchRequest request, ActionListener<PutWatchRespons
}, listener::onFailure),
client::update);
} else {
IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
IndexRequest indexRequest = new IndexRequest(Watch.INDEX).id(request.getId());
indexRequest.source(builder);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testCanUseAnyConcreteIndexName() throws Exception {
createIndex(watchResultsIndex);

stopWatcher();
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName);
startWatcher();

PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
Expand Down
Loading