Skip to content

Commit

Permalink
Merge pull request #14832 from mdogan/migration-finalization-race-fix
Browse files Browse the repository at this point in the history
Prevent migration operations running before previous finalization completes
  • Loading branch information
mdogan committed Apr 4, 2019
2 parents 434facb + 5060616 commit 44e5e2c
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 21 deletions.
Expand Up @@ -60,8 +60,23 @@ public boolean isMigrating() {
return isMigrating;
}

public void setMigrating(boolean isMigrating) {
this.isMigrating = isMigrating;
/**
 * Raises the migrating flag, but only when it is currently clear.
 *
 * @return {@code true} when this call changed the flag from unset to set,
 *         {@code false} when the flag was already set and nothing was changed
 */
public boolean setMigrating() {
    // Read the flag once; the result of that single read is also the return value.
    boolean wasClear = !isMigrating;
    if (wasClear) {
        isMigrating = true;
    }
    return wasClear;
}

/**
 * Clears the migrating flag unconditionally, regardless of its current value.
 */
public void resetMigrating() {
    this.isMigrating = false;
}

@Override
Expand Down Expand Up @@ -234,7 +249,7 @@ void reset(PartitionReplica localReplica) {
assert localReplica != null;
this.replicas = new PartitionReplica[MAX_REPLICA_COUNT];
this.localReplica = localReplica;
setMigrating(false);
resetMigrating();
}

@Override
Expand Down
Expand Up @@ -313,18 +313,22 @@ PartitionReplica[][] repartition(Set<Member> excludedMembers, Collection<Integer
return newState;
}

public void setMigratingFlag(int partitionId) {
public boolean trySetMigratingFlag(int partitionId) {
if (logger.isFinestEnabled()) {
logger.finest("Setting partition-migrating flag. partitionId=" + partitionId);
}
partitions[partitionId].setMigrating(true);
return partitions[partitionId].setMigrating();
}

public void clearMigratingFlag(int partitionId) {
if (logger.isFinestEnabled()) {
logger.finest("Clearing partition-migrating flag. partitionId=" + partitionId);
}
partitions[partitionId].setMigrating(false);
partitions[partitionId].resetMigrating();
}

/**
 * Reports the migrating flag of a single partition by delegating to
 * {@code isMigrating()} on the entry of {@code partitions} at the given index.
 *
 * @param partitionId index into the {@code partitions} array
 * @return the value returned by that partition's {@code isMigrating()}
 */
public boolean isMigrating(int partitionId) {
return partitions[partitionId].isMigrating();
}

/** Sets the replica members for the {@code partitionId}. */
Expand Down
Expand Up @@ -216,7 +216,10 @@ void setActiveMigration() {
+ ". Current active migration is " + currentActiveMigration);
}
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
partitionStateManager.setMigratingFlag(migrationInfo.getPartitionId());
if (!partitionStateManager.trySetMigratingFlag(migrationInfo.getPartitionId())) {
throw new RetryableHazelcastException("Cannot set migrating flag, "
+ "probably previous migration's finalization is not completed yet.");
}
}

