Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use NoOp engine for replicating closed indices #31745

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]IndexNameExpressionResolver.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaData.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataCreateIndexService.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataIndexStateService.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataIndexUpgradeService.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataMappingService.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]metadata[/\\]MetaDataUpdateSettingsService.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ public void testAliasesNonExistentIndex() throws IOException {
assertThat(aliasExists(alias), equalTo(false));
}

@AwaitsFix(bugUrl = "needs fixing for async opening")
public void testOpenExistingIndex() throws IOException {
String index = "index";
createIndex(index, Settings.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@

- match: { test_index.shards.0.type: SNAPSHOT }
- match: { test_index.shards.0.stage: DONE }
- match: { test_index.shards.0.index.files.recovered: 0}
- match: { test_index.shards.0.index.size.recovered_in_bytes: 0}
- match: { test_index.shards.0.index.files.reused: 1}
- gt: { test_index.shards.0.index.size.reused_in_bytes: 0}
- match: { test_index.shards.0.index.files.recovered: 1}
- match: { test_index.shards.0.index.size.recovered_in_bytes: 230}
- match: { test_index.shards.0.index.files.reused: 0}
- match: { test_index.shards.0.index.size.reused_in_bytes: 0}

# Remove our snapshot
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@

- match:
$body: |
/^( \s+
/^(green \s+
close \s+
index1 \s+
([a-zA-Z0-9=/_+]|[\\\-]){22} \s+
\s+
\s+
\s+
\s+
\s+
\s*
1 \s+
0 \s+
0 \s+
0 \s+
(\d+|\d+[.]\d+)(kb|b) \s+
(\d+|\d+[.]\d+)(kb|b) \s*
)
$/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
"Basic test for index open/close":
- skip:
version: " - 7.0.99"
reason: awaitsfix because opening is async

- do:
indices.create:
index: test_index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ setup:

---
"All indices":
- skip:
version: " - 7.0.99"
reason: awaitsfix because opening is async

- do:
indices.close:
index: _all
Expand All @@ -46,6 +50,10 @@ setup:

---
"Trailing wildcard":
- skip:
version: " - 7.0.99"
reason: awaitsfix because opening is async

- do:
indices.close:
index: test_*
Expand All @@ -69,6 +77,10 @@ setup:

---
"Only wildcard":
- skip:
version: " - 7.0.99"
reason: awaitsfix because opening is async

- do:
indices.close:
index: '*'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ setup:

---
"Create a snapshot and then restore it":
- skip:
reason: "recovered/reused bytes differ for 6.x and 7.x"
version: " - 6.99.99"

- do:
snapshot.create:
Expand Down Expand Up @@ -53,7 +56,7 @@ setup:

- match: { test_index.shards.0.type: SNAPSHOT }
- match: { test_index.shards.0.stage: DONE }
- match: { test_index.shards.0.index.files.recovered: 0}
- match: { test_index.shards.0.index.size.recovered_in_bytes: 0}
- match: { test_index.shards.0.index.files.reused: 1}
- gt: { test_index.shards.0.index.size.reused_in_bytes: 0}
- match: { test_index.shards.0.index.files.recovered: 1}
- match: { test_index.shards.0.index.size.recovered_in_bytes: 230}
- match: { test_index.shards.0.index.files.reused: 0}
- match: { test_index.shards.0.index.size.reused_in_bytes: 0}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public ClusterHealthRequest indices(String... indices) {

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.lenientExpandOpen();
return IndicesOptions.lenientExpandAll();
}

public TimeValue timeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,16 @@ public enum Option {
new IndicesOptions(EnumSet.of(Option.ALLOW_NO_INDICES), EnumSet.of(WildcardStates.OPEN));
public static final IndicesOptions LENIENT_EXPAND_OPEN =
new IndicesOptions(EnumSet.of(Option.ALLOW_NO_INDICES, Option.IGNORE_UNAVAILABLE), EnumSet.of(WildcardStates.OPEN));
public static final IndicesOptions LENIENT_EXPAND_ALL =
new IndicesOptions(EnumSet.of(Option.ALLOW_NO_INDICES, Option.IGNORE_UNAVAILABLE),
EnumSet.of(WildcardStates.OPEN, WildcardStates.CLOSED));
public static final IndicesOptions STRICT_EXPAND_OPEN_CLOSED =
new IndicesOptions(EnumSet.of(Option.ALLOW_NO_INDICES), EnumSet.of(WildcardStates.OPEN, WildcardStates.CLOSED));
public static final IndicesOptions STRICT_EXPAND_OPEN_FORBID_CLOSED =
new IndicesOptions(EnumSet.of(Option.ALLOW_NO_INDICES, Option.FORBID_CLOSED_INDICES), EnumSet.of(WildcardStates.OPEN));
public static final IndicesOptions STRICT_SINGLE_INDEX_NO_EXPAND =
new IndicesOptions(EnumSet.of(Option.FORBID_ALIASES_TO_MULTIPLE_INDICES),
EnumSet.noneOf(WildcardStates.class));
public static final IndicesOptions STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED =
new IndicesOptions(EnumSet.of(Option.FORBID_ALIASES_TO_MULTIPLE_INDICES, Option.FORBID_CLOSED_INDICES),
EnumSet.noneOf(WildcardStates.class));
Expand Down Expand Up @@ -393,6 +399,14 @@ public static IndicesOptions strictSingleIndexNoExpandForbidClosed() {
return STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED;
}

/**
* @return indices option that requires each specified index or alias to exist, doesn't expand wildcards and
* throws error if any of the aliases resolves to multiple indices
*/
public static IndicesOptions strictSingleIndexNoExpand() {
return STRICT_SINGLE_INDEX_NO_EXPAND;
}

/**
* @return indices options that ignores unavailable indices, expands wildcards only to open indices and
* allows that no indices are resolved from wildcard expressions (not returning an error).
Expand All @@ -401,6 +415,14 @@ public static IndicesOptions lenientExpandOpen() {
return LENIENT_EXPAND_OPEN;
}

/**
* @return indices options that ignores unavailable indices, expands wildcards to open and closed indices and
* allows that no indices are resolved from wildcard expressions (not returning an error).
*/
public static IndicesOptions lenientExpandAll() {
return LENIENT_EXPAND_ALL;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand All @@ -57,7 +56,8 @@
*/
public class MetaDataIndexStateService extends AbstractComponent {

public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false,
false, true, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);

private final ClusterService clusterService;

Expand Down Expand Up @@ -85,7 +85,8 @@ public void closeIndex(final CloseIndexClusterStateUpdateRequest request, final
}

final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
clusterService.submitStateUpdateTask("close-indices " + indicesAsString,
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
Expand All @@ -109,7 +110,7 @@ public ClusterState execute(ClusterState currentState) {
RestoreService.checkIndexClosing(currentState, indicesToClose);
// Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose);
logger.info("closing indices [{}]", indicesAsString);
logger.info("closing indices {}", indicesAsString);

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
Expand All @@ -122,20 +123,14 @@ public ClusterState execute(ClusterState currentState) {

ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build();

RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (IndexMetaData index : indicesToClose) {
rtBuilder.remove(index.getIndex().getName());
}

//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
"indices closed [" + indicesAsString + "]");
// no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(ClusterState.builder(updatedState).build(), "indices closed " + indicesAsString);
}
});
}

public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<OpenIndexClusterStateUpdateResponse> listener) {
public void openIndex(final OpenIndexClusterStateUpdateRequest request,
final ActionListener<OpenIndexClusterStateUpdateResponse> listener) {
onlyOpenIndex(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
Expand All @@ -153,13 +148,15 @@ public void openIndex(final OpenIndexClusterStateUpdateRequest request, final Ac
}, listener::onFailure));
}

private void onlyOpenIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
private void onlyOpenIndex(final OpenIndexClusterStateUpdateRequest request,
final ActionListener<ClusterStateUpdateResponse> listener) {
if (request.indices() == null || request.indices().length == 0) {
throw new IllegalArgumentException("Index name is required");
}

final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask("open-indices " + indicesAsString, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
clusterService.submitStateUpdateTask("open-indices " + indicesAsString,
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
Expand All @@ -179,7 +176,7 @@ public ClusterState execute(ClusterState currentState) {
return currentState;
}

logger.info("opening indices [{}]", indicesAsString);
logger.info("opening indices {}", indicesAsString);

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
Expand All @@ -204,15 +201,8 @@ public ClusterState execute(ClusterState currentState) {

ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build();

RoutingTable.Builder rtBuilder = RoutingTable.builder(updatedState.routingTable());
for (IndexMetaData index : indicesToOpen) {
rtBuilder.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(index.getIndex()));
}

//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
"indices opened [" + indicesAsString + "]");
return allocationService.reroute(ClusterState.builder(updatedState).build(), "indices opened " + indicesAsString);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,32 +497,27 @@ public Builder updateNumberOfReplicas(int numberOfReplicas, String... indices) {
}

public Builder addAsNew(IndexMetaData indexMetaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsNew(indexMetaData);
add(indexRoutingBuilder);
}
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsNew(indexMetaData);
add(indexRoutingBuilder);
return this;
}

public Builder addAsRecovery(IndexMetaData indexMetaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsRecovery(indexMetaData);
add(indexRoutingBuilder);
}
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsRecovery(indexMetaData);
add(indexRoutingBuilder);
return this;
}

public Builder addAsFromDangling(IndexMetaData indexMetaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsFromDangling(indexMetaData);
add(indexRoutingBuilder);
}
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsFromDangling(indexMetaData);
add(indexRoutingBuilder);
return this;
}

// TODO: I think we may not need this method any more, check about removing it
public Builder addAsFromCloseToOpen(IndexMetaData indexMetaData) {
if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@
*/
public interface EngineFactory {

Engine newReadWriteEngine(EngineConfig config);
Engine newReadWriteEngine(EngineConfig config, boolean closed);

}
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,7 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}

// Package private for testing purposes only
Translog getTranslog() {
public Translog getTranslog() {
ensureOpen();
return translog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@

public class InternalEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(EngineConfig config) {
return new InternalEngine(config);
public Engine newReadWriteEngine(EngineConfig config, boolean closed) {
if (closed) {
return new NoOpEngine(config);
} else {
return new InternalEngine(config);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* Directory so that the last commit's user data can be read for the historyUUID
* and last committed segment info.
*/
final class NoOpEngine extends Engine {
public final class NoOpEngine extends Engine {

private static final Translog.Snapshot EMPTY_TRANSLOG_SNAPSHOT = new Translog.Snapshot() {
@Override
Expand All @@ -79,7 +79,7 @@ public void close() {
private final String historyUUID;
private final SegmentInfos lastCommittedSegmentInfos;

NoOpEngine(EngineConfig engineConfig) {
public NoOpEngine(EngineConfig engineConfig) {
super(engineConfig);

store.incRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -146,6 +147,11 @@ public Request(final ShardId shardId) {
super(shardId);
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpand();
}

@Override
public String toString() {
return "GlobalCheckpointSyncAction.Request{" +
Expand Down
Loading