Skip to content

Commit

Permalink
Merge pull request #16190 from mdogan/migration-finalization-race-x
Browse files Browse the repository at this point in the history
Prevent migration operations running before previous finalization completes v2
  • Loading branch information
mdogan committed Dec 9, 2019
2 parents 8b1aae6 + 694921b commit 55a3b60
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -120,6 +121,8 @@ public class MigrationManager {
private final boolean fragmentedMigrationEnabled;
private final long memberHeartbeatTimeoutMillis;
private boolean triggerRepartitioningWhenClusterStateAllowsMigration;
private final Set<MigrationInfo> finalizingMigrationsRegistry
= Collections.newSetFromMap(new ConcurrentHashMap<MigrationInfo, Boolean>());

MigrationManager(Node node, InternalPartitionServiceImpl service, Lock partitionServiceLock) {
this.node = node;
Expand Down Expand Up @@ -217,8 +220,8 @@ void finalizeMigration(MigrationInfo migrationInfo) {

MigrationEndpoint endpoint = source ? MigrationEndpoint.SOURCE : MigrationEndpoint.DESTINATION;
FinalizeMigrationOperation op = new FinalizeMigrationOperation(migrationInfo, endpoint, success);
op.setPartitionId(partitionId).setNodeEngine(nodeEngine).setValidateTarget(false)
.setService(partitionService);
op.setPartitionId(partitionId).setNodeEngine(nodeEngine).setValidateTarget(false).setService(partitionService);
registerFinalizingMigration(migrationInfo);
InternalOperationService operationService = nodeEngine.getOperationService();
if (operationService.isRunAllowed(op)) {
// When migration finalization is triggered by subsequent migrations
Expand Down Expand Up @@ -248,6 +251,23 @@ void finalizeMigration(MigrationInfo migrationInfo) {
}
}

/**
 * Adds the given migration to {@code finalizingMigrationsRegistry}.
 *
 * @param migration the migration to record in the registry
 */
private void registerFinalizingMigration(MigrationInfo migration) {
finalizingMigrationsRegistry.add(migration);
}

/**
 * Removes the given migration from {@code finalizingMigrationsRegistry}.
 *
 * @param migration the migration to remove from the registry
 * @return {@code true} if the registry contained the migration and it was removed,
 *         {@code false} if it was not present
 */
public boolean removeFinalizingMigration(MigrationInfo migration) {
return finalizingMigrationsRegistry.remove(migration);
}

/**
 * Tells whether {@code finalizingMigrationsRegistry} currently holds at least one
 * migration whose partition ID equals the given one.
 *
 * @param partitionId the partition ID to look for among the registered migrations
 * @return {@code true} if a registered migration reports this partition ID,
 *         {@code false} otherwise (including when the registry is empty)
 */
public boolean isFinalizingMigrationRegistered(int partitionId) {
    boolean found = false;
    for (MigrationInfo candidate : finalizingMigrationsRegistry) {
        if (candidate.getPartitionId() == partitionId) {
            // One match is enough; no need to look at the remaining entries.
            found = true;
            break;
        }
    }
    return found;
}

/**
* Sets the active migration if none is set and returns {@code null}, otherwise returns the currently set active migration.
* Acquires the partition service lock.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ private void applyCompletedMigrations() {
if (!partitionService.applyCompletedMigrations(completedMigrations, migrationInfo.getMaster())) {
throw new PartitionStateVersionMismatchException(partitionStateVersion, partitionService.getPartitionStateVersion());
}
if (partitionService.getMigrationManager().isFinalizingMigrationRegistered(migrationInfo.getPartitionId())) {
// There's a pending migration finalization operation in the queue.
// This happens when this node was the source of a backup replica migration
// and is now the destination of another replica migration on the same partition.
// Sources of backup migrations are not part of the migration transaction
// and they learn about the migration only while applying completed migrations.
throw new RetryableHazelcastException("There is a scheduled FinalizeMigrationOperation for the same partition => "
+ migrationInfo);
}
}

/** Verifies that the sent partition state version matches the local version or this node is master. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,19 @@ public void run() {
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
int partitionId = migrationInfo.getPartitionId();

if (!partitionService.getMigrationManager().removeFinalizingMigration(migrationInfo)) {
throw new IllegalStateException("This migration is not registered as finalizing: " + migrationInfo);
}

if (isOldBackupReplicaOwner() && partitionStateManager.isMigrating(partitionId)) {
// On old backup replica, migrating flag is not set during migration.
// Because replica is copied from partition owner to new backup replica.
// Old backup owner is not notified about migration until migration is committed
// and completed migration is published by master.
// If this partition's migrating flag is set, then it means another migration
// is submitted to this member for the same partition and it's already executed.
// This finalization is now obsolete.
getLogger().fine("Cannot execute migration finalization, because this member was previous owner of a backup replica,"
+ " and a new migration operation has already superseded this finalization. "
+ "This operation is now obsolete. -> " + migrationInfo);
return;
throw new IllegalStateException("Another replica migration is started on the same partition before finalizing "
+ migrationInfo);
}

NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
Expand Down Expand Up @@ -185,8 +186,7 @@ private void rollbackDestination() {
+ partitionId);
}
} else {
int replicaOffset = migrationInfo.getDestinationCurrentReplicaIndex() <= 1 ? 1 : migrationInfo
.getDestinationCurrentReplicaIndex();
int replicaOffset = Math.max(migrationInfo.getDestinationCurrentReplicaIndex(), 1);

for (ServiceNamespace namespace : replicaManager.getNamespaces(partitionId)) {
long[] versions = updatePartitionReplicaVersions(replicaManager, partitionId, namespace, replicaOffset - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void afterMigrate() {
InternalPartitionServiceImpl partitionService = getService();
PartitionReplicaManager replicaManager = partitionService.getReplicaManager();
int destinationNewReplicaIndex = migrationInfo.getDestinationNewReplicaIndex();
int replicaOffset = destinationNewReplicaIndex <= 1 ? 1 : destinationNewReplicaIndex;
int replicaOffset = Math.max(destinationNewReplicaIndex, 1);

Map<ServiceNamespace, long[]> namespaceVersions = fragmentMigrationState.getNamespaceVersionMap();
for (Entry<ServiceNamespace, long[]> e : namespaceVersions.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -147,6 +148,7 @@ public void run() {

for (int p = 0; p < partitionCount; p++) {
Integer actual = (Integer) operationService.invokeOnPartition(null, new TestGetOperation(), p).join();
assertNotNull(actual);
assertEquals(value, actual.intValue());
}
}
Expand Down

0 comments on commit 55a3b60

Please sign in to comment.