Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
429431e
wip
denis-chudov Mar 28, 2024
7befdb0
merge main
denis-chudov Apr 3, 2024
0aae08c
wip
denis-chudov Apr 4, 2024
ac3ce06
wip
denis-chudov Apr 5, 2024
34a37c6
wip poc
denis-chudov Apr 11, 2024
da4c1fe
added tests
denis-chudov Apr 15, 2024
0c81f05
Merge branch 'main' into ignite-21763
denis-chudov Apr 15, 2024
2fbaefc
wip
denis-chudov Apr 15, 2024
d75bbf0
fixed comments, fixed flaky tests
denis-chudov Apr 17, 2024
077745e
Merge branch 'main' into ignite-21763
denis-chudov Apr 17, 2024
0ed34a0
code style, pmd
denis-chudov Apr 17, 2024
d96eb70
checkstyle
denis-chudov Apr 17, 2024
29869a9
some refactoring
denis-chudov Apr 18, 2024
83f764b
added retries to NodeUtils#transferPrimary
denis-chudov Apr 18, 2024
a1d7e20
increased partitions count for test
denis-chudov Apr 18, 2024
91a6923
checkstyle fix
denis-chudov Apr 18, 2024
4ad2530
Merge branch 'main' into ignite-21763
denis-chudov Apr 22, 2024
a5c7e88
fixes after review
denis-chudov Apr 24, 2024
2257d39
Merge branch 'ignite-21763' of https://github.com/gridgain/apache-ign…
denis-chudov Apr 24, 2024
2f932de
more fixes, comments and javadocs
denis-chudov Apr 25, 2024
f57a011
fixed compilation
denis-chudov Apr 25, 2024
5c01692
code style
denis-chudov Apr 25, 2024
76a54f4
Merge branch 'main' into ignite-21763
denis-chudov Apr 25, 2024
5107b91
code style
denis-chudov Apr 25, 2024
517092e
code style*
denis-chudov Apr 25, 2024
e4597e4
code style**
denis-chudov Apr 25, 2024
012eacb
some more fixes
denis-chudov Apr 26, 2024
8110ef6
Merge branch 'main' into ignite-21763
denis-chudov Apr 26, 2024
a4edea2
removed logs
denis-chudov Apr 27, 2024
66c892a
tests fix
denis-chudov Apr 27, 2024
618276d
turned off time-based vacuum for some tests to avoid races
denis-chudov Apr 27, 2024
e0c985f
*
denis-chudov Apr 27, 2024
99d0913
reworked dependencies
denis-chudov Apr 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public CompletableFuture<Void> cleanup(String node, UUID txId) {
}

@Override
public void vacuum() {
// No-op.
public CompletableFuture<Void> vacuum() {
return nullCompletedFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void testPrimaryChangeSubscription() throws Exception {
return falseCompletedFuture();
});

NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp, null);
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);

assertTrue(waitForCondition(primaryChanged::get, 10_000));
}
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testPrimaryChangeLongHandling() throws Exception {

Collection<IgniteImpl> nodes = cluster.runningNodes().collect(toSet());

NodeUtils.transferPrimary(nodes, tblReplicationGrp, null);
NodeUtils.transferPrimary(nodes, tblReplicationGrp);

CompletableFuture<String> primaryChangeTask =
IgniteTestUtils.runAsync(() -> NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
Expand Down Expand Up @@ -264,7 +264,7 @@ public void testClearingTransactionResourcesWhenPrimaryChange() throws Exception
assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());

NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp, null);
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);

assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void testLeaseProlong() throws Exception {
}

