diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index c05c3271ee3..0e045e2f58d 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -35,11 +35,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -47,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; @@ -170,13 +168,10 @@ public CompletableFuture onUpdate(WatchEvent event) { LeaseBatch leaseBatch = LeaseBatch.fromBytes(ByteBuffer.wrap(leasesBytes).order(LITTLE_ENDIAN)); - Set actualGroups = new HashSet<>(); - Map previousLeasesMap = leases.leaseByGroupId(); for (Lease lease : leaseBatch.leases()) { ReplicationGroupId grpId = lease.replicationGroupId(); - actualGroups.add(grpId); leasesMap.put(grpId, lease); @@ -190,15 +185,14 @@ public CompletableFuture onUpdate(WatchEvent event) { } } - firePrimaryReplicaExpiredEventIfNeed(event.revision(), lease); + firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), lease); } - for (Iterator> iterator = leasesMap.entrySet().iterator(); iterator.hasNext();) { - Map.Entry e = iterator.next(); + for (ReplicationGroupId grpId : leases.leaseByGroupId().keySet()) { + if (!leasesMap.containsKey(grpId)) { + tryRemoveTracker(grpId); - if (!actualGroups.contains(e.getKey())) { - iterator.remove(); - tryRemoveTracker(e.getKey()); + firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), null); } } @@ -310,20 +304,27 @@ private void loadLeasesBusyAsync(long recoveryRevision) { /** * Fires the primary replica expire event if it needs. * + * @param grpId Group id, used for the cases when the {@code lease} parameter is null. Should be always not null. * @param causalityToken Causality token. * @param lease Lease to check on expiration. */ - private void firePrimaryReplicaExpiredEventIfNeed(long causalityToken, Lease lease) { - ReplicationGroupId grpId = lease.replicationGroupId(); + private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId grpId, long causalityToken, @Nullable Lease lease) { + assert lease == null || lease.replicationGroupId().equals(grpId) + : IgniteStringFormatter.format("Group id mismatch [groupId={}, lease={}]", grpId, lease); + Lease currentLease = leases.leaseByGroupId().get(grpId); - if (currentLease != null && currentLease.isAccepted() && !currentLease.getStartTime().equals(lease.getStartTime())) { - CompletableFuture prev = expirationFutureByGroup.put(grpId, fireEvent( - PRIMARY_REPLICA_EXPIRED, - new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) - )); + if (currentLease != null && currentLease.isAccepted()) { + boolean sameLease = lease != null && currentLease.getStartTime().equals(lease.getStartTime()); - assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; + if (!sameLease) { + CompletableFuture prev = expirationFutureByGroup.put(grpId, fireEvent( + PRIMARY_REPLICA_EXPIRED, + new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) + )); + + assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; + } } } diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java new file mode 100644 index 00000000000..7974b386f98 --- /dev/null +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.placementdriver; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.EntryEvent; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.impl.EntryImpl; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; +import org.apache.ignite.internal.placementdriver.leases.Lease; +import org.apache.ignite.internal.placementdriver.leases.LeaseBatch; +import org.apache.ignite.internal.placementdriver.leases.LeaseTracker; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; + +/** + * Tests for lease tracker. + */ +public class LeaseTrackerTest extends BaseIgniteAbstractTest { + @Test + public void testLeaseCleanup() { + AtomicReference listenerRef = new AtomicReference<>(); + MetaStorageManager msManager = mock(MetaStorageManager.class); + + doAnswer( + invocation -> { + WatchListener lsnr = invocation.getArgument(1); + listenerRef.set(lsnr); + return null; + } + ).when(msManager).registerPrefixWatch(any(), any()); + + Entry emptyEntry = EntryImpl.empty(PLACEMENTDRIVER_LEASES_KEY.bytes()); + + when(msManager.getLocally(any(), anyLong())).thenAnswer(invocation -> emptyEntry); + + LeaseTracker leaseTracker = new LeaseTracker(msManager); + leaseTracker.startTrack(0L); + + AtomicReference parametersRef = new AtomicReference<>(); + leaseTracker.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (p, e) -> { + parametersRef.set(p); + return completedFuture(false); + }); + + TablePartitionId partId0 = new TablePartitionId(0, 0); + Lease lease0 = new Lease("notAccepted", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId0); + TablePartitionId partId1 = new TablePartitionId(0, 1); + Lease lease1 = new Lease("accepted", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId1) + .acceptLease(new HybridTimestamp(2000, 0)); + + // In entry0, there are leases for partition ids partId0 and partId1. In entry1, there is only partId0, so partId1 is expired. + Entry entry0 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new LeaseBatch(List.of(lease0, lease1)).bytes(), 0, 0); + Entry entry1 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new LeaseBatch(List.of(lease0)).bytes(), 0, 1); + listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, entry0))); + + assertNull(parametersRef.get()); + + // Check that the absence of accepted lease triggers the event. + listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, entry1))); + assertNotNull(parametersRef.get()); + assertEquals(partId1, parametersRef.get().groupId()); + + // Check that the absence of not accepted lease doesn't trigger the event. + parametersRef.set(null); + listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, emptyEntry))); + assertNull(parametersRef.get()); + } +}