Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7d588c7
Track index commits internally acquired by the commits listener in Co…
arteam Sep 4, 2024
2fa3721
Track internal commits separately
arteam Sep 4, 2024
f3fafb6
Add additional comment about the purpose of internallyAcquiredIndexCo…
arteam Sep 5, 2024
b098d43
Merge remote-tracking branch 'origin/main' into track-index-commits-a…
arteam Sep 5, 2024
466572a
Revert "Add additional comment about the purpose of internallyAcquire…
arteam Sep 8, 2024
53f9a1b
Revert "Track internal commits separately"
arteam Sep 8, 2024
f8bff08
Merge remote-tracking branch 'origin/main' into track-index-commits-a…
arteam Sep 8, 2024
321281f
Track internal commits separately
arteam Sep 8, 2024
a29454d
Rename to hasExternallyAcquiredIndexCommits
arteam Sep 8, 2024
98a8df8
Add a comment about hasExternallyAcquiredIndexCommits
arteam Sep 8, 2024
f0193a6
Update error message about non-internal commits
arteam Sep 8, 2024
c6d8662
Simplify hasExternallyAcquiredIndexCommits
arteam Sep 8, 2024
2bbfdb8
Add a check for the amount of references on the commit
arteam Sep 9, 2024
a54f305
Use hasAcquiredIndexCommitsForTesting
arteam Sep 9, 2024
1c55fd1
Remove extra if in hasAcquiredCommitsForTesting
arteam Sep 9, 2024
a5d451c
Merge branch 'main' into track-index-commits-acquired-by-commits-list…
elasticmachine Sep 9, 2024
3fedaae
Update server/src/main/java/org/elasticsearch/index/engine/CombinedDe…
arteam Sep 10, 2024
c6d986c
Update server/src/main/java/org/elasticsearch/index/engine/CombinedDe…
arteam Sep 10, 2024
494bf43
Update server/src/main/java/org/elasticsearch/index/engine/CombinedDe…
arteam Sep 10, 2024
89224ce
Run spotless
arteam Sep 10, 2024
5acb94b
Merge remote-tracking branch 'origin/main' into track-index-commits-a…
arteam Sep 10, 2024
d156514
Merge branch 'main' into track-index-commits-acquired-by-commits-list…
elasticmachine Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final SoftDeletesPolicy softDeletesPolicy;
private final LongSupplier globalCheckpointSupplier;
private final Map<IndexCommit, Integer> acquiredIndexCommits; // Number of references held against each commit point.
// Index commits internally acquired by the commits listener. We want to track them separately to be able to disregard them
// when checking for externally acquired index commits that haven't been released
private final Set<IndexCommit> internallyAcquiredIndexCommits;

interface CommitsListener {

Expand Down Expand Up @@ -72,6 +75,7 @@ interface CommitsListener {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.commitsListener = commitsListener;
this.acquiredIndexCommits = new HashMap<>();
this.internallyAcquiredIndexCommits = new HashSet<>();
}

@Override
Expand Down Expand Up @@ -114,7 +118,7 @@ public void onCommit(List<? extends IndexCommit> commits) throws IOException {
this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO));
}
if (commitsListener != null && previousLastCommit != this.lastCommit) {
newCommit = acquireIndexCommit(false);
newCommit = acquireIndexCommit(false, true);
} else {
newCommit = null;
}
Expand Down Expand Up @@ -210,15 +214,25 @@ SafeCommitInfo getSafeCommitInfo() {
* @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point.
*/
synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) {
return acquireIndexCommit(acquiringSafeCommit, false);
}

private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInternally) {
assert safeCommit != null : "Safe commit is not initialized yet";
assert lastCommit != null : "Last commit is not initialized yet";
final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit;
acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount
return wrapCommit(snapshotting);
assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting)
: "commit [" + snapshotting + "] already added";
return wrapCommit(snapshotting, acquiredInternally);
}

protected IndexCommit wrapCommit(IndexCommit indexCommit) {
return new SnapshotIndexCommit(indexCommit);
return wrapCommit(indexCommit, false);
}

protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) {
return new SnapshotIndexCommit(indexCommit, acquiredInternally);
}

/**
Expand All @@ -227,7 +241,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) {
* @return true if the acquired commit can be clean up.
*/
synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
final IndexCommit releasingCommit = ((SnapshotIndexCommit) acquiredCommit).getIndexCommit();
final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit;
final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit();
assert acquiredIndexCommits.containsKey(releasingCommit)
: "Release non-acquired commit;"
+ "acquired commits ["
Expand All @@ -242,6 +257,8 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) {
}
return count - 1;
});
assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit)
: "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally";

assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]";
// The commit can be clean up only if no refCount and it is neither the safe commit nor last commit.
Expand Down Expand Up @@ -296,10 +313,16 @@ private static Set<String> listOfNewFileNames(IndexCommit previous, IndexCommit
}

/**
* Checks whether the deletion policy is holding on to acquired index commits
* Checks whether the deletion policy is holding on to externally acquired index commits
*/
synchronized boolean hasAcquiredIndexCommits() {
return acquiredIndexCommits.isEmpty() == false;
synchronized boolean hasAcquiredIndexCommitsForTesting() {
// We explicitly check only external commits and disregard internal commits acquired by the commits listener
for (var e : acquiredIndexCommits.entrySet()) {
if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) {
return true;
}
}
return false;
}

/**
Expand All @@ -320,8 +343,12 @@ public static String commitDescription(IndexCommit commit) throws IOException {
* A wrapper of an index commit that prevents it from being deleted.
*/
private static class SnapshotIndexCommit extends FilterIndexCommit {
SnapshotIndexCommit(IndexCommit delegate) {

private final boolean acquiredInternally;

SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) {
super(delegate);
this.acquiredInternally = acquiredInternally;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ Translog getTranslog() {
}

// Package private for testing purposes only
boolean hasAcquiredIndexCommits() {
return combinedDeletionPolicy.hasAcquiredIndexCommits();
boolean hasAcquiredIndexCommitsForTesting() {
return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw
assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo)));
}

public static boolean hasAcquiredIndexCommits(Engine engine) {
public static boolean hasAcquiredIndexCommitsForTesting(Engine engine) {
assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.hasAcquiredIndexCommits();
return internalEngine.hasAcquiredIndexCommitsForTesting();
}

public static final class PrimaryTermSupplier implements LongSupplier {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ private void assertNoAcquiredIndexCommit() throws Exception {
if (engine instanceof InternalEngine) {
assertFalse(
indexShard.routingEntry().toString() + " has unreleased snapshotted index commits",
EngineTestCase.hasAcquiredIndexCommits(engine)
EngineTestCase.hasAcquiredIndexCommitsForTesting(engine)
);
}
} catch (AlreadyClosedException ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException {
sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false));
}

assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard)));
assertTrue(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard)));
restoreSourceService.closeSession(sessionUUID);
assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard)));
assertFalse(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard)));

closeShards(indexShard);
// Exception will be thrown if file is not closed.
Expand Down