void onMigrationStart() {
Expand Down
Expand Up @@ -54,7 +54,10 @@ public void beforeRun() throws Exception {

InternalPartitionServiceImpl service = getService();
PartitionStateManager partitionStateManager = service.getPartitionStateManager();
partitionStateManager.setMigratingFlag(getPartitionId());
if (!partitionStateManager.trySetMigratingFlag(getPartitionId())) {
throw new IllegalStateException("Cannot set migrating flag, "
+ "probably previous migration's finalization is not completed yet.");
}
}

@Override
Expand Down
Expand Up @@ -68,8 +68,25 @@ public FinalizeMigrationOperation(MigrationInfo migrationInfo, MigrationEndpoint

@Override
public void run() {
NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
InternalPartitionServiceImpl partitionService = getService();
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
int partitionId = migrationInfo.getPartitionId();

if (isOldBackupReplicaOwner() && partitionStateManager.isMigrating(partitionId)) {
// On the old backup replica, the migrating flag is not set during migration,
// because the replica is copied from the partition owner to the new backup replica.
// The old backup owner is not notified about the migration until it is committed
// and the completed migration is published by the master.
// If this partition's migrating flag is set, it means another migration for the
// same partition has been submitted to this member and it has already been executed.
// This finalization is now obsolete.
getLogger().fine("Cannot execute migration finalization, because this member was previous owner of a backup replica,"
+ " and a new migration operation has already superseded this finalization. "
+ "This operation is now obsolete. -> " + migrationInfo);
return;
}

NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
notifyServices(nodeEngine);

if (endpoint == MigrationEndpoint.SOURCE && success) {
Expand All @@ -78,10 +95,7 @@ public void run() {
rollbackDestination();
}

InternalPartitionServiceImpl partitionService = getService();
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
partitionStateManager.clearMigratingFlag(migrationInfo.getPartitionId());

partitionStateManager.clearMigratingFlag(partitionId);
if (success) {
nodeEngine.onPartitionMigrate(migrationInfo);
}
Expand All @@ -100,9 +114,7 @@ private void notifyServices(NodeEngineImpl nodeEngine) {
// Old backup owner is not notified about migration until migration
// is committed on destination. This is the only place where the backup owner
// learns that the replica has moved away from itself.
PartitionReplica source = migrationInfo.getSource();
if (source != null && migrationInfo.getSourceCurrentReplicaIndex() > 0
&& source.isIdentical(nodeEngine.getLocalMember())) {
if (isOldBackupReplicaOwner()) {
// execute beforeMigration on old backup before commit/rollback
for (MigrationAwareService service : migrationAwareServices) {
beforeMigration(event, service);
Expand Down Expand Up @@ -217,6 +229,12 @@ private void finishMigration(PartitionMigrationEvent event, MigrationAwareServic
}
}

/**
 * Checks whether the local member is the source of this migration while the
 * source's current replica index is greater than zero.
 *
 * @return {@code true} only when the migration has a source, that source's current
 *         replica index is positive, and the source is identical to the local member
 */
private boolean isOldBackupReplicaOwner() {
    PartitionReplica migrationSource = migrationInfo.getSource();
    if (migrationSource == null) {
        return false;
    }
    if (migrationInfo.getSourceCurrentReplicaIndex() <= 0) {
        return false;
    }
    return migrationSource.isIdentical(getNodeEngine().getLocalMember());
}

@Override
public boolean returnsResponse() {
return false;
Expand Down
Expand Up @@ -328,7 +328,7 @@ public void testLockLeaseTime_lockIsReleasedEventuallyWhenPartitionIsMigrating()
final int leaseTime = 1000;

lock.lock(leaseTime, TimeUnit.MILLISECONDS);
partition.setMigrating(true);
partition.setMigrating();

spawn(new Runnable() {
@Override
Expand All @@ -338,7 +338,7 @@ public void run() {
} catch (InterruptedException e) {
e.printStackTrace();
}
partition.setMigrating(false);
partition.resetMigrating();
}
});

Expand Down
Expand Up @@ -77,7 +77,7 @@ private InternalCompletableFuture<Boolean> invokeOperation(final Config config)
final int partitionId = 0;
final InternalPartitionServiceImpl partitionService = (InternalPartitionServiceImpl) getPartitionService(instance);
final InternalPartitionImpl partition = (InternalPartitionImpl) partitionService.getPartition(partitionId);
partition.setMigrating(true);
partition.setMigrating();

final InternalOperationService operationService = getOperationService(instance);
final InvocationBuilder invocationBuilder = operationService
Expand Down
Expand Up @@ -89,7 +89,7 @@ public void executeLocallyRetriesWhenPartitionIsMigrating() throws InterruptedEx
final InternalPartitionService partitionService = nodeEngineImpl.getPartitionService();
final int randomPartitionId = (int) (Math.random() * partitionService.getPartitionCount());
final InternalPartitionImpl partition = (InternalPartitionImpl) partitionService.getPartition(randomPartitionId);
partition.setMigrating(true);
partition.setMigrating();

final String operationResponse = "operationResponse";
final Operation operation = new LocalOperation(operationResponse)
Expand All @@ -104,7 +104,7 @@ public void run() {
} catch (InterruptedException e) {

}
partition.setMigrating(false);
partition.resetMigrating();
}
});

Expand Down

0 comments on commit 44e5e2c

Please sign in to comment.