Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent migration operations running before previous finalization completes #14832

Merged
merged 1 commit into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,23 @@ public boolean isMigrating() {
return isMigrating;
}

public void setMigrating(boolean isMigrating) {
this.isMigrating = isMigrating;
/**
* Sets migrating flag if it's not set already.
* @return true if migrating flag is updated, false otherwise
*/
public boolean setMigrating() {
if (isMigrating) {
return false;
}
isMigrating = true;
return true;
}

/**
* Resets migrating flag.
*/
public void resetMigrating() {
isMigrating = false;
}

@Override
Expand Down Expand Up @@ -234,7 +249,7 @@ void reset(PartitionReplica localReplica) {
assert localReplica != null;
this.replicas = new PartitionReplica[MAX_REPLICA_COUNT];
this.localReplica = localReplica;
setMigrating(false);
resetMigrating();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,18 +313,22 @@ PartitionReplica[][] repartition(Set<Member> excludedMembers, Collection<Integer
return newState;
}

public void setMigratingFlag(int partitionId) {
public boolean trySetMigratingFlag(int partitionId) {
if (logger.isFinestEnabled()) {
logger.finest("Setting partition-migrating flag. partitionId=" + partitionId);
}
partitions[partitionId].setMigrating(true);
return partitions[partitionId].setMigrating();
}

public void clearMigratingFlag(int partitionId) {
if (logger.isFinestEnabled()) {
logger.finest("Clearing partition-migrating flag. partitionId=" + partitionId);
}
partitions[partitionId].setMigrating(false);
partitions[partitionId].resetMigrating();
}

public boolean isMigrating(int partitionId) {
return partitions[partitionId].isMigrating();
}

/** Sets the replica members for the {@code partitionId}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ void setActiveMigration() {
+ ". Current active migration is " + currentActiveMigration);
}
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
partitionStateManager.setMigratingFlag(migrationInfo.getPartitionId());
if (!partitionStateManager.trySetMigratingFlag(migrationInfo.getPartitionId())) {
throw new RetryableHazelcastException("Cannot set migrating flag, "
+ "probably previous migration's finalization is not completed yet.");
}
}

void onMigrationStart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public void beforeRun() throws Exception {

InternalPartitionServiceImpl service = getService();
PartitionStateManager partitionStateManager = service.getPartitionStateManager();
partitionStateManager.setMigratingFlag(getPartitionId());
if (!partitionStateManager.trySetMigratingFlag(getPartitionId())) {
throw new IllegalStateException("Cannot set migrating flag, "
+ "probably previous migration's finalization is not completed yet.");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,25 @@ public FinalizeMigrationOperation(MigrationInfo migrationInfo, MigrationEndpoint

@Override
public void run() {
NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
InternalPartitionServiceImpl partitionService = getService();
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
int partitionId = migrationInfo.getPartitionId();

if (isOldBackupReplicaOwner() && partitionStateManager.isMigrating(partitionId)) {
// On old backup replica, migrating flag is not set during migration.
// Because replica is copied from partition owner to new backup replica.
// Old backup owner is not notified about migration until migration is committed
// and completed migration is published by master.
// If this partition's migrating flag is set, then it means another migration
// is submitted to this member for the same partition and it's already executed.
// This finalization is now obsolete.
getLogger().fine("Cannot execute migration finalization, because this member was previous owner of a backup replica,"
+ " and a new migration operation has already superseded this finalization. "
+ "This operation is now obsolete. -> " + migrationInfo);
return;
}

NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
notifyServices(nodeEngine);

if (endpoint == MigrationEndpoint.SOURCE && success) {
Expand All @@ -78,10 +95,7 @@ public void run() {
rollbackDestination();
}

InternalPartitionServiceImpl partitionService = getService();
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
partitionStateManager.clearMigratingFlag(migrationInfo.getPartitionId());

partitionStateManager.clearMigratingFlag(partitionId);
if (success) {
nodeEngine.onPartitionMigrate(migrationInfo);
}
Expand All @@ -100,9 +114,7 @@ private void notifyServices(NodeEngineImpl nodeEngine) {
// Old backup owner is not notified about migration until migration
// is committed on destination. This is the only place on backup owner
// knows replica is moved away from itself.
PartitionReplica source = migrationInfo.getSource();
if (source != null && migrationInfo.getSourceCurrentReplicaIndex() > 0
&& source.isIdentical(nodeEngine.getLocalMember())) {
if (isOldBackupReplicaOwner()) {
// execute beforeMigration on old backup before commit/rollback
for (MigrationAwareService service : migrationAwareServices) {
beforeMigration(event, service);
Expand Down Expand Up @@ -217,6 +229,12 @@ private void finishMigration(PartitionMigrationEvent event, MigrationAwareServic
}
}

private boolean isOldBackupReplicaOwner() {
PartitionReplica source = migrationInfo.getSource();
return source != null && migrationInfo.getSourceCurrentReplicaIndex() > 0
&& source.isIdentical(getNodeEngine().getLocalMember());
}

@Override
public boolean returnsResponse() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public void testLockLeaseTime_lockIsReleasedEventuallyWhenPartitionIsMigrating()
final int leaseTime = 1000;

lock.lock(leaseTime, TimeUnit.MILLISECONDS);
partition.setMigrating(true);
partition.setMigrating();

spawn(new Runnable() {
@Override
Expand All @@ -338,7 +338,7 @@ public void run() {
} catch (InterruptedException e) {
e.printStackTrace();
}
partition.setMigrating(false);
partition.resetMigrating();
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private InternalCompletableFuture<Boolean> invokeOperation(final Config config)
final int partitionId = 0;
final InternalPartitionServiceImpl partitionService = (InternalPartitionServiceImpl) getPartitionService(instance);
final InternalPartitionImpl partition = (InternalPartitionImpl) partitionService.getPartition(partitionId);
partition.setMigrating(true);
partition.setMigrating();

final InternalOperationService operationService = getOperationService(instance);
final InvocationBuilder invocationBuilder = operationService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void executeLocallyRetriesWhenPartitionIsMigrating() throws InterruptedEx
final InternalPartitionService partitionService = nodeEngineImpl.getPartitionService();
final int randomPartitionId = (int) (Math.random() * partitionService.getPartitionCount());
final InternalPartitionImpl partition = (InternalPartitionImpl) partitionService.getPartition(randomPartitionId);
partition.setMigrating(true);
partition.setMigrating();

final String operationResponse = "operationResponse";
final Operation operation = new LocalOperation(operationResponse)
Expand All @@ -104,7 +104,7 @@ public void run() {
} catch (InterruptedException e) {

}
partition.setMigrating(false);
partition.resetMigrating();
}
});

Expand Down