From 5778a7813e16d7727abbc1a585f046e8dde6afcf Mon Sep 17 00:00:00 2001 From: denis-chudov Date: Tue, 19 Sep 2023 13:32:19 +0300 Subject: [PATCH 1/5] removed unused set --- .../placementdriver/leases/LeaseTracker.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index 1d1350c4696..476a41a452c 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -31,10 +31,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -169,11 +166,8 @@ public CompletableFuture onUpdate(WatchEvent event) { LeaseBatch leaseBatch = LeaseBatch.fromBytes(ByteBuffer.wrap(leasesBytes).order(LITTLE_ENDIAN)); - Set actualGroups = new HashSet<>(); - for (Lease lease : leaseBatch.leases()) { ReplicationGroupId grpId = lease.replicationGroupId(); - actualGroups.add(grpId); leasesMap.put(grpId, lease); @@ -184,12 +178,9 @@ public CompletableFuture onUpdate(WatchEvent event) { } } - for (Iterator> iterator = leasesMap.entrySet().iterator(); iterator.hasNext();) { - Map.Entry e = iterator.next(); - - if (!actualGroups.contains(e.getKey())) { - iterator.remove(); - tryRemoveTracker(e.getKey()); + for (ReplicationGroupId grpId : leases.leaseByGroupId().keySet()) { + if (!leasesMap.containsKey(grpId)) { + tryRemoveTracker(grpId); } } From 33cc9ed4162ff549361035fe2e7d88aa9b4153f2 Mon Sep 17 00:00:00 2001 From: denis-chudov Date: Tue, 17 Oct 2023 22:00:34 +0300 Subject: [PATCH 2/5] added test --- modules/placement-driver/build.gradle | 1 + .../placementdriver/leases/LeaseTracker.java | 22 +++-- .../placementdriver/LeaseTrackerTest.java | 95 +++++++++++++++++++ 3 files changed, 110 insertions(+), 8 deletions(-) create mode 100644 modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java diff --git a/modules/placement-driver/build.gradle b/modules/placement-driver/build.gradle index 4de8e53528a..59bed9bfdc5 100644 --- a/modules/placement-driver/build.gradle +++ b/modules/placement-driver/build.gradle @@ -67,6 +67,7 @@ dependencies { testImplementation(testFixtures(project(':ignite-metastorage'))) testImplementation(testFixtures(project(':ignite-vault'))) testImplementation libs.hamcrest.core + testImplementation libs.mockito.core } description = 'ignite-placement-driver' diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index 72a6782d82f..56252530a43 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -184,12 +184,14 @@ public CompletableFuture onUpdate(WatchEvent event) { } } - firePrimaryReplicaExpiredEventIfNeed(event.revision(), lease); + firePrimaryReplicaExpiredEventIfNeeded(event.revision(), lease); } for (ReplicationGroupId grpId : leases.leaseByGroupId().keySet()) { if (!leasesMap.containsKey(grpId)) { tryRemoveTracker(grpId); + + firePrimaryReplicaExpiredEvent(event.revision(), grpId, leases.leaseByGroupId().get(grpId)); } } @@ -304,20 +306,24 @@ private void loadLeasesBusyAsync(long recoveryRevision) { * @param causalityToken Causality token. * @param lease Lease to check on expiration. */ - private void firePrimaryReplicaExpiredEventIfNeed(long causalityToken, Lease lease) { + private void firePrimaryReplicaExpiredEventIfNeeded(long causalityToken, Lease lease) { ReplicationGroupId grpId = lease.replicationGroupId(); Lease currentLease = leases.leaseByGroupId().get(grpId); if (currentLease != null && currentLease.isAccepted() && !currentLease.getStartTime().equals(lease.getStartTime())) { - CompletableFuture prev = expirationFutureByGroup.put(grpId, fireEvent( - PRIMARY_REPLICA_EXPIRED, - new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) - )); - - assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; + firePrimaryReplicaExpiredEvent(causalityToken, grpId, currentLease); } } + private void firePrimaryReplicaExpiredEvent(long causalityToken, ReplicationGroupId grpId, Lease currentLease) { + CompletableFuture prev = expirationFutureByGroup.put(grpId, fireEvent( + PRIMARY_REPLICA_EXPIRED, + new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) + )); + + assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; + } + private CompletableFuture fireEventReplicaBecomePrimary(long causalityToken, Lease lease) { String leaseholder = lease.getLeaseholder(); diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java new file mode 100644 index 00000000000..9d97f195e9c --- /dev/null +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.placementdriver; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.EntryEvent; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.impl.EntryImpl; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; +import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; +import org.apache.ignite.internal.placementdriver.leases.Lease; +import org.apache.ignite.internal.placementdriver.leases.LeaseBatch; +import org.apache.ignite.internal.placementdriver.leases.LeaseTracker; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.junit.jupiter.api.Test; + +/** + * Tests for lease tracker. + */ +public class LeaseTrackerTest { + @Test + public void testLeaseCleanup() { + AtomicReference listenerRef = new AtomicReference<>(); + MetaStorageManager msManager = mock(MetaStorageManager.class); + + doAnswer( + invocation -> { + WatchListener lsnr = invocation.getArgument(1); + listenerRef.set(lsnr); + return null; + } + ).when(msManager).registerPrefixWatch(any(), any()); + + Entry emptyEntry = EntryImpl.empty(PLACEMENTDRIVER_LEASES_KEY.bytes()); + + when(msManager.getLocally(any(), anyLong())).thenAnswer(invocation -> emptyEntry); + + LeaseTracker leaseTracker = new LeaseTracker(msManager); + leaseTracker.startTrack(0L); + + AtomicReference parametersRef = new AtomicReference<>(); + leaseTracker.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (p, e) -> { + parametersRef.set(p); + return completedFuture(false); + }); + + TablePartitionId partId0 = new TablePartitionId(0, 0); + Lease lease0 = new Lease("a", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId0); + TablePartitionId partId1 = new TablePartitionId(0, 1); + Lease lease1 = new Lease("b", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId1); + + // In entry0, there are leases for partition ids partId0 and partId1. In entry1, there is only partId0, so partId1 is expired. + Entry entry0 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new LeaseBatch(List.of(lease0, lease1)).bytes(), 0, 0); + Entry entry1 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new LeaseBatch(List.of(lease0)).bytes(), 0, 1); + listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, entry0))); + + assertNull(parametersRef.get()); + + listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, entry1))); + + assertNotNull(parametersRef.get()); + assertEquals(partId1, parametersRef.get().groupId()); + } +} From c65b01c7d64f814da3e4113b4e61241bc933afaa Mon Sep 17 00:00:00 2001 From: denis-chudov Date: Wed, 18 Oct 2023 11:49:06 +0300 Subject: [PATCH 3/5] test fix attempt --- .../ignite/internal/placementdriver/LeaseTrackerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java index 9d97f195e9c..bcf3adead49 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java @@ -43,12 +43,13 @@ import org.apache.ignite.internal.placementdriver.leases.LeaseBatch; import org.apache.ignite.internal.placementdriver.leases.LeaseTracker; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.junit.jupiter.api.Test; /** * Tests for lease tracker. */ -public class LeaseTrackerTest { +public class LeaseTrackerTest extends BaseIgniteAbstractTest { @Test public void testLeaseCleanup() { AtomicReference listenerRef = new AtomicReference<>(); From b2e27e7f14d6336fd95d26133d6765a84f05d80a Mon Sep 17 00:00:00 2001 From: denis-chudov Date: Wed, 18 Oct 2023 17:22:21 +0300 Subject: [PATCH 4/5] fix after review --- .../placementdriver/leases/LeaseTracker.java | 31 ++++++++++--------- .../placementdriver/LeaseTrackerTest.java | 12 +++++-- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index 56252530a43..cb791bc5c66 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.metastorage.Entry; @@ -184,14 +185,14 @@ public CompletableFuture onUpdate(WatchEvent event) { } } - firePrimaryReplicaExpiredEventIfNeeded(event.revision(), lease); + firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), lease); } for (ReplicationGroupId grpId : leases.leaseByGroupId().keySet()) { if (!leasesMap.containsKey(grpId)) { tryRemoveTracker(grpId); - firePrimaryReplicaExpiredEvent(event.revision(), grpId, leases.leaseByGroupId().get(grpId)); + firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), null); } } @@ -306,22 +307,24 @@ private void loadLeasesBusyAsync(long recoveryRevision) { * @param causalityToken Causality token. * @param lease Lease to check on expiration. */ - private void firePrimaryReplicaExpiredEventIfNeeded(long causalityToken, Lease lease) { - ReplicationGroupId grpId = lease.replicationGroupId(); + private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId grpId, long causalityToken, @Nullable Lease lease) { + assert lease == null || lease.replicationGroupId().equals(grpId) + : IgniteStringFormatter.format("Group id mismatch [groupId={}, lease={}]", grpId, lease); + Lease currentLease = leases.leaseByGroupId().get(grpId); - if (currentLease != null && currentLease.isAccepted() && !currentLease.getStartTime().equals(lease.getStartTime())) { - firePrimaryReplicaExpiredEvent(causalityToken, grpId, currentLease); - } - } + if (currentLease != null && currentLease.isAccepted()) { + boolean sameLease = lease != null && currentLease.getStartTime().equals(lease.getStartTime()); - private void firePrimaryReplicaExpiredEvent(long causalityToken, ReplicationGroupId grpId, Lease currentLease) { - CompletableFuture prev = expirationFutureByGroup.put(grpId, fireEvent( - PRIMARY_REPLICA_EXPIRED, - new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) - )); + if (!sameLease) { + CompletableFuture prev = expirationFutureByGroup.put(grpId, fireEvent( + PRIMARY_REPLICA_EXPIRED, + new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) + )); - assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; + assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; + } + } } private CompletableFuture fireEventReplicaBecomePrimary(long causalityToken, Lease lease) { diff --git a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java index bcf3adead49..7974b386f98 100644 --- a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java +++ b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java @@ -77,9 +77,10 @@ public void testLeaseCleanup() { }); TablePartitionId partId0 = new TablePartitionId(0, 0); - Lease lease0 = new Lease("a", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId0); + Lease lease0 = new Lease("notAccepted", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId0); TablePartitionId partId1 = new TablePartitionId(0, 1); - Lease lease1 = new Lease("b", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId1); + Lease lease1 = new Lease("accepted", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId1) + .acceptLease(new HybridTimestamp(2000, 0)); // In entry0, there are leases for partition ids partId0 and partId1. In entry1, there is only partId0, so partId1 is expired. Entry entry0 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new LeaseBatch(List.of(lease0, lease1)).bytes(), 0, 0); @@ -88,9 +89,14 @@ public void testLeaseCleanup() { assertNull(parametersRef.get()); + // Check that the absence of accepted lease triggers the event. listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, entry1))); - assertNotNull(parametersRef.get()); assertEquals(partId1, parametersRef.get().groupId()); + + // Check that the absence of not accepted lease doesn't trigger the event. + parametersRef.set(null); + listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, emptyEntry))); + assertNull(parametersRef.get()); } } From b14f663bcac666dcf80dcb2ca9bd5d5e8baf65bc Mon Sep 17 00:00:00 2001 From: denis-chudov Date: Wed, 18 Oct 2023 20:46:47 +0300 Subject: [PATCH 5/5] amended javadoc --- .../ignite/internal/placementdriver/leases/LeaseTracker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java index cb791bc5c66..0e045e2f58d 100644 --- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java +++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java @@ -304,6 +304,7 @@ private void loadLeasesBusyAsync(long recoveryRevision) { /** * Fires the primary replica expire event if it needs. * + * @param grpId Group id, used for the cases when the {@code lease} parameter is null. Should be always not null. * @param causalityToken Causality token. * @param lease Lease to check on expiration. */