No mapper service and index caches for replicated closed indices (#40423)

Replicated closed indices can't be indexed into or searched, and therefore don't need a shard with
full indexing and search capabilities allocated. We can save a lot of heap memory for those
indices by not allocating a mapper service and caching infrastructure (which preallocates a constant
amount of heap per instance). Before this change, a 1GB ES instance could host 250 replicated closed
metricbeat indices (each index with one shard). After this change, the same instance can host 7300
replicated closed metricbeat indices (not that this would be a recommended configuration). Most
of the remaining memory is in the cluster state and the IndexSettings object.
ywelsch committed Mar 27, 2019
1 parent 730dad6 commit 03394b8
Showing 7 changed files with 58 additions and 30 deletions.
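
In essence, the change gates the heavy per-index allocations on the index state: when a closed index is being created for normal cluster-state application (rather than for metadata verification, which still needs a mapper service to merge mappings), the mapper service and cache infrastructure are left null, and call sites null-check before use. A minimal standalone sketch of that pattern, using placeholder types rather than the real Elasticsearch classes:

// Minimal sketch (not the actual Elasticsearch code) of the allocation guard
// that this commit adds to the IndexService constructor. All types here are
// illustrative stand-ins except the IndexCreationContext values, which mirror
// the enum introduced in the diff below.
public class ClosedIndexAllocationSketch {

    enum IndexCreationContext { CREATE_INDEX, META_DATA_VERIFICATION }

    enum IndexState { OPEN, CLOSE }

    private final Object mapperService; // stand-in for MapperService
    private final Object indexCache;    // stand-in for IndexCache, warmer, bitset cache, ...

    ClosedIndexAllocationSketch(IndexState state, IndexCreationContext context) {
        if (state == IndexState.CLOSE && context == IndexCreationContext.CREATE_INDEX) {
            // A replicated closed index is never indexed into or searched, so the
            // heap-heavy services are simply not allocated.
            this.mapperService = null;
            this.indexCache = null;
        } else {
            // Open indices, and metadata verification (which must merge mappings),
            // still get the full infrastructure.
            this.mapperService = new Object();
            this.indexCache = new Object();
        }
    }

    // Callers must tolerate the missing services, mirroring the null checks
    // added to updateMapping() and newEngineConfig() in the hunks below.
    boolean updateMapping() {
        if (mapperService == null) {
            return false; // nothing to update on a replicated closed index
        }
        return true; // would delegate to the real MapperService here
    }
}

The diff below threads an IndexCreationContext value from IndicesService.createIndexService through IndexModule.newIndexService into the IndexService constructor so that the constructor can tell the two cases apart.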
@@ -366,6 +366,7 @@ public static Type defaultStoreType(final boolean allowMmap) {
}

public IndexService newIndexService(
IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
@@ -395,7 +396,7 @@ public IndexService newIndexService(
} else {
queryCache = new DisabledQueryCache(indexSettings);
}
return new IndexService(indexSettings, environment, xContentRegistry,
return new IndexService(indexSettings, indexCreationContext, environment, xContentRegistry,
new SimilarityService(indexSettings, scriptService, similarities),
shardStoreDeleter, analysisRegistry, engineFactory, circuitBreakerService, bigArrays, threadPool, scriptService,
client, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry,
56 changes: 38 additions & 18 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
@@ -136,6 +136,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust

public IndexService(
IndexSettings indexSettings,
IndexCreationContext indexCreationContext,
NodeEnvironment nodeEnv,
NamedXContentRegistry xContentRegistry,
SimilarityService similarityService,
@@ -162,21 +163,36 @@ public IndexService(
this.similarityService = similarityService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, System::currentTimeMillis, null));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
// The sort order is validated right after the merge of the mapping later in the process.
this.indexSortSupplier = () -> indexSettings.getIndexSortConfig().buildIndexSort(
mapperService::fullName,
indexFieldData::getForField
);
} else {
if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE &&
indexCreationContext == IndexCreationContext.CREATE_INDEX) { // metadata verification needs a mapper service
this.mapperService = null;
this.indexFieldData = null;
this.indexSortSupplier = () -> null;
this.bitsetFilterCache = null;
this.warmer = null;
this.indexCache = null;
} else {
this.mapperService = new MapperService(indexSettings, registry.build(indexSettings), xContentRegistry, similarityService,
mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, System::currentTimeMillis, null));
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
// The sort order is validated right after the merge of the mapping later in the process.
this.indexSortSupplier = () -> indexSettings.getIndexSortConfig().buildIndexSort(
mapperService::fullName,
indexFieldData::getForField
);
} else {
this.indexSortSupplier = () -> null;
}
indexFieldData.setListener(new FieldDataCacheListener(this));
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
this.warmer = new IndexWarmer(threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
}

this.shardStoreDeleter = shardStoreDeleter;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
@@ -185,10 +201,6 @@ public IndexService(
this.eventListener = eventListener;
this.nodeEnv = nodeEnv;
this.indexStore = indexStore;
indexFieldData.setListener(new FieldDataCacheListener(this));
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
this.warmer = new IndexWarmer(threadPool, indexFieldData, bitsetFilterCache.createListener(threadPool));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
this.engineFactory = Objects.requireNonNull(engineFactory);
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
@@ -202,6 +214,11 @@ public IndexService(
updateFsyncTaskIfNecessary();
}

public enum IndexCreationContext {
CREATE_INDEX,
META_DATA_VERIFICATION
}

public int numberOfShards() {
return shards.size();
}
@@ -548,7 +565,10 @@ List<SearchOperationListener> getSearchOperationListener() { // pkg private for

@Override
public boolean updateMapping(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) throws IOException {
return mapperService().updateMapping(currentIndexMetaData, newIndexMetaData);
if (mapperService == null) {
return false;
}
return mapperService.updateMapping(currentIndexMetaData, newIndexMetaData);
}

private class StoreCloseListener implements Store.OnClose {
@@ -2493,8 +2493,9 @@ private EngineConfig newEngineConfig() {
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig,
mapperService != null ? mapperService.indexAnalyzer() : null,
similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache != null ? indexCache.query() : null, cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
@@ -3077,7 +3078,9 @@ public void afterRefresh(boolean didRefresh) throws IOException {

private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
final DocumentMapper noopDocumentMapper = new DocumentMapper.Builder(noopRootMapper, mapperService).build(mapperService);
final DocumentMapper noopDocumentMapper = mapperService != null ?
new DocumentMapper.Builder(noopRootMapper, mapperService).build(mapperService) :
null;
return new EngineConfig.TombstoneDocSupplier() {
@Override
public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
@@ -156,6 +156,8 @@
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.elasticsearch.index.IndexService.IndexCreationContext.META_DATA_VERIFICATION;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;

public class IndicesService extends AbstractLifecycleComponent
@@ -491,7 +493,7 @@ public void onStoreClosed(ShardId shardId) {
finalListeners.add(oldShardsStats);
final IndexService indexService =
createIndexService(
"create index",
CREATE_INDEX,
indexMetaData,
indicesQueryCache,
indicesFieldDataCache,
@@ -513,7 +515,7 @@ public void onStoreClosed(ShardId shardId) {
/**
* This creates a new IndexService without registering it
*/
private synchronized IndexService createIndexService(final String reason,
private synchronized IndexService createIndexService(IndexService.IndexCreationContext indexCreationContext,
IndexMetaData indexMetaData,
IndicesQueryCache indicesQueryCache,
IndicesFieldDataCache indicesFieldDataCache,
@@ -526,7 +528,7 @@ private synchronized IndexService createIndexService(final String reason,
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
reason);
indexCreationContext);

final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), indexStoreFactories);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
@@ -537,6 +539,7 @@ private synchronized IndexService createIndexService(final String reason,
indexModule.addIndexEventListener(listener);
}
return indexModule.newIndexService(
indexCreationContext,
nodeEnv,
xContentRegistry,
this,
@@ -615,7 +618,7 @@ public synchronized void verifyIndexMetadata(IndexMetaData metaData, IndexMetaDa
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service =
createIndexService("metadata verification", metaData, indicesQueryCache, indicesFieldDataCache, emptyList());
createIndexService(META_DATA_VERIFICATION, metaData, indicesQueryCache, indicesFieldDataCache, emptyList());
closeables.add(() -> service.close("metadata verification", false));
service.mapperService().merge(metaData, MapperService.MergeReason.MAPPING_RECOVERY);
if (metaData.equals(metaDataUpdate) == false) {
@@ -89,6 +89,7 @@
import java.util.function.Function;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
@@ -148,8 +149,8 @@ public void tearDown() throws Exception {
}

private IndexService newIndexService(IndexModule module) throws IOException {
return module.newIndexService(nodeEnvironment, xContentRegistry(), deleter, circuitBreakerService, bigArrays, threadPool,
scriptService, null, indicesQueryCache, mapperRegistry,
return module.newIndexService(CREATE_INDEX, nodeEnvironment, xContentRegistry(), deleter, circuitBreakerService, bigArrays,
threadPool, scriptService, null, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, listener), writableRegistry());
}

@@ -1052,7 +1052,7 @@ public static List<Translog.Operation> readAllOperationsInLucene(Engine engine,
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false
if (mapper == null || mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false
|| (engine instanceof InternalEngine) == false) {
return;
}
@@ -688,7 +688,7 @@ public void onIndexModule(IndexModule module) {
throw new IllegalArgumentException("permission filters are not allowed to use the current timestamp");

}, null),
indexService.cache().bitsetFilterCache(),
indexService.cache() != null ? indexService.cache().bitsetFilterCache() : null,
indexService.getThreadPool().getThreadContext(), getLicenseState(),
indexService.getScriptService()));
/* We need to forcefully overwrite the query cache implementation to use security's opt out query cache implementation.