Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

System index reads in separate threadpool #57936

Merged
merged 33 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
c9c092b
System index reads in separate threadpool
jaymode Jun 3, 2020
7db9ec2
Merge branch 'master' into system_index_threadpool
jaymode Jun 11, 2020
e45e37a
leave get requests alone for now
jaymode Jun 11, 2020
4640cf2
minimal automaton
jaymode Jun 12, 2020
72f3a6a
refactor
jaymode Jun 12, 2020
37568d8
Revert "leave get requests alone for now"
jaymode Jun 15, 2020
1f48a70
threadpool shaping
jaymode Jun 18, 2020
cbb50bb
Merge branch 'master' into system_index_threadpool
jaymode Jul 13, 2020
42e4b8c
fixes
jaymode Jul 13, 2020
45bd609
Merge branch 'master' into system_index_threadpool
jaymode Jul 13, 2020
538d957
Merge branch 'master' into system_index_threadpool
jaymode Jul 27, 2020
64ebd20
upgrade existing indices to have system metadata
jaymode Jul 28, 2020
c1cfdf5
Merge branch 'master' into system_index_threadpool
jaymode Jul 29, 2020
e14bb7e
checkstyle
jaymode Jul 29, 2020
cffb69d
use indexsettings
jaymode Jul 30, 2020
e90f600
remove system indices from indices service
jaymode Aug 3, 2020
df944c4
index metadata version added
jaymode Aug 3, 2020
ffb9026
add KEY_SYSTEM
jaymode Aug 3, 2020
0152c12
remove method only used in test
jaymode Aug 3, 2020
36c67a7
SystemIndices updates
jaymode Aug 3, 2020
c493d52
Merge branch 'master' into system_index_threadpool
jaymode Aug 3, 2020
7961757
threadpool docs
jaymode Aug 3, 2020
d882e45
cleanup transportsearchaction
jaymode Aug 3, 2020
25bf51a
Merge branch 'master' into system_index_threadpool
jaymode Aug 3, 2020
9e43dcb
test and fixes for existing system indices
jaymode Aug 3, 2020
3924e88
add full cluster restart test
jaymode Aug 4, 2020
5059356
Fix validateDotIndex issue
gwbrown Aug 3, 2020
7306f79
Merge remote-tracking branch 'origin/master' into system_index_thread…
jaymode Aug 4, 2020
82edb62
Merge branch 'master' into system_index_threadpool
jaymode Aug 5, 2020
8e02d7e
Merge branch 'master' into system_index_threadpool
jaymode Aug 5, 2020
8d7db45
Merge branch 'master' into system_index_threadpool
jaymode Aug 5, 2020
d8bac67
Merge branch 'master' into system_index_threadpool
jaymode Aug 10, 2020
7ef470f
disable bwc tests
jaymode Aug 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,8 @@ public static final IndexShard newIndexShard(
Arrays.asList(listeners),
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs);
cbs,
false);
}

