From 76e8b65eb619d58f7ff3af1a98f349feeb6e2ded Mon Sep 17 00:00:00 2001 From: Kirill Sizov Date: Tue, 14 Apr 2026 17:25:27 +0300 Subject: [PATCH] IGNITE-28543 Throttle VacuumTxStateReplicaRequests --- .../tx/impl/PersistentTxStateVacuumizer.java | 85 ++++++--- .../impl/PersistentTxStateVacuumizerTest.java | 178 ++++++++++++++++++ 2 files changed, 239 insertions(+), 24 deletions(-) create mode 100644 modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizerTest.java diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java index 159483b0a9a..21b681ec893 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +61,9 @@ public class PersistentTxStateVacuumizer { private static final String VACUUM_THROTTLE_KEY = "vacuum-failed"; + /** Maximum number of transaction IDs per vacuum request to avoid serialization timeouts. */ + static final int VACUUM_BATCH_SIZE = 1000; + private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory(); private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); @@ -134,30 +138,13 @@ public CompletableFuture vacuumPersistentTxStates return nullCompletedFuture(); } - VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest() - .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue()) - .groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId)) - .transactionIds(filteredTxIds) - .build(); - - return replicaService.invoke(localNode, request).whenComplete((v, e) -> { - if (e == null) { - successful.addAll(filteredTxIds); - vacuumizedPersistentTxnStatesCount.addAndGet(filteredTxIds.size()); - } else if (expectedException(e)) { - // We can log the exceptions without further handling because failed requests' txns are not added - // to the set of successful and will be retried. PrimaryReplicaMissException can be considered as - // a part of regular flow and doesn't need to be logged. NodeStoppingException should be ignored as - // vacuumization will be retried after restart. - LOG.debug("Failed to vacuum tx states from the persistent storage.", e); - } else { - // In general, even though this vacuum round has completed unsuccessfully, - // due to ReplicationTimeoutException for instance, - // it does not mean that correctness is violated, and we need to shutdown the node. - // Perhaps the next attempt will be successful. - throttledLogger.warn(VACUUM_THROTTLE_KEY, "Failed to vacuum tx states from the persistent storage.", e); - } - }); + return sendBatchedVacuumRequests( + replicaMeta.getStartTime().longValue(), + commitPartitionId, + filteredTxIds, + successful, + vacuumizedPersistentTxnStatesCount + ); } else { successful.addAll(txs.stream().map(v -> v.txId).collect(toSet())); @@ -172,6 +159,56 @@ public CompletableFuture vacuumPersistentTxStates .handle((unused, unusedEx) -> new PersistentTxStateVacuumResult(successful, vacuumizedPersistentTxnStatesCount.get())); } + private CompletableFuture sendBatchedVacuumRequests( + long enlistmentConsistencyToken, + ZonePartitionId commitPartitionId, + Set txIds, + Set successful, + AtomicInteger vacuumizedCount + ) { + List> batchFutures = new ArrayList<>(); + Iterator it = txIds.iterator(); + + while (it.hasNext()) { + Set batch = new HashSet<>(Math.min(VACUUM_BATCH_SIZE, txIds.size())); + + for (int j = 0; j < VACUUM_BATCH_SIZE && it.hasNext(); j++) { + batch.add(it.next()); + } + + batchFutures.add(sendVacuumBatch(enlistmentConsistencyToken, commitPartitionId, batch, successful, vacuumizedCount)); + } + + return allOf(batchFutures); + } + + private CompletableFuture sendVacuumBatch( + long enlistmentConsistencyToken, + ZonePartitionId commitPartitionId, + Set batch, + Set successful, + AtomicInteger vacuumizedCount + ) { + VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest() + .enlistmentConsistencyToken(enlistmentConsistencyToken) + .groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId)) + .transactionIds(batch) + .build(); + + return replicaService.invoke(localNode, request).whenComplete((v, e) -> { + if (e == null) { + successful.addAll(batch); + vacuumizedCount.addAndGet(batch.size()); + } else if (expectedException(e)) { + // Failed requests' txns are not added to the set of successful and will be retried. + LOG.debug("Failed to vacuum tx states from the persistent storage.", e); + } else { + throttledLogger.warn(VACUUM_THROTTLE_KEY, + "Failed to vacuum tx states from the persistent storage.", e); + } + }); + } + private static boolean expectedException(Throwable e) { return hasCause(e, PrimaryReplicaMissException.class, diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizerTest.java new file mode 100644 index 00000000000..a6386c227aa --- /dev/null +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VACUUM_BATCH_SIZE; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.TestClockService; +import org.apache.ignite.internal.network.InternalClusterNode; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.PersistentTxStateVacuumResult; +import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VacuumizableTx; +import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class PersistentTxStateVacuumizerTest extends BaseIgniteAbstractTest { + @Mock + private ReplicaService replicaService; + + @Mock + private InternalClusterNode localNode; + + @Mock + private PlacementDriver placementDriver; + + @Captor + private ArgumentCaptor requestCaptor; + + private PersistentTxStateVacuumizer vacuumizer; + + @BeforeEach + void setUp() { + UUID localNodeId = UUID.randomUUID(); + when(localNode.id()).thenReturn(localNodeId); + when(localNode.name()).thenReturn("test-node"); + + TestReplicaMetaImpl replicaMeta = new TestReplicaMetaImpl(localNode.name(), localNodeId); + + when(placementDriver.getPrimaryReplica(any(), any())) + .thenReturn(completedFuture(replicaMeta)); + lenient().when(replicaService.invoke(any(InternalClusterNode.class), requestCaptor.capture())) + .thenReturn(completedFuture(null)); + + vacuumizer = new PersistentTxStateVacuumizer( + replicaService, + localNode, + new TestClockService(new HybridClockImpl()), + placementDriver + ); + } + + @Test + void smallBatchSentAsSingleRequest() { + int count = 10; + Map> txIds = createTxIds(count); + + PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join(); + + assertEquals(count, result.vacuumizedPersistentTxnStatesCount); + assertEquals(count, result.txnsToVacuum.size()); + assertEquals(1, requestCaptor.getAllValues().size()); + assertEquals(count, requestCaptor.getValue().transactionIds().size()); + } + + @Test + void largeBatchIsSplitIntoMultipleRequests() { + int count = VACUUM_BATCH_SIZE * 3 + 1; + Map> txIds = createTxIds(count); + + PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join(); + + assertEquals(count, result.vacuumizedPersistentTxnStatesCount); + assertEquals(count, result.txnsToVacuum.size()); + + // Should be split into 4 requests. + assertEquals(4, requestCaptor.getAllValues().size()); + + // Each request should have at most VACUUM_BATCH_SIZE tx IDs. + assertThat( + requestCaptor.getAllValues().stream().map(r -> r.transactionIds().size()).toList(), + everyItem(lessThanOrEqualTo(VACUUM_BATCH_SIZE)) + ); + + // All tx IDs should be covered. + Set allSentIds = new HashSet<>(); + for (VacuumTxStateReplicaRequest req : requestCaptor.getAllValues()) { + allSentIds.addAll(req.transactionIds()); + } + assertEquals(count, allSentIds.size()); + } + + @Test + void partialBatchFailureDoesNotAffectOtherBatches() { + int count = VACUUM_BATCH_SIZE * 2; + Map> txIds = createTxIds(count); + + // First invocation succeeds, second fails. + when(replicaService.invoke(any(InternalClusterNode.class), any(VacuumTxStateReplicaRequest.class))) + .thenReturn(completedFuture(null)) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("test failure"))); + + PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join(); + + // Only the first batch's tx IDs should be in the successful set. + assertEquals(VACUUM_BATCH_SIZE, result.vacuumizedPersistentTxnStatesCount); + assertEquals(VACUUM_BATCH_SIZE, result.txnsToVacuum.size()); + } + + @Test + void txsWithoutCleanupTimestampAreSuccessfulWithoutRequest() { + ZonePartitionId partitionId = new ZonePartitionId(1, 0); + Set txs = new HashSet<>(); + + // Tx without cleanup timestamp — should go directly to successful. + UUID txWithoutCleanup = UUID.randomUUID(); + txs.add(new VacuumizableTx(txWithoutCleanup, null)); + + Map> txIds = Map.of(partitionId, txs); + + PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join(); + + assertTrue(result.txnsToVacuum.contains(txWithoutCleanup)); + assertEquals(0, result.vacuumizedPersistentTxnStatesCount); + // No requests should be sent since the only tx has no cleanup timestamp. + assertEquals(0, requestCaptor.getAllValues().size()); + } + + private static Map> createTxIds(int count) { + ZonePartitionId partitionId = new ZonePartitionId(1, 0); + Set txs = new HashSet<>(); + + for (int i = 0; i < count; i++) { + txs.add(new VacuumizableTx(UUID.randomUUID(), 1L)); + } + + return Map.of(partitionId, txs); + } +}