Skip to content

Commit

Permalink
Fix leaking listeners bug on frozen tier (#85239) (#85301)
Browse files Browse the repository at this point in the history
We identified a bug in the searchable snapshot code that could prevent some search queries from completing. This bug results in search tasks lingering on a data_frozen node for hours or days, filling the search thread pool and blocking other search requests from being processed.

This bug can be reproduced on a data_frozen node with:

    multiple search queries requesting the same region from the shared cache
    the region corresponds to cached data not yet available in cache
    the searchable snapshot's thread pool that fetches cached data has no active workers
    the thread that will write the cached data (i.e., the thread that picks up the Gap to fill) has restrictive permissions that prevent new threads from being created

When such a situation occurs, the thread pool in charge of executing the AbstractRunnable that fills the cache throws an exception (in our case, an AccessControlException) that is neither caught nor logged. As a consequence, the Gap that should have been written to the cache is never filled with data, and the listeners waiting for the cached data to become available are never called back, blocking the execution of the searches waiting for the data.

This pull request introduces an integration test that reproduces this situation:

java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "modifyThreadGroup")
	at java.security.AccessControlContext.checkPermission(AccessControlContext.java:485) ~[?:?]
	at java.security.AccessController.checkPermission(AccessController.java:1068) ~[?:?]
	at java.lang.SecurityManager.checkPermission(SecurityManager.java:416) ~[?:?]
	at org.elasticsearch.secure_sm.SecureSM.checkThreadGroupAccess(SecureSM.java:186) ~[?:8.2.0-SNAPSHOT]
	at org.elasticsearch.secure_sm.SecureSM.checkAccess(SecureSM.java:135) ~[?:8.2.0-SNAPSHOT]
	at java.lang.ThreadGroup.checkAccess(ThreadGroup.java:335) ~[?:?]
	at java.lang.Thread.<init>(Thread.java:422) ~[?:?]
	at java.lang.Thread.<init>(Thread.java:708) ~[?:?]
	at org.elasticsearch.common.util.concurrent.EsExecutors$EsThreadFactory.newThread(EsExecutors.java:274) ~[?:8.2.0-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:630) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:920) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1364) ~[?:?]
	at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:66) ~[?:8.2.0-SNAPSHOT]
	at org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService$CacheFileRegion.populateAndRead(FrozenCacheService.java:769) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService$FrozenCacheFile.populateAndRead(FrozenCacheService.java:896) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.xpack.searchablesnapshots.store.input.FrozenIndexInput.readWithoutBlobCache(FrozenIndexInput.java:143) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.xpack.searchablesnapshots.store.input.MetadataCachingIndexInput.doReadInternal(MetadataCachingIndexInput.java:104) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.xpack.searchablesnapshots.store.input.BaseSearchableSnapshotIndexInput.readInternal(BaseSearchableSnapshotIndexInput.java:112) [?:8.2.0-SNAPSHOT]
	at org.apache.lucene.store.BufferedIndexInput.refill(BufferedIndexInput.java:291) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.apache.lucene.store.BufferedIndexInput.readShort(BufferedIndexInput.java:243) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.apache.lucene.util.packed.DirectReader$DirectPackedReader16.get(DirectReader.java:302) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.apache.lucene.codecs.lucene90.Lucene90DocValuesProducer$20.ordValue(Lucene90DocValuesProducer.java:852) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.apache.lucene.index.SingletonSortedSetDocValues.advanceExact(SingletonSortedSetDocValues.java:81) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.elasticsearch.index.fielddata.FieldData$10.advanceExact(FieldData.java:336) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.script.field.AbstractKeywordDocValuesField.setNextDocId(AbstractKeywordDocValuesField.java:44) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.lookup.LeafDocLookup.getScriptFieldFactory(LeafDocLookup.java:74) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.lookup.LeafDocLookup.get(LeafDocLookup.java:89) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.lookup.LeafDocLookup.get(LeafDocLookup.java:27) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.painless.PainlessScript$Script.execute(doc['text.raw'].value.toString().length() > 0:5) [?:?]
	at org.elasticsearch.index.query.ScriptQueryBuilder$ScriptQuery$1$1.matches(ScriptQueryBuilder.java:193) [?:8.2.0-SNAPSHOT]
	at org.apache.lucene.search.Weight$DefaultBulkScorer.scoreRange(Weight.java:281) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.apache.lucene.search.Weight$DefaultBulkScorer.score(Weight.java:254) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.elasticsearch.search.internal.CancellableBulkScorer.score(CancellableBulkScorer.java:45) [?:8.2.0-SNAPSHOT]
	at org.apache.lucene.search.BulkScorer.score(BulkScorer.java:38) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.elasticsearch.search.internal.ContextIndexSearcher.searchLeaf(ContextIndexSearcher.java:194) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.internal.ContextIndexSearcher.search(ContextIndexSearcher.java:167) [?:8.2.0-SNAPSHOT]
	at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:555) [?:9.1.0 5b522487ba8e0f1002b50a136817ca037aec9686 - jtibs - 2022-03-16 10:32:40]
	at org.elasticsearch.search.query.QueryPhase.searchWithCollector(QueryPhase.java:232) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.query.QueryPhase.executeInternal(QueryPhase.java:187) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.query.QueryPhase.execute(QueryPhase.java:88) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.SearchService.loadOrExecuteQueryPhase(SearchService.java:462) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:625) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.search.SearchService.lambda$executeQueryPhase$2(SearchService.java:487) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.action.ActionRunnable.lambda$supply$0(ActionRunnable.java:47) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.action.ActionRunnable$2.doRun(ActionRunnable.java:62) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:33) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:773) [?:8.2.0-SNAPSHOT]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:26) [?:8.2.0-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]

