Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -60,6 +61,9 @@ public class PersistentTxStateVacuumizer {

private static final String VACUUM_THROTTLE_KEY = "vacuum-failed";

/** Maximum number of transaction IDs per vacuum request to avoid serialization timeouts. */
static final int VACUUM_BATCH_SIZE = 1000;

private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();

private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
Expand Down Expand Up @@ -134,30 +138,13 @@ public CompletableFuture<PersistentTxStateVacuumResult> vacuumPersistentTxStates
return nullCompletedFuture();
}

VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
.groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId))
.transactionIds(filteredTxIds)
.build();

return replicaService.invoke(localNode, request).whenComplete((v, e) -> {
if (e == null) {
successful.addAll(filteredTxIds);
vacuumizedPersistentTxnStatesCount.addAndGet(filteredTxIds.size());
} else if (expectedException(e)) {
// We can log the exceptions without further handling because failed requests' txns are not added
// to the set of successful and will be retried. PrimaryReplicaMissException can be considered as
// a part of regular flow and doesn't need to be logged. NodeStoppingException should be ignored as
// vacuumization will be retried after restart.
LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
} else {
// In general, even though this vacuum round has completed unsuccessfully,
// due to ReplicationTimeoutException for instance,
// it does not mean that correctness is violated, and we need to shutdown the node.
// Perhaps the next attempt will be successful.
throttledLogger.warn(VACUUM_THROTTLE_KEY, "Failed to vacuum tx states from the persistent storage.", e);
}
});
return sendBatchedVacuumRequests(
replicaMeta.getStartTime().longValue(),
commitPartitionId,
filteredTxIds,
successful,
vacuumizedPersistentTxnStatesCount
);
} else {
successful.addAll(txs.stream().map(v -> v.txId).collect(toSet()));

Expand All @@ -172,6 +159,56 @@ public CompletableFuture<PersistentTxStateVacuumResult> vacuumPersistentTxStates
.handle((unused, unusedEx) -> new PersistentTxStateVacuumResult(successful, vacuumizedPersistentTxnStatesCount.get()));
}

private CompletableFuture<Void> sendBatchedVacuumRequests(
long enlistmentConsistencyToken,
ZonePartitionId commitPartitionId,
Set<UUID> txIds,
Set<UUID> successful,
AtomicInteger vacuumizedCount
) {
List<CompletableFuture<?>> batchFutures = new ArrayList<>();
Iterator<UUID> it = txIds.iterator();

while (it.hasNext()) {
Set<UUID> batch = new HashSet<>(Math.min(VACUUM_BATCH_SIZE, txIds.size()));

for (int j = 0; j < VACUUM_BATCH_SIZE && it.hasNext(); j++) {
batch.add(it.next());
}

batchFutures.add(sendVacuumBatch(enlistmentConsistencyToken, commitPartitionId, batch, successful, vacuumizedCount));
}

return allOf(batchFutures);
}

private CompletableFuture<?> sendVacuumBatch(
long enlistmentConsistencyToken,
ZonePartitionId commitPartitionId,
Set<UUID> batch,
Set<UUID> successful,
AtomicInteger vacuumizedCount
) {
VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
.enlistmentConsistencyToken(enlistmentConsistencyToken)
.groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId))
.transactionIds(batch)
.build();

return replicaService.invoke(localNode, request).whenComplete((v, e) -> {
if (e == null) {
successful.addAll(batch);
vacuumizedCount.addAndGet(batch.size());
} else if (expectedException(e)) {
// Failed requests' txns are not added to the set of successful and will be retried.
LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
} else {
throttledLogger.warn(VACUUM_THROTTLE_KEY,
"Failed to vacuum tx states from the persistent storage.", e);
}
});
}