private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,13 @@ protected Writeable.Reader<GetResponse> getResponseReader() {

@Override
protected String getExecutor(GetRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
shardId);
final ClusterState clusterState = clusterService.state();
if (clusterState.metadata().index(shardId.getIndex()).isSystem()) {
return ThreadPool.Names.SYSTEM_READ;
} else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
return ThreadPool.Names.SEARCH_THROTTLED;
} else {
return super.getExecutor(request, shardId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,13 @@ protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, Sha

@Override
protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getIndexSettings().isSearchThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : super.getExecutor(request,
shardId);
final ClusterState clusterState = clusterService.state();
if (clusterState.metadata().index(shardId.getIndex()).isSystem()) {
return ThreadPool.Names.SYSTEM_READ;
} else if (indicesService.indexServiceSafe(shardId.getIndex()).getIndexSettings().isSearchThrottled()) {
return ThreadPool.Names.SEARCH_THROTTLED;
} else {
return super.getExecutor(request, shardId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,8 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
nodes::get, remoteConnections, searchTransportService::getConnection);
boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, indices, shardIterators.size());
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState,
boolean onlySystemIndices = Arrays.stream(indices).allMatch(index -> clusterState.metadata().index(index.getName()).isSystem());
searchAsyncAction(task, searchRequest, onlySystemIndices, shardIterators, timeProvider, connectionLookup, clusterState,
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}

Expand Down Expand Up @@ -584,6 +585,7 @@ static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShards
}

private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction(SearchTask task, SearchRequest searchRequest,
boolean onlySystemIndices,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
Expand All @@ -594,14 +596,16 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
ActionListener<SearchResponse> listener,
boolean preFilter,
SearchResponse.Clusters clusters) {
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
final Executor executor = onlySystemIndices ?
threadPool.executor(ThreadPool.Names.SYSTEM_READ) : threadPool.executor(ThreadPool.Names.SEARCH);
if (preFilter) {
return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators,
timeProvider, clusterState, task, (iter) -> {
AbstractSearchAsyncAction<? extends SearchPhaseResult> action = searchAsyncAction(
task,
searchRequest,
onlySystemIndices,
iter,
timeProvider,
connectionLookup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {

private final ActiveShardCount waitForActiveShards;
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;

private IndexMetadata(
final Index index,
Expand All @@ -410,7 +411,8 @@ private IndexMetadata(
final int routingNumShards,
final int routingPartitionSize,
final ActiveShardCount waitForActiveShards,
final ImmutableOpenMap<String, RolloverInfo> rolloverInfos) {
final ImmutableOpenMap<String, RolloverInfo> rolloverInfos,
final boolean isSystem) {

this.index = index;
this.version = version;
Expand Down Expand Up @@ -442,6 +444,7 @@ private IndexMetadata(
this.routingPartitionSize = routingPartitionSize;
this.waitForActiveShards = waitForActiveShards;
this.rolloverInfos = rolloverInfos;
this.isSystem = isSystem;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -662,6 +665,9 @@ public boolean equals(Object o) {
if (rolloverInfos.equals(that.rolloverInfos) == false) {
return false;
}
if (isSystem != that.isSystem) {
return false;
}
return true;
}

Expand All @@ -679,6 +685,7 @@ public int hashCode() {
result = 31 * result + Arrays.hashCode(primaryTerms);
result = 31 * result + inSyncAllocationIds.hashCode();
result = 31 * result + rolloverInfos.hashCode();
result = 31 * result + Boolean.hashCode(isSystem);
return result;
}

Expand Down Expand Up @@ -718,6 +725,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
private final Diff<ImmutableOpenMap<String, DiffableStringMap>> customData;
private final Diff<ImmutableOpenIntMap<Set<String>>> inSyncAllocationIds;
private final Diff<ImmutableOpenMap<String, RolloverInfo>> rolloverInfos;
private final boolean isSystem;

IndexMetadataDiff(IndexMetadata before, IndexMetadata after) {
index = after.index.getName();
Expand All @@ -735,6 +743,7 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
inSyncAllocationIds = DiffableUtils.diff(before.inSyncAllocationIds, after.inSyncAllocationIds,
DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
rolloverInfos = DiffableUtils.diff(before.rolloverInfos, after.rolloverInfos, DiffableUtils.getStringKeySerializer());
isSystem = after.isSystem;
}

IndexMetadataDiff(StreamInput in) throws IOException {
Expand All @@ -761,6 +770,11 @@ private static class IndexMetadataDiff implements Diff<IndexMetadata> {
DiffableUtils.StringSetValueSerializer.getInstance());
rolloverInfos = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), RolloverInfo::new,
RolloverInfo::readDiffFrom);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
gwbrown marked this conversation as resolved.
Show resolved Hide resolved
isSystem = in.readBoolean();
} else {
isSystem = false;
}
}

@Override
Expand All @@ -781,6 +795,9 @@ public void writeTo(StreamOutput out) throws IOException {
customData.writeTo(out);
inSyncAllocationIds.writeTo(out);
rolloverInfos.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(isSystem);
}
}

@Override
Expand All @@ -799,6 +816,7 @@ public IndexMetadata apply(IndexMetadata part) {
builder.customMetadata.putAll(customData.apply(part.customData));
builder.inSyncAllocationIds.putAll(inSyncAllocationIds.apply(part.inSyncAllocationIds));
builder.rolloverInfos.putAll(rolloverInfos.apply(part.rolloverInfos));
builder.system(isSystem);
return builder.build();
}
}
Expand Down Expand Up @@ -841,6 +859,9 @@ public static IndexMetadata readFrom(StreamInput in) throws IOException {
for (int i = 0; i < rolloverAliasesSize; i++) {
builder.putRolloverInfo(new RolloverInfo(in));
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
builder.system(in.readBoolean());
}
return builder.build();
}

Expand Down Expand Up @@ -879,6 +900,13 @@ public void writeTo(StreamOutput out) throws IOException {
for (ObjectCursor<RolloverInfo> cursor : rolloverInfos.values()) {
cursor.value.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(isSystem);
}
}

public boolean isSystem() {
return isSystem;
}

public static Builder builder(String index) {
Expand All @@ -905,6 +933,7 @@ public static class Builder {
private final ImmutableOpenIntMap.Builder<Set<String>> inSyncAllocationIds;
private final ImmutableOpenMap.Builder<String, RolloverInfo> rolloverInfos;
private Integer routingNumShards;
private boolean isSystem;

public Builder(String index) {
this.index = index;
Expand All @@ -913,6 +942,7 @@ public Builder(String index) {
this.customMetadata = ImmutableOpenMap.builder();
this.inSyncAllocationIds = ImmutableOpenIntMap.builder();
this.rolloverInfos = ImmutableOpenMap.builder();
this.isSystem = false;
}

public Builder(IndexMetadata indexMetadata) {
Expand All @@ -930,6 +960,7 @@ public Builder(IndexMetadata indexMetadata) {
this.routingNumShards = indexMetadata.routingNumShards;
this.inSyncAllocationIds = ImmutableOpenIntMap.builder(indexMetadata.inSyncAllocationIds);
this.rolloverInfos = ImmutableOpenMap.builder(indexMetadata.rolloverInfos);
this.isSystem = indexMetadata.isSystem;
}

public Builder index(String index) {
Expand Down Expand Up @@ -1133,6 +1164,15 @@ private void initializePrimaryTerms() {
Arrays.fill(primaryTerms, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

public Builder system(boolean system) {
this.isSystem = system;
return this;
}

public boolean isSystem() {
return isSystem;
}

public IndexMetadata build() {
ImmutableOpenMap.Builder<String, AliasMetadata> tmpAliases = aliases;
Settings tmpSettings = settings;
Expand Down Expand Up @@ -1237,7 +1277,8 @@ public IndexMetadata build() {
getRoutingNumShards(),
routingPartitionSize,
waitForActiveShards,
rolloverInfos.build());
rolloverInfos.build(),
isSystem);
}

public static void toXContent(IndexMetadata indexMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException {
Expand Down Expand Up @@ -1337,6 +1378,7 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
cursor.value.toXContent(builder, params);
}
builder.endObject();
builder.field("system", indexMetadata.isSystem);
gwbrown marked this conversation as resolved.
Show resolved Hide resolved

builder.endObject();
}
Expand Down Expand Up @@ -1464,6 +1506,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
builder.aliasesVersion(parser.longValue());
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
builder.setRoutingNumShards(parser.intValue());
} else if ("system".equals(currentFieldName)) {
builder.system(parser.booleanValue());
} else {
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -124,7 +125,7 @@ public class MetadataCreateIndexService {
private final IndexScopedSettings indexScopedSettings;
private final ActiveShardsObserver activeShardsObserver;
private final NamedXContentRegistry xContentRegistry;
private final Collection<SystemIndexDescriptor> systemIndexDescriptors;
private final SystemIndices systemIndices;
private final ShardLimitValidator shardLimitValidator;
private final boolean forbidPrivateIndexSettings;

Expand All @@ -139,7 +140,7 @@ public MetadataCreateIndexService(
final IndexScopedSettings indexScopedSettings,
final ThreadPool threadPool,
final NamedXContentRegistry xContentRegistry,
final Collection<SystemIndexDescriptor> systemIndexDescriptors,
final SystemIndices systemIndices,
final boolean forbidPrivateIndexSettings) {
this.settings = settings;
this.clusterService = clusterService;
Expand All @@ -150,7 +151,7 @@ public MetadataCreateIndexService(
this.indexScopedSettings = indexScopedSettings;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.xContentRegistry = xContentRegistry;
this.systemIndexDescriptors = systemIndexDescriptors;
this.systemIndices = systemIndices;
this.forbidPrivateIndexSettings = forbidPrivateIndexSettings;
this.shardLimitValidator = shardLimitValidator;
}
Expand Down Expand Up @@ -180,14 +181,12 @@ public void validateIndexName(String index, ClusterState state) {
/**
* Validates (if this index has a dot-prefixed name) whether it follows the rules for dot-prefixed indices.
* @param index The name of the index in question
* @param state The current cluster state
* @param isHidden Whether or not this is a hidden index
*/
public void validateDotIndex(String index, ClusterState state, @Nullable Boolean isHidden) {
public boolean validateDotIndex(String index, @Nullable Boolean isHidden) {
boolean isSystem = false;
if (index.charAt(0) == '.') {
List<SystemIndexDescriptor> matchingDescriptors = systemIndexDescriptors.stream()
.filter(descriptor -> descriptor.matchesIndexPattern(index))
.collect(toList());
Collection<SystemIndexDescriptor> matchingDescriptors = systemIndices.findMatchingDescriptors(index);
if (matchingDescriptors.isEmpty() && (isHidden == null || isHidden == Boolean.FALSE)) {
deprecationLogger.deprecate("index_name_starts_with_dot",
"index name [{}] starts with a dot '.', in the next major version, index names " +
Expand All @@ -204,8 +203,11 @@ public void validateDotIndex(String index, ClusterState state, @Nullable Boolean
// Throw AssertionError if assertions are enabled, or a regular exception otherwise:
assert false : errorMessage.toString();
throw new IllegalStateException(errorMessage.toString());
} else {
isSystem = true;
}
}
return isSystem;
}

/**
Expand Down Expand Up @@ -410,7 +412,8 @@ private ClusterState applyCreateIndexWithTemporaryService(final ClusterState cur
final IndexMetadata indexMetadata;
try {
indexMetadata = buildIndexMetadata(request.index(), aliases, indexService.mapperService()::documentMapper,
temporaryIndexMeta.getSettings(), temporaryIndexMeta.getRoutingNumShards(), sourceMetadata);
temporaryIndexMeta.getSettings(), temporaryIndexMeta.getRoutingNumShards(), sourceMetadata,
temporaryIndexMeta.isSystem());
} catch (Exception e) {
logger.info("failed to build index metadata [{}]", request.index());
throw e;
Expand All @@ -435,7 +438,7 @@ private IndexMetadata buildAndValidateTemporaryIndexMetadata(final ClusterState
final int routingNumShards) {

final boolean isHiddenAfterTemplates = IndexMetadata.INDEX_HIDDEN_SETTING.get(aggregatedIndexSettings);
validateDotIndex(request.index(), currentState, isHiddenAfterTemplates);
final boolean isSystem = validateDotIndex(request.index(), isHiddenAfterTemplates);

// remove the setting it's temporary and is only relevant once we create the index
final Settings.Builder settingsBuilder = Settings.builder().put(aggregatedIndexSettings);
Expand All @@ -445,6 +448,7 @@ private IndexMetadata buildAndValidateTemporaryIndexMetadata(final ClusterState
final IndexMetadata.Builder tmpImdBuilder = IndexMetadata.builder(request.index());
tmpImdBuilder.setRoutingNumShards(routingNumShards);
tmpImdBuilder.settings(indexSettings);
tmpImdBuilder.system(isSystem);

// Set up everything, now locally create the index to see that things are ok, and apply
IndexMetadata tempMetadata = tmpImdBuilder.build();
Expand Down Expand Up @@ -773,7 +777,14 @@ static ClusterState clusterStateCreateIndex(ClusterState currentState, Set<Clust
static IndexMetadata buildIndexMetadata(String indexName, List<AliasMetadata> aliases,
Supplier<DocumentMapper> documentMapperSupplier, Settings indexSettings, int routingNumShards,
@Nullable IndexMetadata sourceMetadata) {
return buildIndexMetadata(indexName, aliases, documentMapperSupplier, indexSettings, routingNumShards, sourceMetadata, false);
}

gwbrown marked this conversation as resolved.
Show resolved Hide resolved
static IndexMetadata buildIndexMetadata(String indexName, List<AliasMetadata> aliases,
Supplier<DocumentMapper> documentMapperSupplier, Settings indexSettings, int routingNumShards,
@Nullable IndexMetadata sourceMetadata, boolean isSystem) {
IndexMetadata.Builder indexMetadataBuilder = createIndexMetadataBuilder(indexName, sourceMetadata, indexSettings, routingNumShards);
indexMetadataBuilder.system(isSystem);
// now, update the mappings with the actual source
Map<String, MappingMetadata> mappingsMetadata = new HashMap<>();
DocumentMapper mapper = documentMapperSupplier.get();
Expand Down
Loading