Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/changelog/95987.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pr: 95987
summary: Fix reused/recovered bytes for files that are only partially recovered from
cache
area: Snapshot/Restore
type: bug
issues:
- 95970
- 95994
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@

public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase {

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/95987")
public void testCreateAndRestoreSearchableSnapshot() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), TestRepositoryPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/95994")
public void testRecoveryStateRecoveredBytesMatchPhysicalCacheState() throws Exception {
final String fsRepoName = randomAlphaOfLength(10);
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,13 +506,32 @@ private void prewarmCache(ActionListener<Void> listener, Supplier<Boolean> cance

final AtomicLong prefetchedBytes = new AtomicLong(0L);
try (var fileListener = new RefCountingListener(ActionListener.runBefore(completionListener.acquire().map(v -> {
// we don't support files to be reported as partially recovered from disk and partially from the blob store, but
// this is something that can happen for fully mounted searchable snapshots. It is possible that prewarming
// prefetched nothing if a concurrent search was executing (and cached the data) or if the data were fetched from
// the blob cache system index.
if (prefetchedBytes.get() == 0L) {
recoveryState.markIndexFileAsReused(file.physicalName());
} else {
recoveryState.getIndex().addRecoveredFromSnapshotBytesToFile(file.physicalName(), prefetchedBytes.get());
recoveryState.getIndex().addRecoveredFromSnapshotBytesToFile(file.physicalName(), file.length());
}
return v;
}), () -> IOUtils.closeWhileHandlingException(input)))) {

if (input instanceof CachedBlobContainerIndexInput cachedIndexInput) {
if (cachedIndexInput.getPersistentCacheInitialLength() == file.length()) {
logger.trace(
() -> format(
"%s file [%s] is already available in cache (%d bytes)",
shardId,
file.physicalName(),
file.length()
)
);
continue;
}
}

for (int p = 0; p < file.numberOfParts(); p++) {
final int part = p;
prewarmTaskRunner.enqueueTask(fileListener.acquire().map(releasable -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ protected void readWithoutBlobCache(ByteBuffer b) throws Exception {
assert bytesRead == length : bytesRead + " vs " + length;
}

/**
* @return Returns the number of bytes already cached for the file in the cold persistent cache
*/
public long getPersistentCacheInitialLength() throws Exception {
return cacheFileReference.get().getInitialLength();
}

/**
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
*
Expand Down