Skip to content

Commit

Permalink
[7.x] Data stream admin actions are now index-level actions
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jul 10, 2020
1 parent 7fa9cf6 commit e01d73c
Show file tree
Hide file tree
Showing 15 changed files with 353 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
],
"parts":{
"name":{
"type":"string",
"description":"The name or wildcard expression of the requested data streams"
"type":"list",
"description":"A comma-separated list of data streams to get; use `*` to get all data streams"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ setup:
catch: missing

- match: { status: 404 }
- match: { error.root_cause.0.type: "resource_not_found_exception" }
- match: { error.root_cause.0.type: "index_not_found_exception" }

- do:
indices.get_data_stream:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void testMixedAutoCreate() throws Exception {
bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));

GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[]{"*"});
GetDataStreamAction.Response getDataStreamsResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
assertThat(getDataStreamsResponse.getDataStreams(), hasSize(4));
getDataStreamsResponse.getDataStreams().sort(Comparator.comparing(dataStreamInfo -> dataStreamInfo.getDataStream().getName()));
Expand Down Expand Up @@ -294,7 +294,7 @@ public void testAutoCreateV1TemplateNoDataStream() {
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));

GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[]{"*"});
GetDataStreamAction.Response getDataStreamsResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
assertThat(getDataStreamsResponse.getDataStreams(), hasSize(0));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testBasicScenario() throws Exception {
createDataStreamRequest = new CreateDataStreamAction.Request("metrics-bar");
client().admin().indices().createDataStream(createDataStreamRequest).get();

GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[]{"*"});
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
getDataStreamResponse.getDataStreams().sort(Comparator.comparing(dataStreamInfo -> dataStreamInfo.getDataStream().getName()));
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(2));
Expand Down Expand Up @@ -302,7 +302,7 @@ public void testComposableTemplateOnlyMatchingWithDataStreamName() throws Except
verifyDocs(dataStreamName, numDocs, 1, 1);

String backingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[]{"*"});
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
Expand Down Expand Up @@ -530,7 +530,7 @@ public void testTimestampFieldCustomAttributes() throws Exception {

CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar");
client().admin().indices().createDataStream(createDataStreamRequest).get();
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("logs-foobar");
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[]{"logs-foobar"});
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo("logs-foobar"));
Expand Down Expand Up @@ -682,7 +682,7 @@ public void testGetDataStream() throws Exception {
indexDocs("metrics-foo", "@timestamp", numDocsFoo);

GetDataStreamAction.Response response =
client().admin().indices().getDataStreams(new GetDataStreamAction.Request("metrics-foo")).actionGet();
client().admin().indices().getDataStreams(new GetDataStreamAction.Request(new String[]{"metrics-foo"})).actionGet();
assertThat(response.getDataStreams().size(), is(1));
GetDataStreamAction.Response.DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0);
assertThat(metricsFooDataStream.getDataStream().getName(), is("metrics-foo"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public void testSnapshotAndRestore() throws Exception {
assertEquals(1, hits.length);
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());

GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds")).get();
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(
new GetDataStreamAction.Request(new String[]{"ds"})).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
Expand Down Expand Up @@ -155,7 +156,8 @@ public void testSnapshotAndRestoreAll() throws Exception {
assertEquals(1, hits.length);
assertEquals(DOCUMENT_SOURCE, hits[0].getSourceAsMap());

GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds")).get();
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(
new GetDataStreamAction.Request(new String[]{"ds"})).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
Expand Down Expand Up @@ -188,7 +190,8 @@ public void testRename() throws Exception {
.setRenameReplacement("ds2")
.get();

GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get();
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(
new GetDataStreamAction.Request(new String[]{"ds2"})).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
Expand Down Expand Up @@ -227,7 +230,7 @@ public void testBackingIndexIsNotRenamedWhenRestoringDataStream() {

assertThat(restoreSnapshotResponse.status(), is(RestStatus.OK));

GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request("ds");
GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request(new String[]{"ds"});
GetDataStreamAction.Response response = client.admin().indices().getDataStreams(getDSRequest).actionGet();
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(DS_BACKING_INDEX_NAME));
}
Expand Down Expand Up @@ -261,13 +264,13 @@ public void testDataStreamAndBackingIndidcesAreRenamedUsingRegex() {
assertThat(restoreSnapshotResponse.status(), is(RestStatus.OK));

// assert "ds" was restored as "test-ds" and the backing index has a valid name
GetDataStreamAction.Request getRenamedDS = new GetDataStreamAction.Request("test-ds");
GetDataStreamAction.Request getRenamedDS = new GetDataStreamAction.Request(new String[]{"test-ds"});
GetDataStreamAction.Response response = client.admin().indices().getDataStreams(getRenamedDS).actionGet();
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(),
is(DataStream.getDefaultBackingIndexName("test-ds", 1L)));

// data stream "ds" should still exist in the system
GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request("ds");
GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request(new String[]{"ds"});
response = client.admin().indices().getDataStreams(getDSRequest).actionGet();
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(DS_BACKING_INDEX_NAME));
}
Expand All @@ -293,7 +296,8 @@ public void testWildcards() throws Exception {

assertEquals(RestStatus.OK, restoreSnapshotResponse.status());

GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get();
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(
new GetDataStreamAction.Request(new String[]{"ds2"})).get();
assertEquals(1, ds.getDataStreams().size());
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
Expand Down Expand Up @@ -340,7 +344,7 @@ public void testDataStreamNotRestoredWhenIndexRequested() throws Exception {

assertEquals(RestStatus.OK, restoreSnapshotResponse.status());

GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request("ds");
GetDataStreamAction.Request getRequest = new GetDataStreamAction.Request(new String[]{"ds"});
expectThrows(ResourceNotFoundException.class, () -> client.admin().indices().getDataStreams(getRequest).actionGet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
Expand Down Expand Up @@ -52,7 +54,7 @@ private CreateDataStreamAction() {
super(NAME, AcknowledgedResponse::new);
}

public static class Request extends AcknowledgedRequest<Request> {
public static class Request extends AcknowledgedRequest<Request> implements IndicesRequest {

private final String name;

Expand Down Expand Up @@ -92,6 +94,16 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(name);
}

@Override
public String[] indices() {
return new String[]{name};
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
}

public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
Expand Down Expand Up @@ -70,9 +71,9 @@ private DeleteDataStreamAction() {
super(NAME, AcknowledgedResponse::new);
}

public static class Request extends MasterNodeRequest<Request> {
public static class Request extends MasterNodeRequest<Request> implements IndicesRequest.Replaceable {

private final String[] names;
private String[] names;

public Request(String[] names) {
this.names = Objects.requireNonNull(names);
Expand Down Expand Up @@ -110,6 +111,29 @@ public boolean equals(Object o) {
public int hashCode() {
return Arrays.hashCode(names);
}

@Override
public String[] indices() {
return names;
}

@Override
public IndicesOptions indicesOptions() {
// this doesn't really matter since data stream name resolution isn't affected by IndicesOptions and
// a data stream's backing indices are retrieved from its metadata
return IndicesOptions.fromOptions(false, true, true, true, false, false, true, false);
}

@Override
public boolean includeDataStreams() {
return true;
}

@Override
public IndicesRequest indices(String... indices) {
this.names = indices;
return this;
}
}

public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
Expand Down Expand Up @@ -175,16 +199,6 @@ static ClusterState removeDataStream(MetadataDeleteIndexService deleteIndexServi
snapshottingDataStreams.addAll(SnapshotsService.snapshottingDataStreams(currentState, dataStreams));
}

if (dataStreams.isEmpty()) {
// if only a match-all pattern was specified and no data streams were found because none exist, do not
// fail with data stream missing exception
if (request.names.length == 1 && Regex.isMatchAllPattern(request.names[0])) {
return currentState;
}
throw new ResourceNotFoundException("data_streams matching [" + Strings.arrayToCommaDelimitedString(request.names) +
"] not found");
}

if (snapshottingDataStreams.isEmpty() == false) {
throw new SnapshotInProgressException("Cannot delete data streams that are being snapshotted: " + snapshottingDataStreams +
". Try again after snapshot finishes or cancel the currently running snapshot.");
Expand Down

0 comments on commit e01d73c

Please sign in to comment.