-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IGNITE-20430 Got rid of unused set and fixed replica waiters removal #2604
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
5778a78
removed unused set
denis-chudov 7b5338a
Merge branch 'main' into ignite-20430-1
denis-chudov 33cc9ed
added test
denis-chudov c65b01c
test fix attempt
denis-chudov 92b2bc7
Merge branch 'main' into ignite-20430-1
denis-chudov b2e27e7
fix after review
denis-chudov b14f663
amended javadoc
denis-chudov File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,18 +35,16 @@ | |
import java.nio.ByteBuffer; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import org.apache.ignite.internal.event.AbstractEventProducer; | ||
import org.apache.ignite.internal.hlc.HybridTimestamp; | ||
import org.apache.ignite.internal.lang.IgniteStringFormatter; | ||
import org.apache.ignite.internal.logger.IgniteLogger; | ||
import org.apache.ignite.internal.logger.Loggers; | ||
import org.apache.ignite.internal.metastorage.Entry; | ||
|
@@ -170,13 +168,10 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) { | |
|
||
LeaseBatch leaseBatch = LeaseBatch.fromBytes(ByteBuffer.wrap(leasesBytes).order(LITTLE_ENDIAN)); | ||
|
||
Set<ReplicationGroupId> actualGroups = new HashSet<>(); | ||
|
||
Map<ReplicationGroupId, Lease> previousLeasesMap = leases.leaseByGroupId(); | ||
|
||
for (Lease lease : leaseBatch.leases()) { | ||
ReplicationGroupId grpId = lease.replicationGroupId(); | ||
actualGroups.add(grpId); | ||
|
||
leasesMap.put(grpId, lease); | ||
|
||
|
@@ -190,15 +185,14 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) { | |
} | ||
} | ||
|
||
firePrimaryReplicaExpiredEventIfNeed(event.revision(), lease); | ||
firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), lease); | ||
} | ||
|
||
for (Iterator<Map.Entry<ReplicationGroupId, Lease>> iterator = leasesMap.entrySet().iterator(); iterator.hasNext();) { | ||
Map.Entry<ReplicationGroupId, Lease> e = iterator.next(); | ||
for (ReplicationGroupId grpId : leases.leaseByGroupId().keySet()) { | ||
if (!leasesMap.containsKey(grpId)) { | ||
tryRemoveTracker(grpId); | ||
|
||
if (!actualGroups.contains(e.getKey())) { | ||
iterator.remove(); | ||
tryRemoveTracker(e.getKey()); | ||
firePrimaryReplicaExpiredEventIfNeeded(grpId, event.revision(), null); | ||
} | ||
} | ||
|
||
|
@@ -310,20 +304,27 @@ private void loadLeasesBusyAsync(long recoveryRevision) { | |
/** | ||
* Fires the primary replica expire event if it needs. | ||
* | ||
* @param grpId Group id, used for the cases when the {@code lease} parameter is null. Should be always not null. | ||
* @param causalityToken Causality token. | ||
* @param lease Lease to check on expiration. | ||
*/ | ||
private void firePrimaryReplicaExpiredEventIfNeed(long causalityToken, Lease lease) { | ||
ReplicationGroupId grpId = lease.replicationGroupId(); | ||
private void firePrimaryReplicaExpiredEventIfNeeded(ReplicationGroupId grpId, long causalityToken, @Nullable Lease lease) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add grpId in parameters declaration in the method javadoc. |
||
assert lease == null || lease.replicationGroupId().equals(grpId) | ||
: IgniteStringFormatter.format("Group id mismatch [groupId={}, lease={}]", grpId, lease); | ||
|
||
Lease currentLease = leases.leaseByGroupId().get(grpId); | ||
|
||
if (currentLease != null && currentLease.isAccepted() && !currentLease.getStartTime().equals(lease.getStartTime())) { | ||
CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, fireEvent( | ||
PRIMARY_REPLICA_EXPIRED, | ||
new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) | ||
)); | ||
if (currentLease != null && currentLease.isAccepted()) { | ||
boolean sameLease = lease != null && currentLease.getStartTime().equals(lease.getStartTime()); | ||
|
||
assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; | ||
if (!sameLease) { | ||
CompletableFuture<Void> prev = expirationFutureByGroup.put(grpId, fireEvent( | ||
PRIMARY_REPLICA_EXPIRED, | ||
new PrimaryReplicaEventParameters(causalityToken, grpId, currentLease.getLeaseholder()) | ||
)); | ||
|
||
assert prev == null || prev.isDone() : "Previous lease expiration process has not completed yet [grpId=" + grpId + ']'; | ||
} | ||
} | ||
} | ||
|
||
|
102 changes: 102 additions & 0 deletions
102
...ent-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.ignite.internal.placementdriver; | ||
|
||
import static java.util.concurrent.CompletableFuture.completedFuture; | ||
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertNotNull; | ||
import static org.junit.jupiter.api.Assertions.assertNull; | ||
import static org.mockito.ArgumentMatchers.any; | ||
import static org.mockito.ArgumentMatchers.anyLong; | ||
import static org.mockito.Mockito.doAnswer; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import org.apache.ignite.internal.hlc.HybridTimestamp; | ||
import org.apache.ignite.internal.metastorage.Entry; | ||
import org.apache.ignite.internal.metastorage.EntryEvent; | ||
import org.apache.ignite.internal.metastorage.MetaStorageManager; | ||
import org.apache.ignite.internal.metastorage.WatchEvent; | ||
import org.apache.ignite.internal.metastorage.WatchListener; | ||
import org.apache.ignite.internal.metastorage.impl.EntryImpl; | ||
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent; | ||
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters; | ||
import org.apache.ignite.internal.placementdriver.leases.Lease; | ||
import org.apache.ignite.internal.placementdriver.leases.LeaseBatch; | ||
import org.apache.ignite.internal.placementdriver.leases.LeaseTracker; | ||
import org.apache.ignite.internal.replicator.TablePartitionId; | ||
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; | ||
import org.junit.jupiter.api.Test; | ||
|
||
/** | ||
* Tests for lease tracker. | ||
*/ | ||
public class LeaseTrackerTest extends BaseIgniteAbstractTest { | ||
@Test | ||
public void testLeaseCleanup() { | ||
AtomicReference<WatchListener> listenerRef = new AtomicReference<>(); | ||
MetaStorageManager msManager = mock(MetaStorageManager.class); | ||
|
||
doAnswer( | ||
invocation -> { | ||
WatchListener lsnr = invocation.getArgument(1); | ||
listenerRef.set(lsnr); | ||
return null; | ||
} | ||
).when(msManager).registerPrefixWatch(any(), any()); | ||
|
||
Entry emptyEntry = EntryImpl.empty(PLACEMENTDRIVER_LEASES_KEY.bytes()); | ||
|
||
when(msManager.getLocally(any(), anyLong())).thenAnswer(invocation -> emptyEntry); | ||
|
||
LeaseTracker leaseTracker = new LeaseTracker(msManager); | ||
leaseTracker.startTrack(0L); | ||
|
||
AtomicReference<PrimaryReplicaEventParameters> parametersRef = new AtomicReference<>(); | ||
leaseTracker.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (p, e) -> { | ||
parametersRef.set(p); | ||
return completedFuture(false); | ||
}); | ||
|
||
TablePartitionId partId0 = new TablePartitionId(0, 0); | ||
Lease lease0 = new Lease("notAccepted", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId0); | ||
TablePartitionId partId1 = new TablePartitionId(0, 1); | ||
Lease lease1 = new Lease("accepted", new HybridTimestamp(1, 0), new HybridTimestamp(1000, 0), partId1) | ||
.acceptLease(new HybridTimestamp(2000, 0)); | ||
|
||
// In entry0, there are leases for partition ids partId0 and partId1. In entry1, there is only partId0, so partId1 is expired. | ||
Entry entry0 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new LeaseBatch(List.of(lease0, lease1)).bytes(), 0, 0); | ||
Entry entry1 = new EntryImpl(PLACEMENTDRIVER_LEASES_KEY.bytes(), new LeaseBatch(List.of(lease0)).bytes(), 0, 1); | ||
listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, entry0))); | ||
|
||
assertNull(parametersRef.get()); | ||
|
||
// Check that the absence of accepted lease triggers the event. | ||
listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, entry1))); | ||
assertNotNull(parametersRef.get()); | ||
assertEquals(partId1, parametersRef.get().groupId()); | ||
|
||
// Check that the absence of not accepted lease doesn't trigger the event. | ||
parametersRef.set(null); | ||
listenerRef.get().onUpdate(new WatchEvent(new EntryEvent(emptyEntry, emptyEntry))); | ||
assertNull(parametersRef.get()); | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to test this so there are fewer errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a test.