This permission issue was fixed yesterday by #85180, but I still think it is valuable to have an integration test that combines scripts and searchable snapshots. In addition to this, this pull request changes the searchable snapshot code to always fail any Gap that was not correctly submitted for execution: this way the current search (and any other searches that are waiting for the cached data to be available) returns a search failure instead of blocking indefinitely. In order to catch any similar issue in the future, we also log and assert on any exception that could be thrown when submitting the task to fill the required gaps.
  • Loading branch information
tlrx committed Mar 24, 2022
1 parent 99a6092 commit 9fbfc05
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 65 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/85239.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85239
summary: Fix leaking listeners bug on frozen tier
area: Snapshot/Restore
type: bug
issues: []
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/qa/azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ testClusters.matching { it.name == "integTest" }.configureEach {

setting 'xpack.searchable.snapshot.shared_cache.size', '16MB'
setting 'xpack.searchable.snapshot.shared_cache.region_size', '256KB'
setting 'xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive', '0ms'
}

tasks.register("azureThirdPartyTest") {
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/qa/gcs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ testClusters.matching { it.name == "integTest" }.configureEach {
setting 'xpack.searchable.snapshot.shared_cache.size', '16MB'
setting 'xpack.searchable.snapshot.shared_cache.region_size', '256KB'
setting 'xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive', '0ms'
setting 'xpack.security.enabled', 'false'
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/qa/minio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ testClusters.matching { it.name == "integTest" }.configureEach {
setting 'xpack.searchable.snapshot.shared_cache.size', '16MB'
setting 'xpack.searchable.snapshot.shared_cache.region_size', '256KB'
setting 'xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive', '0ms'
setting 'xpack.security.enabled', 'false'
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/qa/rest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ testClusters.configureEach {

setting 'xpack.searchable.snapshot.shared_cache.size', '16MB'
setting 'xpack.searchable.snapshot.shared_cache.region_size', '256KB'
setting 'xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive', '0ms'

setting 'xpack.security.enabled', 'true'
user username: 'admin', password: 'admin-password', role: 'superuser'
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/qa/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ testClusters.matching { it.name == "integTest" }.configureEach {
setting 'xpack.searchable.snapshot.shared_cache.size', '16MB'
setting 'xpack.searchable.snapshot.shared_cache.region_size', '256KB'
setting 'xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive', '0ms'
setting 'xpack.security.enabled', 'false'
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/qa/url/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ testClusters.matching { it.name == "integTest" }.configureEach {
setting 'xpack.searchable.snapshot.shared_cache.size', '16MB'
setting 'xpack.searchable.snapshot.shared_cache.region_size', '256KB'
setting 'xpack.searchable_snapshots.cache_fetch_async_thread_pool.keep_alive', '0ms'
setting 'xpack.security.enabled', 'false'
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,28 +362,34 @@ public Future<Integer> populateAndRead(
);

for (SparseFileTracker.Gap gap : gaps) {
executor.execute(new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
if (reference.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
try {
ensureOpen();
writer.fillCacheRange(reference.fileChannel, gap.start(), gap.end(), gap::onProgress);
gap.onCompletion();
markAsNeedsFSync();
} finally {
reference.decRef();
try {
executor.execute(new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
if (reference.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
try {
ensureOpen();
writer.fillCacheRange(reference.fileChannel, gap.start(), gap.end(), gap::onProgress);
gap.onCompletion();
markAsNeedsFSync();
} finally {
reference.decRef();
}
}
}

@Override
public void onFailure(Exception e) {
gap.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
gap.onFailure(e);
}
});
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected exception when submitting task to fill gap [{}]", gap), e);
assert false : e;
gap.onFailure(e);
}
}
} catch (Exception e) {
releaseAndFail(future, decrementRef, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -759,36 +760,42 @@ StepListener<Integer> populateAndRead(
final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(rangeToWrite, rangeToRead, rangeListener);

for (SparseFileTracker.Gap gap : gaps) {
executor.execute(new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
if (CacheFileRegion.this.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
try {
ensureOpen();
final long start = gap.start();
assert regionOwners[sharedBytesPos].get() == CacheFileRegion.this;
writer.fillCacheRange(
fileChannel,
physicalStartOffset() + gap.start(),
gap.start(),
gap.end() - gap.start(),
progress -> gap.onProgress(start + progress)
);
writeCount.increment();
} finally {
decRef();
try {
executor.execute(new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
if (CacheFileRegion.this.tryIncRef() == false) {
throw new AlreadyClosedException("Cache file channel has been released and closed");
}
try {
ensureOpen();
final long start = gap.start();
assert regionOwners[sharedBytesPos].get() == CacheFileRegion.this;
writer.fillCacheRange(
fileChannel,
physicalStartOffset() + gap.start(),
gap.start(),
gap.end() - gap.start(),
progress -> gap.onProgress(start + progress)
);
writeCount.increment();
} finally {
decRef();
}
gap.onCompletion();
}
gap.onCompletion();
}

@Override
public void onFailure(Exception e) {
gap.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
gap.onFailure(e);
}
});
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected exception when submitting task to fill gap [{}]", gap), e);
assert false : e;
gap.onFailure(e);
}
}
} catch (Exception e) {
releaseAndFail(listener, decrementRef, e);
Expand Down

0 comments on commit 9fbfc05

Please sign in to comment.