From 7d588c79dcfa48fdb74d7d92acfc1002248d68fd Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Wed, 4 Sep 2024 12:53:31 +0200 Subject: [PATCH 01/17] Track index commits internally acquired by the commits listener in CombinedDeletionPolicy --- .../index/engine/CombinedDeletionPolicy.java | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 22bab1742589e..5e2eee16fd67b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.function.LongSupplier; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -43,6 +44,7 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final SoftDeletesPolicy softDeletesPolicy; private final LongSupplier globalCheckpointSupplier; private final Map acquiredIndexCommits; // Number of references held against each commit point. + private final Set indexCommitsAcquiredByCommitsListener; interface CommitsListener { @@ -72,6 +74,7 @@ interface CommitsListener { this.globalCheckpointSupplier = globalCheckpointSupplier; this.commitsListener = commitsListener; this.acquiredIndexCommits = new HashMap<>(); + this.indexCommitsAcquiredByCommitsListener = new HashSet<>(); } @Override @@ -114,7 +117,7 @@ public void onCommit(List commits) throws IOException { this.maxSeqNoOfNextSafeCommit = Long.parseLong(commits.get(keptPosition + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); } if (commitsListener != null && previousLastCommit != this.lastCommit) { - newCommit = acquireIndexCommit(false); + newCommit = acquireIndexCommit(false, true); } else { newCommit = null; } @@ -210,15 +213,26 @@ SafeCommitInfo getSafeCommitInfo() { * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. */ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { + return acquireIndexCommit(acquiringSafeCommit, false); + } + + private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredByCommitsListener) { assert safeCommit != null : "Safe commit is not initialized yet"; assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - return wrapCommit(snapshotting); + if (acquiredByCommitsListener) { + indexCommitsAcquiredByCommitsListener.add(snapshotting); + } + return wrapCommit(snapshotting, acquiredByCommitsListener); } protected IndexCommit wrapCommit(IndexCommit indexCommit) { - return new SnapshotIndexCommit(indexCommit); + return wrapCommit(indexCommit, false); + } + + protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredByCommitsListener) { + return new SnapshotIndexCommit(indexCommit, acquiredByCommitsListener); } /** @@ -227,7 +241,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit) { * @return true if the acquired commit can be clean up. */ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { - final IndexCommit releasingCommit = ((SnapshotIndexCommit) acquiredCommit).getIndexCommit(); + SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; + final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); assert acquiredIndexCommits.containsKey(releasingCommit) : "Release non-acquired commit;" + "acquired commits [" @@ -242,6 +257,9 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { } return count - 1; }); + if (snapshotIndexCommit.acquiredByCommitsListener) { + indexCommitsAcquiredByCommitsListener.remove(releasingCommit); + } assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; // The commit can be clean up only if no refCount and it is neither the safe commit nor last commit. @@ -299,7 +317,8 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit * Checks whether the deletion policy is holding on to acquired index commits */ synchronized boolean hasAcquiredIndexCommits() { - return acquiredIndexCommits.isEmpty() == false; + return acquiredIndexCommits.isEmpty() == false + && acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(indexCommitsAcquiredByCommitsListener::contains)); } /** @@ -320,8 +339,12 @@ public static String commitDescription(IndexCommit commit) throws IOException { * A wrapper of an index commit that prevents it from being deleted. */ private static class SnapshotIndexCommit extends FilterIndexCommit { - SnapshotIndexCommit(IndexCommit delegate) { + + private final boolean acquiredByCommitsListener; + + SnapshotIndexCommit(IndexCommit delegate, boolean acquiredByCommitsListener) { super(delegate); + this.acquiredByCommitsListener = acquiredByCommitsListener; } @Override From 2fa372142804ba1f18d61dd012876690fbb26a83 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Wed, 4 Sep 2024 17:48:30 +0200 Subject: [PATCH 02/17] Track internal commits separately --- .../index/engine/CombinedDeletionPolicy.java | 58 ++++++++----------- .../engine/CombinedDeletionPolicyTests.java | 2 +- 2 files changed, 24 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 5e2eee16fd67b..4e86c0f7736af 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; import java.util.function.LongSupplier; -import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -43,8 +42,10 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final TranslogDeletionPolicy translogDeletionPolicy; private final SoftDeletesPolicy softDeletesPolicy; private final LongSupplier globalCheckpointSupplier; - private final Map acquiredIndexCommits; // Number of references held against each commit point. - private final Set indexCommitsAcquiredByCommitsListener; + // Number of references held against each commit point acquired externally. + private final Map externallyAcquiredIndexCommits; + // Number of references held against each commit point acquired by the commits listener. + private final Map internallyAcquiredIndexCommits; interface CommitsListener { @@ -73,8 +74,8 @@ interface CommitsListener { this.softDeletesPolicy = softDeletesPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; this.commitsListener = commitsListener; - this.acquiredIndexCommits = new HashMap<>(); - this.indexCommitsAcquiredByCommitsListener = new HashSet<>(); + this.externallyAcquiredIndexCommits = new HashMap<>(); + this.internallyAcquiredIndexCommits = new HashMap<>(); } @Override @@ -123,7 +124,8 @@ public void onCommit(List commits) throws IOException { } for (int i = 0; i < keptPosition; i++) { final IndexCommit commit = commits.get(i); - if (acquiredIndexCommits.containsKey(commit) == false) { + if (externallyAcquiredIndexCommits.containsKey(commit) == false + && internallyAcquiredIndexCommits.containsKey(commit) == false) { deleteCommit(commit); if (deletedCommits == null) { deletedCommits = new ArrayList<>(); @@ -216,23 +218,17 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { return acquireIndexCommit(acquiringSafeCommit, false); } - private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredByCommitsListener) { + private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInernally) { assert safeCommit != null : "Safe commit is not initialized yet"; assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; - acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - if (acquiredByCommitsListener) { - indexCommitsAcquiredByCommitsListener.add(snapshotting); - } - return wrapCommit(snapshotting, acquiredByCommitsListener); - } - - protected IndexCommit wrapCommit(IndexCommit indexCommit) { - return wrapCommit(indexCommit, false); + var commits = acquiredInernally ? internallyAcquiredIndexCommits : externallyAcquiredIndexCommits; + commits.merge(snapshotting, 1, Integer::sum); // increase refCount + return wrapCommit(snapshotting, acquiredInernally); } - protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredByCommitsListener) { - return new SnapshotIndexCommit(indexCommit, acquiredByCommitsListener); + protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) { + return new SnapshotIndexCommit(indexCommit, acquiredInternally); } /** @@ -243,24 +239,16 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredByComm synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); - assert acquiredIndexCommits.containsKey(releasingCommit) - : "Release non-acquired commit;" - + "acquired commits [" - + acquiredIndexCommits - + "], releasing commit [" - + releasingCommit - + "]"; + var commits = snapshotIndexCommit.acquiredInternally ? internallyAcquiredIndexCommits : externallyAcquiredIndexCommits; + assert commits.containsKey(releasingCommit) + : "Release non-acquired commit;" + "acquired commits [" + commits + "], releasing commit [" + releasingCommit + "]"; // release refCount - final Integer refCount = acquiredIndexCommits.compute(releasingCommit, (key, count) -> { + final Integer refCount = commits.compute(releasingCommit, (key, count) -> { if (count == 1) { return null; } return count - 1; }); - if (snapshotIndexCommit.acquiredByCommitsListener) { - indexCommitsAcquiredByCommitsListener.remove(releasingCommit); - } - assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; // The commit can be clean up only if no refCount and it is neither the safe commit nor last commit. return refCount == null && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false; @@ -317,8 +305,8 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit * Checks whether the deletion policy is holding on to acquired index commits */ synchronized boolean hasAcquiredIndexCommits() { - return acquiredIndexCommits.isEmpty() == false - && acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(indexCommitsAcquiredByCommitsListener::contains)); + // We excplcetly check only external commits and disregard commits created by the commits listener. + return externallyAcquiredIndexCommits.isEmpty() == false; } /** @@ -340,11 +328,11 @@ public static String commitDescription(IndexCommit commit) throws IOException { */ private static class SnapshotIndexCommit extends FilterIndexCommit { - private final boolean acquiredByCommitsListener; + private final boolean acquiredInternally; - SnapshotIndexCommit(IndexCommit delegate, boolean acquiredByCommitsListener) { + SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) { super(delegate); - this.acquiredByCommitsListener = acquiredByCommitsListener; + this.acquiredInternally = acquiredInternally; } @Override diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 176cb50f78e0f..ce30f1ef371d7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -350,7 +350,7 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) { @Override synchronized boolean releaseCommit(IndexCommit acquiredCommit) { - return super.releaseCommit(wrapCommit(acquiredCommit)); + return super.releaseCommit(wrapCommit(acquiredCommit, true)); } }; From f3fafb63729b22bd215ca21732874bf426b5cc5b Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Thu, 5 Sep 2024 09:15:29 +0200 Subject: [PATCH 03/17] Add additional comment about the purpose of internallyAcquiredIndexCommits --- .../org/elasticsearch/index/engine/CombinedDeletionPolicy.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 4e86c0f7736af..a5ff8f5d2acb5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -45,6 +45,8 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { // Number of references held against each commit point acquired externally. private final Map externallyAcquiredIndexCommits; // Number of references held against each commit point acquired by the commits listener. + // We want to track them separately to be able to disregard them when checking for externally acquired index commits + // that haven't been released. private final Map internallyAcquiredIndexCommits; interface CommitsListener { From 466572ab17902d550d1092922352434128680016 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 01:17:55 +0200 Subject: [PATCH 04/17] Revert "Add additional comment about the purpose of internallyAcquiredIndexCommits" This reverts commit f3fafb63729b22bd215ca21732874bf426b5cc5b. --- .../org/elasticsearch/index/engine/CombinedDeletionPolicy.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index a5ff8f5d2acb5..4e86c0f7736af 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -45,8 +45,6 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { // Number of references held against each commit point acquired externally. private final Map externallyAcquiredIndexCommits; // Number of references held against each commit point acquired by the commits listener. - // We want to track them separately to be able to disregard them when checking for externally acquired index commits - // that haven't been released. private final Map internallyAcquiredIndexCommits; interface CommitsListener { From 53f9a1b1c39150d12071bd933a1b5a97b6413140 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 01:18:11 +0200 Subject: [PATCH 05/17] Revert "Track internal commits separately" This reverts commit 2fa372142804ba1f18d61dd012876690fbb26a83. --- .../index/engine/CombinedDeletionPolicy.java | 58 +++++++++++-------- .../engine/CombinedDeletionPolicyTests.java | 2 +- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 4e86c0f7736af..5e2eee16fd67b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.function.LongSupplier; +import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -42,10 +43,8 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final TranslogDeletionPolicy translogDeletionPolicy; private final SoftDeletesPolicy softDeletesPolicy; private final LongSupplier globalCheckpointSupplier; - // Number of references held against each commit point acquired externally. - private final Map externallyAcquiredIndexCommits; - // Number of references held against each commit point acquired by the commits listener. - private final Map internallyAcquiredIndexCommits; + private final Map acquiredIndexCommits; // Number of references held against each commit point. + private final Set indexCommitsAcquiredByCommitsListener; interface CommitsListener { @@ -74,8 +73,8 @@ interface CommitsListener { this.softDeletesPolicy = softDeletesPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; this.commitsListener = commitsListener; - this.externallyAcquiredIndexCommits = new HashMap<>(); - this.internallyAcquiredIndexCommits = new HashMap<>(); + this.acquiredIndexCommits = new HashMap<>(); + this.indexCommitsAcquiredByCommitsListener = new HashSet<>(); } @Override @@ -124,8 +123,7 @@ public void onCommit(List commits) throws IOException { } for (int i = 0; i < keptPosition; i++) { final IndexCommit commit = commits.get(i); - if (externallyAcquiredIndexCommits.containsKey(commit) == false - && internallyAcquiredIndexCommits.containsKey(commit) == false) { + if (acquiredIndexCommits.containsKey(commit) == false) { deleteCommit(commit); if (deletedCommits == null) { deletedCommits = new ArrayList<>(); @@ -218,17 +216,23 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { return acquireIndexCommit(acquiringSafeCommit, false); } - private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInernally) { + private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredByCommitsListener) { assert safeCommit != null : "Safe commit is not initialized yet"; assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; - var commits = acquiredInernally ? internallyAcquiredIndexCommits : externallyAcquiredIndexCommits; - commits.merge(snapshotting, 1, Integer::sum); // increase refCount - return wrapCommit(snapshotting, acquiredInernally); + acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount + if (acquiredByCommitsListener) { + indexCommitsAcquiredByCommitsListener.add(snapshotting); + } + return wrapCommit(snapshotting, acquiredByCommitsListener); + } + + protected IndexCommit wrapCommit(IndexCommit indexCommit) { + return wrapCommit(indexCommit, false); } - protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) { - return new SnapshotIndexCommit(indexCommit, acquiredInternally); + protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredByCommitsListener) { + return new SnapshotIndexCommit(indexCommit, acquiredByCommitsListener); } /** @@ -239,16 +243,24 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredIntern synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); - var commits = snapshotIndexCommit.acquiredInternally ? internallyAcquiredIndexCommits : externallyAcquiredIndexCommits; - assert commits.containsKey(releasingCommit) - : "Release non-acquired commit;" + "acquired commits [" + commits + "], releasing commit [" + releasingCommit + "]"; + assert acquiredIndexCommits.containsKey(releasingCommit) + : "Release non-acquired commit;" + + "acquired commits [" + + acquiredIndexCommits + + "], releasing commit [" + + releasingCommit + + "]"; // release refCount - final Integer refCount = commits.compute(releasingCommit, (key, count) -> { + final Integer refCount = acquiredIndexCommits.compute(releasingCommit, (key, count) -> { if (count == 1) { return null; } return count - 1; }); + if (snapshotIndexCommit.acquiredByCommitsListener) { + indexCommitsAcquiredByCommitsListener.remove(releasingCommit); + } + assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; // The commit can be clean up only if no refCount and it is neither the safe commit nor last commit. return refCount == null && releasingCommit.equals(safeCommit) == false && releasingCommit.equals(lastCommit) == false; @@ -305,8 +317,8 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit * Checks whether the deletion policy is holding on to acquired index commits */ synchronized boolean hasAcquiredIndexCommits() { - // We excplcetly check only external commits and disregard commits created by the commits listener. - return externallyAcquiredIndexCommits.isEmpty() == false; + return acquiredIndexCommits.isEmpty() == false + && acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(indexCommitsAcquiredByCommitsListener::contains)); } /** @@ -328,11 +340,11 @@ public static String commitDescription(IndexCommit commit) throws IOException { */ private static class SnapshotIndexCommit extends FilterIndexCommit { - private final boolean acquiredInternally; + private final boolean acquiredByCommitsListener; - SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) { + SnapshotIndexCommit(IndexCommit delegate, boolean acquiredByCommitsListener) { super(delegate); - this.acquiredInternally = acquiredInternally; + this.acquiredByCommitsListener = acquiredByCommitsListener; } @Override diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index ce30f1ef371d7..176cb50f78e0f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -350,7 +350,7 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) { @Override synchronized boolean releaseCommit(IndexCommit acquiredCommit) { - return super.releaseCommit(wrapCommit(acquiredCommit, true)); + return super.releaseCommit(wrapCommit(acquiredCommit)); } }; From 321281fb9e09311d1b7c58c2397796dae39b7f16 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 01:40:33 +0200 Subject: [PATCH 06/17] Track internal commits separately --- .../index/engine/CombinedDeletionPolicy.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 5e2eee16fd67b..e4507abf5eb97 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -44,7 +44,9 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { private final SoftDeletesPolicy softDeletesPolicy; private final LongSupplier globalCheckpointSupplier; private final Map acquiredIndexCommits; // Number of references held against each commit point. - private final Set indexCommitsAcquiredByCommitsListener; + // Index commits internally acquired by the commits listener. We want to track them separately to be able to disregard them + // when checking for externally acquired index commits that haven't been released + private final Set internallyAcquiredIndexCommits; interface CommitsListener { @@ -74,7 +76,7 @@ interface CommitsListener { this.globalCheckpointSupplier = globalCheckpointSupplier; this.commitsListener = commitsListener; this.acquiredIndexCommits = new HashMap<>(); - this.indexCommitsAcquiredByCommitsListener = new HashSet<>(); + this.internallyAcquiredIndexCommits = new HashSet<>(); } @Override @@ -216,23 +218,23 @@ synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { return acquireIndexCommit(acquiringSafeCommit, false); } - private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredByCommitsListener) { + private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, boolean acquiredInternally) { assert safeCommit != null : "Safe commit is not initialized yet"; assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - if (acquiredByCommitsListener) { - indexCommitsAcquiredByCommitsListener.add(snapshotting); + if (acquiredInternally) { + internallyAcquiredIndexCommits.add(snapshotting); } - return wrapCommit(snapshotting, acquiredByCommitsListener); + return wrapCommit(snapshotting, acquiredInternally); } protected IndexCommit wrapCommit(IndexCommit indexCommit) { return wrapCommit(indexCommit, false); } - protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredByCommitsListener) { - return new SnapshotIndexCommit(indexCommit, acquiredByCommitsListener); + protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredInternally) { + return new SnapshotIndexCommit(indexCommit, acquiredInternally); } /** @@ -257,8 +259,9 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { } return count - 1; }); - if (snapshotIndexCommit.acquiredByCommitsListener) { - indexCommitsAcquiredByCommitsListener.remove(releasingCommit); + if (snapshotIndexCommit.acquiredInternally) { + boolean removed = internallyAcquiredIndexCommits.remove(releasingCommit); + assert removed : "Try to release a non-acquired internal commit [" + releasingCommit + "]"; } assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; @@ -317,8 +320,9 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit * Checks whether the deletion policy is holding on to acquired index commits */ synchronized boolean hasAcquiredIndexCommits() { + // We explicitly check only external commits and disregard internal commits acquired by the commits listener return acquiredIndexCommits.isEmpty() == false - && acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(indexCommitsAcquiredByCommitsListener::contains)); + && acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(internallyAcquiredIndexCommits::contains)); } /** @@ -340,11 +344,11 @@ public static String commitDescription(IndexCommit commit) throws IOException { */ private static class SnapshotIndexCommit extends FilterIndexCommit { - private final boolean acquiredByCommitsListener; + private final boolean acquiredInternally; - SnapshotIndexCommit(IndexCommit delegate, boolean acquiredByCommitsListener) { + SnapshotIndexCommit(IndexCommit delegate, boolean acquiredInternally) { super(delegate); - this.acquiredByCommitsListener = acquiredByCommitsListener; + this.acquiredInternally = acquiredInternally; } @Override From a29454def676a494db73e2785bd6334ed2eedbb6 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 01:42:25 +0200 Subject: [PATCH 07/17] Rename to hasExternallyAcquiredIndexCommits --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 2 +- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 ++-- .../java/org/elasticsearch/index/engine/EngineTestCase.java | 4 ++-- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 2 +- .../xpack/ccr/repository/CcrRestoreSourceServiceTests.java | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index e4507abf5eb97..d26c5d7ad113b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -319,7 +319,7 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit /** * Checks whether the deletion policy is holding on to acquired index commits */ - synchronized boolean hasAcquiredIndexCommits() { + synchronized boolean hasExternallyAcquiredIndexCommits() { // We explicitly check only external commits and disregard internal commits acquired by the commits listener return acquiredIndexCommits.isEmpty() == false && acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(internallyAcquiredIndexCommits::contains)); diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9743ee977a8c4..8cf8b5d812f2b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -669,8 +669,8 @@ Translog getTranslog() { } // Package private for testing purposes only - boolean hasAcquiredIndexCommits() { - return combinedDeletionPolicy.hasAcquiredIndexCommits(); + boolean hasExternallyAcquiredIndexCommits() { + return combinedDeletionPolicy.hasExternallyAcquiredIndexCommits(); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8412e9e250885..1081795950efd 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo))); } - public static boolean hasAcquiredIndexCommits(Engine engine) { + public static boolean hasExternallyAcquiredIndexCommits(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.hasAcquiredIndexCommits(); + return internalEngine.hasExternallyAcquiredIndexCommits(); } public static final class PrimaryTermSupplier implements LongSupplier { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 38b8dfecc0b5e..ae3c94b5faa7e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1368,7 +1368,7 @@ private void assertNoAcquiredIndexCommit() throws Exception { if (engine instanceof InternalEngine) { assertFalse( indexShard.routingEntry().toString() + " has unreleased snapshotted index commits", - EngineTestCase.hasAcquiredIndexCommits(engine) + EngineTestCase.hasExternallyAcquiredIndexCommits(engine) ); } } catch (AlreadyClosedException ignored) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 99344f22bae31..038c19ae2cb7a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false)); } - assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertTrue(EngineTestCase.hasExternallyAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); restoreSourceService.closeSession(sessionUUID); - assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertFalse(EngineTestCase.hasExternallyAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); closeShards(indexShard); // Exception will be thrown if file is not closed. From 98a8df87865a0418f9ac5507cfe937de302ecb0a Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 01:42:58 +0200 Subject: [PATCH 08/17] Add a comment about hasExternallyAcquiredIndexCommits --- .../org/elasticsearch/index/engine/CombinedDeletionPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index d26c5d7ad113b..78c1fd2d2b0a1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -317,7 +317,7 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit } /** - * Checks whether the deletion policy is holding on to acquired index commits + * Checks whether the deletion policy is holding on to externally acquired index commits */ synchronized boolean hasExternallyAcquiredIndexCommits() { // We explicitly check only external commits and disregard internal commits acquired by the commits listener From f0193a69699c7d584dda494ec2ed1f321fd1f973 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 01:45:21 +0200 Subject: [PATCH 09/17] Update error message about non-internal commits --- .../org/elasticsearch/index/engine/CombinedDeletionPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 78c1fd2d2b0a1..e7b56f193a5bf 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -261,7 +261,7 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { }); if (snapshotIndexCommit.acquiredInternally) { boolean removed = internallyAcquiredIndexCommits.remove(releasingCommit); - assert removed : "Try to release a non-acquired internal commit [" + releasingCommit + "]"; + assert removed : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; } assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; From c6d86625968377309a9828e118a57fbf95e52502 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 01:46:25 +0200 Subject: [PATCH 10/17] Simplify hasExternallyAcquiredIndexCommits --- .../org/elasticsearch/index/engine/CombinedDeletionPolicy.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index e7b56f193a5bf..3ddcb3a6370a0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -321,8 +321,7 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit */ synchronized boolean hasExternallyAcquiredIndexCommits() { // We explicitly check only external commits and disregard internal commits acquired by the commits listener - return acquiredIndexCommits.isEmpty() == false - && acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(internallyAcquiredIndexCommits::contains)); + return acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(internallyAcquiredIndexCommits::contains)); } /** From 2bbfdb825199f8d6766e17b620297e81ae711d28 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 10:28:53 +0200 Subject: [PATCH 11/17] Add a check for the amount of references on the commit --- .../index/engine/CombinedDeletionPolicy.java | 21 +++++++++++++------ .../index/engine/InternalEngine.java | 4 ++-- .../index/engine/EngineTestCase.java | 4 ++-- .../test/InternalTestCluster.java | 2 +- .../CcrRestoreSourceServiceTests.java | 4 ++-- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 3ddcb3a6370a0..eda0b8f3c6f15 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.common.lucene.FilterIndexCommit; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; @@ -28,7 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.function.LongSupplier; -import java.util.function.Predicate; import java.util.stream.Collectors; /** @@ -223,7 +223,7 @@ private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - if (acquiredInternally) { + if (Assertions.ENABLED && acquiredInternally) { internallyAcquiredIndexCommits.add(snapshotting); } return wrapCommit(snapshotting, acquiredInternally); @@ -259,7 +259,7 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { } return count - 1; }); - if (snapshotIndexCommit.acquiredInternally) { + if (Assertions.ENABLED && snapshotIndexCommit.acquiredInternally) { boolean removed = internallyAcquiredIndexCommits.remove(releasingCommit); assert removed : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; } @@ -319,9 +319,18 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit /** * Checks whether the deletion policy is holding on to externally acquired index commits */ - synchronized boolean hasExternallyAcquiredIndexCommits() { - // We explicitly check only external commits and disregard internal commits acquired by the commits listener - return acquiredIndexCommits.keySet().stream().anyMatch(Predicate.not(internallyAcquiredIndexCommits::contains)); + synchronized boolean hasAcquiredIndexCommits() { + if (Assertions.ENABLED) { + // We explicitly check only external commits and disregard internal commits acquired by the commits listener + for (var e : acquiredIndexCommits.entrySet()) { + if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) { + return true; + } + } + return false; + } else { + return acquiredIndexCommits.isEmpty() == false; + } } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8cf8b5d812f2b..9743ee977a8c4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -669,8 +669,8 @@ Translog getTranslog() { } // Package private for testing purposes only - boolean hasExternallyAcquiredIndexCommits() { - return combinedDeletionPolicy.hasExternallyAcquiredIndexCommits(); + boolean hasAcquiredIndexCommits() { + return combinedDeletionPolicy.hasAcquiredIndexCommits(); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 1081795950efd..8412e9e250885 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo))); } - public static boolean hasExternallyAcquiredIndexCommits(Engine engine) { + public static boolean hasAcquiredIndexCommits(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.hasExternallyAcquiredIndexCommits(); + return internalEngine.hasAcquiredIndexCommits(); } public static final class PrimaryTermSupplier implements LongSupplier { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index ae3c94b5faa7e..38b8dfecc0b5e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1368,7 +1368,7 @@ private void assertNoAcquiredIndexCommit() throws Exception { if (engine instanceof InternalEngine) { assertFalse( indexShard.routingEntry().toString() + " has unreleased snapshotted index commits", - EngineTestCase.hasExternallyAcquiredIndexCommits(engine) + EngineTestCase.hasAcquiredIndexCommits(engine) ); } } catch (AlreadyClosedException ignored) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 038c19ae2cb7a..99344f22bae31 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false)); } - assertTrue(EngineTestCase.hasExternallyAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); restoreSourceService.closeSession(sessionUUID); - assertFalse(EngineTestCase.hasExternallyAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); closeShards(indexShard); // Exception will be thrown if file is not closed. From a54f30542521e16002f8b8a9f0d173ef1d02340b Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 12:21:17 +0200 Subject: [PATCH 12/17] Use hasAcquiredIndexCommitsForTesting --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 2 +- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 ++-- .../java/org/elasticsearch/index/engine/EngineTestCase.java | 4 ++-- .../main/java/org/elasticsearch/test/InternalTestCluster.java | 2 +- .../xpack/ccr/repository/CcrRestoreSourceServiceTests.java | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index eda0b8f3c6f15..f3e6685169d68 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -319,7 +319,7 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit /** * Checks whether the deletion policy is holding on to externally acquired index commits */ - synchronized boolean hasAcquiredIndexCommits() { + synchronized boolean hasAcquiredIndexCommitsForTesting() { if (Assertions.ENABLED) { // We explicitly check only external commits and disregard internal commits acquired by the commits listener for (var e : acquiredIndexCommits.entrySet()) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9743ee977a8c4..7c456f55ac8ad 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -669,8 +669,8 @@ Translog getTranslog() { } // Package private for testing purposes only - boolean hasAcquiredIndexCommits() { - return combinedDeletionPolicy.hasAcquiredIndexCommits(); + boolean hasAcquiredIndexCommitsForTesting() { + return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting(); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8412e9e250885..5387108592b10 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -1440,10 +1440,10 @@ public static void waitForOpsToComplete(InternalEngine engine, long seqNo) throw assertBusy(() -> assertThat(engine.getLocalCheckpointTracker().getProcessedCheckpoint(), greaterThanOrEqualTo(seqNo))); } - public static boolean hasAcquiredIndexCommits(Engine engine) { + public static boolean hasAcquiredIndexCommitsForTesting(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have snapshotted commits, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; - return internalEngine.hasAcquiredIndexCommits(); + return internalEngine.hasAcquiredIndexCommitsForTesting(); } public static final class PrimaryTermSupplier implements LongSupplier { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 38b8dfecc0b5e..6c52f493c135c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1368,7 +1368,7 @@ private void assertNoAcquiredIndexCommit() throws Exception { if (engine instanceof InternalEngine) { assertFalse( indexShard.routingEntry().toString() + " has unreleased snapshotted index commits", - EngineTestCase.hasAcquiredIndexCommits(engine) + EngineTestCase.hasAcquiredIndexCommitsForTesting(engine) ); } } catch (AlreadyClosedException ignored) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 99344f22bae31..f577ccd4e5a44 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -215,9 +215,9 @@ public void testGetSessionDoesNotLeakFileIfClosed() throws IOException { sessionReader.readFileBytes(files.get(1).name(), MockBigArrays.NON_RECYCLING_INSTANCE.newByteArray(10, false)); } - assertTrue(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertTrue(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard))); restoreSourceService.closeSession(sessionUUID); - assertFalse(EngineTestCase.hasAcquiredIndexCommits(IndexShardTestCase.getEngine(indexShard))); + assertFalse(EngineTestCase.hasAcquiredIndexCommitsForTesting(IndexShardTestCase.getEngine(indexShard))); closeShards(indexShard); // Exception will be thrown if file is not closed. From 1c55fd1caf72357a00b96d524cac33ac49216418 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 9 Sep 2024 12:21:47 +0200 Subject: [PATCH 13/17] Remove extra if in hasAcquiredCommitsForTesting --- .../index/engine/CombinedDeletionPolicy.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index f3e6685169d68..0418a34210d55 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -320,17 +320,13 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit * Checks whether the deletion policy is holding on to externally acquired index commits */ synchronized boolean hasAcquiredIndexCommitsForTesting() { - if (Assertions.ENABLED) { - // We explicitly check only external commits and disregard internal commits acquired by the commits listener - for (var e : acquiredIndexCommits.entrySet()) { - if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) { - return true; - } + // We explicitly check only external commits and disregard internal commits acquired by the commits listener + for (var e : acquiredIndexCommits.entrySet()) { + if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) { + return true; } - return false; - } else { - return acquiredIndexCommits.isEmpty() == false; } + return false; } /** From 3fedaae8ac5898f3f9447ff96fde91e9b6e72014 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 10 Sep 2024 09:01:11 +0200 Subject: [PATCH 14/17] Update server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java Co-authored-by: Yang Wang --- .../org/elasticsearch/index/engine/CombinedDeletionPolicy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 0418a34210d55..13ab84c6aa598 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -243,7 +243,7 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredIntern * @return true if the acquired commit can be clean up. */ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { - SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; + final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); assert acquiredIndexCommits.containsKey(releasingCommit) : "Release non-acquired commit;" From c6d986cf5a59585b90e54e6b0c0d5aa5f101924b Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 10 Sep 2024 09:01:19 +0200 Subject: [PATCH 15/17] Update server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java Co-authored-by: Yang Wang --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 13ab84c6aa598..ee10cb49a162c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -259,10 +259,7 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { } return count - 1; }); - if (Assertions.ENABLED && snapshotIndexCommit.acquiredInternally) { - boolean removed = internallyAcquiredIndexCommits.remove(releasingCommit); - assert removed : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; - } + assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit) : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; // The commit can be clean up only if no refCount and it is neither the safe commit nor last commit. From 494bf43baeaf787444c216bd175ffb8ee6fae995 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 10 Sep 2024 09:01:26 +0200 Subject: [PATCH 16/17] Update server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java Co-authored-by: Yang Wang --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index ee10cb49a162c..661cb7f5c0ec2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -223,9 +223,7 @@ private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - if (Assertions.ENABLED && acquiredInternally) { - internallyAcquiredIndexCommits.add(snapshotting); - } + assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting) : "commit [" + snapshotting + "] already added"; return wrapCommit(snapshotting, acquiredInternally); } From 89224cef58ffda8dab0295b12ad539bc174ef4b2 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 10 Sep 2024 09:30:43 +0200 Subject: [PATCH 17/17] Run spotless --- .../elasticsearch/index/engine/CombinedDeletionPolicy.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 661cb7f5c0ec2..43b0c27d30580 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -13,7 +13,6 @@ import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.SegmentInfos; import org.elasticsearch.common.lucene.FilterIndexCommit; -import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; @@ -223,7 +222,8 @@ private synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit, assert lastCommit != null : "Last commit is not initialized yet"; final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; acquiredIndexCommits.merge(snapshotting, 1, Integer::sum); // increase refCount - assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting) : "commit [" + snapshotting + "] already added"; + assert acquiredInternally == false || internallyAcquiredIndexCommits.add(snapshotting) + : "commit [" + snapshotting + "] already added"; return wrapCommit(snapshotting, acquiredInternally); } @@ -257,7 +257,8 @@ synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { } return count - 1; }); - assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit) : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; + assert snapshotIndexCommit.acquiredInternally == false || internallyAcquiredIndexCommits.remove(releasingCommit) + : "Trying to release a commit [" + releasingCommit + "] that hasn't been previously acquired internally"; assert refCount == null || refCount > 0 : "Number of references for acquired commit can not be negative [" + refCount + "]"; // The commit can be clean up only if no refCount and it is neither the safe commit nor last commit.