@Test
public void prolongAfterActiveActorChanger() throws Exception {
public void prolongAfterActiveActorChanged() throws Exception {
var acceptedNodeRef = new AtomicReference<String>();

leaseGrantHandler = (msg, from, to) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.placementdriver;

import static java.util.Objects.hash;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
Expand Down Expand Up @@ -247,10 +248,15 @@ private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId, Lease lea
* Finds a node that can be the leaseholder.
*
* @param assignments Replication group assignment.
* @param grpId Group id.
* @param proposedConsistentId Proposed consistent id, found out of a lease negotiation. The parameter might be {@code null}.
* @return Cluster node, or {@code null} if no node in assignments can be the leaseholder.
*/
private @Nullable ClusterNode nextLeaseHolder(Set<Assignment> assignments, @Nullable String proposedConsistentId) {
private @Nullable ClusterNode nextLeaseHolder(
Set<Assignment> assignments,
ReplicationGroupId grpId,
@Nullable String proposedConsistentId
) {
// TODO: IGNITE-18879 Implement more intellectual algorithm to choose a node.
ClusterNode primaryCandidate = null;

Expand All @@ -267,8 +273,15 @@ private CompletableFuture<Boolean> denyLease(ReplicationGroupId grpId, Lease lea
primaryCandidate = candidateNode;

break;
} else if (primaryCandidate == null || primaryCandidate.name().hashCode() > assignment.consistentId().hashCode()) {
} else if (primaryCandidate == null) {
primaryCandidate = candidateNode;
} else {
int candidateHash = hash(primaryCandidate.name(), grpId);
int assignmentHash = hash(assignment.consistentId(), grpId);

if (candidateHash > assignmentHash) {
primaryCandidate = candidateNode;
}
}
}

Expand Down Expand Up @@ -360,7 +373,7 @@ private void updateLeaseBatchInternal() {
continue;
} else if (agreement.isDeclined()) {
// Here we initiate negotiations for UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
ClusterNode candidate = nextLeaseHolder(assignments, agreement.getRedirectTo());
ClusterNode candidate = nextLeaseHolder(assignments, grpId, agreement.getRedirectTo());

if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
Expand All @@ -385,7 +398,7 @@ private void updateLeaseBatchInternal() {
? lease.getLeaseholder()
: lease.proposedCandidate();

ClusterNode candidate = nextLeaseHolder(assignments, proposedLeaseholder);
ClusterNode candidate = nextLeaseHolder(assignments, grpId, proposedLeaseholder);

if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl coordinatorNode

logger().info("Start transferring primary.");

NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)), null);
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
Expand Down Expand Up @@ -287,7 +287,7 @@ private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl primaryNode) {

logger().info("Start transferring primary.");

NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)), null);
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
Expand All @@ -44,21 +45,53 @@ public class NodeUtils {

private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();

/**
 * Transfers the primary rights to another node, choosing any node from the cluster except the current leaseholder.
 *
 * <p>Delegates to the filter-based overload, passing {@code null} as the filter.
 *
 * @param nodes Nodes collection.
 * @param groupId Group id.
 * @return New primary replica name.
 * @throws InterruptedException If failed.
 */
public static String transferPrimary(
        Collection<IgniteImpl> nodes,
        ReplicationGroupId groupId
) throws InterruptedException {
    // The cast exists only to pick the Predicate<String> overload over the String one for a null argument.
    // It is parameterized (not raw) so that the call does not rely on an unchecked conversion.
    return transferPrimary(nodes, groupId, (Predicate<String>) null);
}

/**
* Transfers the primary rights to another node.
*
* @param nodes Nodes collection.
* @param groupId Group id.
* @param preferablePrimary Primary replica name which is preferred for being primary or {@code null}.
* @param preferablePrimary Primary replica preferable node name.
* @return New primary replica name.
* @throws InterruptedException If failed.
*/
public static String transferPrimary(
Collection<IgniteImpl> nodes,
ReplicationGroupId groupId,
@Nullable String preferablePrimary
String preferablePrimary
) throws InterruptedException {
LOG.info("Moving the primary replica [preferablePrimary=" + preferablePrimary + "].");
return transferPrimary(nodes, groupId, s -> s.equals(preferablePrimary));
}

/**
* Transfers the primary rights to another node.
*
* @param nodes Nodes collection.
* @param groupId Group id.
* @param preferablePrimaryFilter Primary replica preferable nodes filter, accepts the node consistent id.
* @return New primary replica name.
* @throws InterruptedException If failed.
*/
public static String transferPrimary(
Collection<IgniteImpl> nodes,
ReplicationGroupId groupId,
@Nullable Predicate<String> preferablePrimaryFilter
) throws InterruptedException {
LOG.info("Moving the primary replica [groupId={}].", groupId);

IgniteImpl node = nodes.stream().findAny().orElseThrow();

Expand All @@ -68,16 +101,50 @@ public static String transferPrimary(
.filter(n -> n.id().equals(currentLeaseholder.getLeaseholderId()))
.findFirst().orElseThrow();

if (preferablePrimary == null) {
preferablePrimary = nodes.stream()
.map(IgniteImpl::name)
.filter(n -> !n.equals(currentLeaseholder.getLeaseholder()))
.findFirst()
.orElseThrow();
Predicate<String> filter = preferablePrimaryFilter == null ? name -> true : preferablePrimaryFilter::test;

String finalPreferablePrimary = nodes.stream()
.map(IgniteImpl::name)
.filter(n -> !n.equals(currentLeaseholder.getLeaseholder()) && filter.test(n))
.findFirst()
.orElseThrow();

LOG.info("Moving the primary replica [groupId={}, currentLeaseholder={}, preferablePrimary={}].", groupId, leaseholderNode.name(),
finalPreferablePrimary);

ReplicaMeta[] newPrimaryReplica = new ReplicaMeta[1];
boolean[] stopLeaseNeeded = { true };

boolean success = waitForCondition(() -> {
if (stopLeaseNeeded[0]) {
stopLeaseProlongation(nodes, leaseholderNode, groupId, finalPreferablePrimary);
}

ReplicaMeta previousPrimary = newPrimaryReplica[0] == null ? currentLeaseholder : newPrimaryReplica[0];

newPrimaryReplica[0] = leaseholder(node, groupId);

// If the lease is changed to not suitable one, then stopLeaseProlongation will be retried, otherwise the cycle will be stopped.
stopLeaseNeeded[0] =
!previousPrimary.getStartTime().equals(newPrimaryReplica[0].getStartTime()) // if lease changed
|| !previousPrimary.getExpirationTime().equals(newPrimaryReplica[0].getExpirationTime()); // if lease prolonged

return newPrimaryReplica[0].getLeaseholder().equals(finalPreferablePrimary);
}, 30_000);

if (success) {
LOG.info("Primary replica moved successfully from [{}] to [{}].", currentLeaseholder.getLeaseholder(), finalPreferablePrimary);
} else {
LOG.info("Moving the primary replica failed [groupId={}, actualPrimary={}].", groupId, newPrimaryReplica[0]);
}

String finalPreferablePrimary = preferablePrimary;
assertTrue(success);

return finalPreferablePrimary;
}

private static void stopLeaseProlongation(Collection<IgniteImpl> nodes, IgniteImpl leaseholderNode, ReplicationGroupId groupId,
String preferablePrimary) {
StopLeaseProlongationMessage msg = PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
.groupId(groupId)
.redirectProposal(preferablePrimary)
Expand All @@ -86,17 +153,6 @@ public static String transferPrimary(
nodes.forEach(
n -> leaseholderNode.clusterService().messagingService().send(n.clusterService().topologyService().localMember(), msg)
);

assertTrue(waitForCondition(() -> {
ReplicaMeta newPrimaryReplica = leaseholder(node, groupId);

return newPrimaryReplica.getLeaseholder().equals(finalPreferablePrimary);
}, 10_000));

LOG.info("Primary replica moved successfully from [{}] to [{}].",
currentLeaseholder.getLeaseholder(), finalPreferablePrimary);

return finalPreferablePrimary;
}

private static ReplicaMeta leaseholder(IgniteImpl node, ReplicationGroupId groupId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.ignite.internal.table;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -33,8 +33,6 @@
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
Expand Down Expand Up @@ -87,7 +85,8 @@ protected void customizeInitParameters(InitParametersBuilder builder) {

builder.clusterConfiguration("{"
+ " transaction: {"
+ " implicitTransactionTimeout: 30000"
+ " implicitTransactionTimeout: 30000,"
+ " txnResourceTtl: 2"
+ " },"
+ " replication: {"
+ " rpcTimeout: 30000"
Expand Down Expand Up @@ -205,17 +204,4 @@ private IgniteImpl findNode(int startRange, int endRange, Predicate<IgniteImpl>
/**
 * Finds a node whose name equals the given leaseholder name.
 *
 * <p>Delegates to {@code findNode} with the start of the range set to {@code 0} and the end of the range set to
 * {@code initialNodes()}.
 *
 * @param leaseholder Node name to look for.
 * @return Node returned by {@code findNode} for the name-equality predicate.
 */
private IgniteImpl findNodeByName(String leaseholder) {
    int rangeEnd = initialNodes();
    Predicate<IgniteImpl> hasLeaseholderName = candidate -> leaseholder.equals(candidate.name());

    return findNode(0, rangeEnd, hasLeaseholderName);
}

/**
 * Asks the node's placement driver for the primary replica of the given replication group and returns it.
 *
 * <p>The request is made with the node's current clock value and the pair {@code 10}, {@code SECONDS}
 * (presumably the wait timeout; confirm against {@code awaitPrimaryReplica}).
 *
 * @param node Node whose placement driver and clock are used.
 * @param tblReplicationGrp Replication group id.
 * @return Primary replica meta obtained from the completed future.
 */
private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
    CompletableFuture<ReplicaMeta> replicaFuture =
            node.placementDriver().awaitPrimaryReplica(tblReplicationGrp, node.clock().now(), 10, SECONDS);

    // The matcher assertion runs first; join() is only reached if it passes.
    assertThat(replicaFuture, willCompleteSuccessfully());

    return replicaFuture.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -57,7 +58,6 @@
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
Expand Down Expand Up @@ -1149,19 +1149,6 @@ private IgniteImpl nonPrimaryNode(String leaseholder) {
return findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
}

/**
 * Asks the node's placement driver for the primary replica of the given replication group and returns it.
 *
 * <p>The request is made with the node's current clock value and the pair {@code 10}, {@code SECONDS}
 * (presumably the wait timeout; confirm against {@code awaitPrimaryReplica}).
 *
 * @param node Node whose placement driver and clock are used.
 * @param tblReplicationGrp Replication group id.
 * @return Primary replica meta obtained from the completed future.
 */
private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
    CompletableFuture<ReplicaMeta> replicaFuture =
            node.placementDriver().awaitPrimaryReplica(tblReplicationGrp, node.clock().now(), 10, SECONDS);

    // The matcher assertion runs first; join() is only reached if it passes.
    assertThat(replicaFuture, willCompleteSuccessfully());

    return replicaFuture.join();
}

/**
 * Resolves the primary replica of the given replication group via {@code waitAndGetPrimaryReplica} and returns its leaseholder.
 *
 * @param node Node passed through to {@code waitAndGetPrimaryReplica}.
 * @param tblReplicationGrp Replication group id.
 * @return Result of {@code getLeaseholder()} on the resolved primary replica.
 */
private static String waitAndGetLeaseholder(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
return waitAndGetPrimaryReplica(node, tblReplicationGrp).getLeaseholder();
}
Expand Down
Loading