Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent migration operations running before previous finalization completes v2 #16190

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -120,6 +121,8 @@ public class MigrationManager {
private final boolean fragmentedMigrationEnabled;
private final long memberHeartbeatTimeoutMillis;
private boolean triggerRepartitioningWhenClusterStateAllowsMigration;
private final Set<MigrationInfo> finalizingMigrationsRegistry
= Collections.newSetFromMap(new ConcurrentHashMap<MigrationInfo, Boolean>());

MigrationManager(Node node, InternalPartitionServiceImpl service, Lock partitionServiceLock) {
this.node = node;
Expand Down Expand Up @@ -217,8 +220,8 @@ void finalizeMigration(MigrationInfo migrationInfo) {

MigrationEndpoint endpoint = source ? MigrationEndpoint.SOURCE : MigrationEndpoint.DESTINATION;
FinalizeMigrationOperation op = new FinalizeMigrationOperation(migrationInfo, endpoint, success);
op.setPartitionId(partitionId).setNodeEngine(nodeEngine).setValidateTarget(false)
.setService(partitionService);
op.setPartitionId(partitionId).setNodeEngine(nodeEngine).setValidateTarget(false).setService(partitionService);
registerFinalizingMigration(migrationInfo);
InternalOperationService operationService = nodeEngine.getOperationService();
if (operationService.isRunAllowed(op)) {
// When migration finalization is triggered by subsequent migrations
Expand Down Expand Up @@ -248,6 +251,23 @@ void finalizeMigration(MigrationInfo migrationInfo) {
}
}

/**
 * Records the given migration in the registry of migrations whose finalization is pending on this member.
 * <p>
 * Called from {@code finalizeMigration} once the {@code FinalizeMigrationOperation} has been prepared.
 * The entry is expected to be cleared again through {@link #removeFinalizingMigration(MigrationInfo)}.
 *
 * @param migration the migration that is about to be finalized
 */
private void registerFinalizingMigration(MigrationInfo migration) {
    // The registry is a Set: adding an already-present migration leaves it unchanged.
    finalizingMigrationsRegistry.add(migration);
}

/**
 * Removes the given migration from the registry of pending finalizations.
 *
 * @param migration the migration whose finalization is being executed
 * @return {@code true} if the migration was registered and has now been removed,
 *         {@code false} if no such migration was registered
 */
public boolean removeFinalizingMigration(MigrationInfo migration) {
    final boolean wasRegistered = finalizingMigrationsRegistry.remove(migration);
    return wasRegistered;
}

/**
 * Tells whether any migration registered as finalizing targets the given partition.
 * <p>
 * The check matches on partition ID only, so any registered migration for the partition counts.
 *
 * @param partitionId ID of the partition to look for
 * @return {@code true} if at least one registered finalizing migration belongs to {@code partitionId},
 *         {@code false} otherwise
 */
public boolean isFinalizingMigrationRegistered(int partitionId) {
    boolean registered = false;
    for (MigrationInfo pending : finalizingMigrationsRegistry) {
        if (pending.getPartitionId() == partitionId) {
            // One match is enough; no need to scan the rest of the registry.
            registered = true;
            break;
        }
    }
    return registered;
}

/**
* Sets the active migration if none is set and returns {@code null}, otherwise returns the currently set active migration.
* Acquires the partition service lock.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ private void applyCompletedMigrations() {
if (!partitionService.applyCompletedMigrations(completedMigrations, migrationInfo.getMaster())) {
throw new PartitionStateVersionMismatchException(partitionStateVersion, partitionService.getPartitionStateVersion());
}
if (partitionService.getMigrationManager().isFinalizingMigrationRegistered(migrationInfo.getPartitionId())) {
// There's a pending migration finalization operation in the queue.
// This happens when this node was the source of a backup replica migration
// and is now the destination of another replica migration on the same partition.
// Sources of backup migrations are not part of the migration transaction;
// they learn about the migration only while applying completed migrations.
throw new RetryableHazelcastException("There is a scheduled FinalizeMigrationOperation for the same partition => "
+ migrationInfo);
}
}

/** Verifies that the sent partition state version matches the local version or this node is master. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,19 @@ public void run() {
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
int partitionId = migrationInfo.getPartitionId();

if (!partitionService.getMigrationManager().removeFinalizingMigration(migrationInfo)) {
throw new IllegalStateException("This migration is not registered as finalizing: " + migrationInfo);
}

if (isOldBackupReplicaOwner() && partitionStateManager.isMigrating(partitionId)) {
// On old backup replica, migrating flag is not set during migration.
// Because replica is copied from partition owner to new backup replica.
// Old backup owner is not notified about migration until migration is committed
// and completed migration is published by master.
// If this partition's migrating flag is set, then it means another migration
// is submitted to this member for the same partition and it's already executed.
// This finalization is now obsolete.
getLogger().fine("Cannot execute migration finalization, because this member was previous owner of a backup replica,"
+ " and a new migration operation has already superseded this finalization. "
+ "This operation is now obsolete. -> " + migrationInfo);
return;
throw new IllegalStateException("Another replica migration is started on the same partition before finalizing "
+ migrationInfo);
}

NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
Expand Down Expand Up @@ -185,8 +186,7 @@ private void rollbackDestination() {
+ partitionId);
}
} else {
int replicaOffset = migrationInfo.getDestinationCurrentReplicaIndex() <= 1 ? 1 : migrationInfo
.getDestinationCurrentReplicaIndex();
int replicaOffset = Math.max(migrationInfo.getDestinationCurrentReplicaIndex(), 1);

for (ServiceNamespace namespace : replicaManager.getNamespaces(partitionId)) {
long[] versions = updatePartitionReplicaVersions(replicaManager, partitionId, namespace, replicaOffset - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void afterMigrate() {
InternalPartitionServiceImpl partitionService = getService();
PartitionReplicaManager replicaManager = partitionService.getReplicaManager();
int destinationNewReplicaIndex = migrationInfo.getDestinationNewReplicaIndex();
int replicaOffset = destinationNewReplicaIndex <= 1 ? 1 : destinationNewReplicaIndex;
int replicaOffset = Math.max(destinationNewReplicaIndex, 1);

Map<ServiceNamespace, long[]> namespaceVersions = fragmentMigrationState.getNamespaceVersionMap();
for (Entry<ServiceNamespace, long[]> e : namespaceVersions.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -147,6 +148,7 @@ public void run() {

for (int p = 0; p < partitionCount; p++) {
Integer actual = (Integer) operationService.invokeOnPartition(null, new TestGetOperation(), p).join();
assertNotNull(actual);
assertEquals(value, actual.intValue());
}
}
Expand Down