private static boolean expectedException(Throwable e) {
return hasCause(e,
PrimaryReplicaMissException.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.tx.impl;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VACUUM_BATCH_SIZE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.PersistentTxStateVacuumResult;
import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VacuumizableTx;
import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class PersistentTxStateVacuumizerTest extends BaseIgniteAbstractTest {
@Mock
private ReplicaService replicaService;

@Mock
private InternalClusterNode localNode;

@Mock
private PlacementDriver placementDriver;

@Captor
private ArgumentCaptor<VacuumTxStateReplicaRequest> requestCaptor;

private PersistentTxStateVacuumizer vacuumizer;

@BeforeEach
void setUp() {
UUID localNodeId = UUID.randomUUID();
when(localNode.id()).thenReturn(localNodeId);
when(localNode.name()).thenReturn("test-node");

TestReplicaMetaImpl replicaMeta = new TestReplicaMetaImpl(localNode.name(), localNodeId);

when(placementDriver.getPrimaryReplica(any(), any()))
.thenReturn(completedFuture(replicaMeta));
lenient().when(replicaService.invoke(any(InternalClusterNode.class), requestCaptor.capture()))
.thenReturn(completedFuture(null));

vacuumizer = new PersistentTxStateVacuumizer(
replicaService,
localNode,
new TestClockService(new HybridClockImpl()),
placementDriver
);
}

@Test
void smallBatchSentAsSingleRequest() {
int count = 10;
Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);

PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();

assertEquals(count, result.vacuumizedPersistentTxnStatesCount);
assertEquals(count, result.txnsToVacuum.size());
assertEquals(1, requestCaptor.getAllValues().size());
assertEquals(count, requestCaptor.getValue().transactionIds().size());
}

@Test
void largeBatchIsSplitIntoMultipleRequests() {
int count = VACUUM_BATCH_SIZE * 3 + 1;
Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);

PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();

assertEquals(count, result.vacuumizedPersistentTxnStatesCount);
assertEquals(count, result.txnsToVacuum.size());

// Should be split into 4 requests.
assertEquals(4, requestCaptor.getAllValues().size());

// Each request should have at most VACUUM_BATCH_SIZE tx IDs.
assertThat(
requestCaptor.getAllValues().stream().map(r -> r.transactionIds().size()).toList(),
everyItem(lessThanOrEqualTo(VACUUM_BATCH_SIZE))
);

// All tx IDs should be covered.
Set<UUID> allSentIds = new HashSet<>();
for (VacuumTxStateReplicaRequest req : requestCaptor.getAllValues()) {
allSentIds.addAll(req.transactionIds());
}
assertEquals(count, allSentIds.size());
}

@Test
void partialBatchFailureDoesNotAffectOtherBatches() {
int count = VACUUM_BATCH_SIZE * 2;
Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);

// First invocation succeeds, second fails.
when(replicaService.invoke(any(InternalClusterNode.class), any(VacuumTxStateReplicaRequest.class)))
.thenReturn(completedFuture(null))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("test failure")));

PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();

// Only the first batch's tx IDs should be in the successful set.
assertEquals(VACUUM_BATCH_SIZE, result.vacuumizedPersistentTxnStatesCount);
assertEquals(VACUUM_BATCH_SIZE, result.txnsToVacuum.size());
}

@Test
void txsWithoutCleanupTimestampAreSuccessfulWithoutRequest() {
ZonePartitionId partitionId = new ZonePartitionId(1, 0);
Set<VacuumizableTx> txs = new HashSet<>();

// Tx without cleanup timestamp — should go directly to successful.
UUID txWithoutCleanup = UUID.randomUUID();
txs.add(new VacuumizableTx(txWithoutCleanup, null));

Map<ZonePartitionId, Set<VacuumizableTx>> txIds = Map.of(partitionId, txs);

PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();

assertTrue(result.txnsToVacuum.contains(txWithoutCleanup));
assertEquals(0, result.vacuumizedPersistentTxnStatesCount);
// No requests should be sent since the only tx has no cleanup timestamp.
assertEquals(0, requestCaptor.getAllValues().size());
}

private static Map<ZonePartitionId, Set<VacuumizableTx>> createTxIds(int count) {
ZonePartitionId partitionId = new ZonePartitionId(1, 0);
Set<VacuumizableTx> txs = new HashSet<>();

for (int i = 0; i < count; i++) {
txs.add(new VacuumizableTx(UUID.randomUUID(), 1L));
}

return Map.of(partitionId, txs);
}
}