Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT] Partitions should remain unavailable until services commit/rollback #9235

Merged
merged 1 commit into from Nov 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -57,6 +57,11 @@ public void run() {
int partitionId = getPartitionId();
int replicaIndex = this.replicaIndex;
InternalPartition partition = partitionService.getPartition(partitionId);
if (partition.isMigrating()) {
notifyCallback(false);
return;
}

Address target = partition.getReplicaAddress(replicaIndex);
if (target == null) {
notifyCallback(false);
Expand Down
Expand Up @@ -201,8 +201,7 @@ private void finalizeMigration(MigrationInfo migrationInfo) {

op.setPartitionId(partitionId).setNodeEngine(nodeEngine).setValidateTarget(false)
.setService(partitionService);
nodeEngine.getOperationService().executeOperation(op);
removeActiveMigration(partitionId);
nodeEngine.getOperationService().execute(op);
} else {
final Address partitionOwner = partitionStateManager.getPartitionImpl(partitionId).getOwnerOrNull();
if (node.getThisAddress().equals(partitionOwner)) {
Expand All @@ -219,17 +218,20 @@ private void finalizeMigration(MigrationInfo migrationInfo) {
}
}

public boolean addActiveMigration(MigrationInfo migrationInfo) {
public MigrationInfo setActiveMigration(MigrationInfo migrationInfo) {
partitionServiceLock.lock();
try {
if (activeMigrationInfo == null) {
partitionStateManager.setMigrating(migrationInfo.getPartitionId(), true);
activeMigrationInfo = migrationInfo;
return true;
return null;
}

logger.warning(migrationInfo + " not added! Already existing active migration: " + activeMigrationInfo);
return false;
if (logger.isFineEnabled()) {
logger.fine("Active migration is not set: " + migrationInfo
+ ". Existing active migration: " + activeMigrationInfo);
}
return activeMigrationInfo;
} finally {
partitionServiceLock.unlock();
}
Expand All @@ -239,7 +241,7 @@ MigrationInfo getActiveMigration() {
return activeMigrationInfo;
}

private boolean removeActiveMigration(int partitionId) {
public boolean removeActiveMigration(int partitionId) {
partitionServiceLock.lock();
try {
if (activeMigrationInfo != null) {
Expand Down
Expand Up @@ -220,6 +220,7 @@ private boolean checkAndTriggerReplicaSync() {
}
}

@SuppressWarnings("checkstyle:npathcomplexity")
private int invokeReplicaSyncOperations(int maxBackupCount, Semaphore semaphore, AtomicBoolean result) {
Address thisAddress = node.getThisAddress();
ExecutionCallback<Object> callback = new ReplicaSyncResponseCallback(result, semaphore);
Expand All @@ -239,8 +240,15 @@ private int invokeReplicaSyncOperations(int maxBackupCount, Semaphore semaphore,
if (!thisAddress.equals(owner)) {
continue;
}

ownedCount++;

if (maxBackupCount == 0) {
if (partition.isMigrating()) {
result.set(false);
}
continue;
}

for (int index = 1; index <= maxBackupCount; index++) {
Address replicaAddress = partition.getReplicaAddress(index);

Expand Down
Expand Up @@ -27,6 +27,7 @@
import com.hazelcast.internal.partition.PartitionStateVersionMismatchException;
import com.hazelcast.internal.partition.impl.InternalMigrationListener;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.internal.partition.impl.MigrationManager;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand All @@ -35,6 +36,7 @@
import com.hazelcast.spi.MigrationAwareService;
import com.hazelcast.spi.PartitionAwareOperation;
import com.hazelcast.spi.PartitionMigrationEvent;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.util.ExceptionUtil;

Expand Down Expand Up @@ -111,6 +113,16 @@ private void verifyClusterState() {
}
}

/**
 * Asks the partition service's {@link MigrationManager} to register this
 * operation's {@code migrationInfo} as the active migration.
 *
 * <p>A non-null value returned by the manager is treated as the migration
 * that is currently active, in which case this operation cannot proceed.
 *
 * @throws RetryableHazelcastException if the manager reports an already
 *         active migration
 */
void setActiveMigration() {
    InternalPartitionServiceImpl service = getService();
    MigrationManager manager = service.getMigrationManager();
    MigrationInfo existing = manager.setActiveMigration(migrationInfo);
    if (existing == null) {
        // Nothing else was active; our migration has been registered.
        return;
    }
    throw new RetryableHazelcastException("Cannot set active migration to " + migrationInfo
            + ". Current active migration is " + existing);
}

void onMigrationStart() {
InternalPartitionServiceImpl partitionService = getService();
InternalMigrationListener migrationListener = partitionService.getInternalMigrationListener();
Expand Down
Expand Up @@ -60,6 +60,9 @@ public void run() {
rollbackDestination();
}

InternalPartitionServiceImpl partitionService = getService();
partitionService.getMigrationManager().removeActiveMigration(getPartitionId());

if (success) {
nodeEngine.onPartitionMigrate(migrationInfo);
}
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.hazelcast.internal.partition.MigrationInfo;
import com.hazelcast.internal.partition.impl.InternalMigrationListener.MigrationParticipant;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.internal.partition.impl.MigrationManager;
import com.hazelcast.internal.partition.impl.PartitionReplicaManager;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
Expand Down Expand Up @@ -78,7 +77,8 @@ protected MigrationParticipant getMigrationParticipantType() {

@Override
public void run() throws Exception {
assertMigrationInitiatorIsMaster();
checkMigrationInitiatorIsMaster();
setActiveMigration();

try {
doRun();
Expand All @@ -95,7 +95,7 @@ public void run() throws Exception {
}

private void doRun() throws Exception {
if (startMigration()) {
if (migrationInfo.startProcessing()) {
try {
executeBeforeMigrations();

Expand All @@ -106,7 +106,7 @@ private void doRun() throws Exception {
} catch (Throwable e) {
success = false;
failureReason = e;
getLogger().severe("Error while executing replication operations" + migrationInfo, e);
getLogger().severe("Error while executing replication operations " + migrationInfo, e);
} finally {
afterMigrate();
}
Expand All @@ -116,17 +116,13 @@ private void doRun() throws Exception {
}
}

private void assertMigrationInitiatorIsMaster() {
/**
 * Verifies that the master recorded in {@code migrationInfo} is the master
 * address currently reported by the node engine.
 *
 * @throws RetryableHazelcastException if the current master address is
 *         unknown or differs from the migration initiator
 */
private void checkMigrationInitiatorIsMaster() {
    Address masterAddress = getNodeEngine().getMasterAddress();
    // NOTE(review): getMasterAddress() is assumed to be able to return null while
    // the master is not yet known. Treat that as a mismatch so the caller gets a
    // retryable failure rather than a NullPointerException.
    if (masterAddress == null || !masterAddress.equals(migrationInfo.getMaster())) {
        throw new RetryableHazelcastException("Migration initiator is not master node! => " + toString());
    }
}

/**
 * Calls {@code migrationInfo.startProcessing()} and, only if that succeeds,
 * {@link #addActiveMigration()}.
 *
 * @return {@code true} when both calls return {@code true}; {@code false}
 *         otherwise (the second call is skipped if the first one fails)
 */
private boolean startMigration() {
    if (!migrationInfo.startProcessing()) {
        return false;
    }
    return addActiveMigration();
}

/**
 * Emits a warning-level log entry stating that the migration described by
 * {@code migrationInfo} is cancelled.
 */
private void logMigrationCancelled() {
    ILogger log = getLogger();
    log.warning("Migration is cancelled -> " + migrationInfo);
}
Expand Down Expand Up @@ -168,12 +164,6 @@ protected PartitionMigrationEvent getMigrationEvent() {
migrationInfo.getDestinationNewReplicaIndex());
}

/**
 * Hands this operation's {@code migrationInfo} to the partition service's
 * {@link MigrationManager} via {@code addActiveMigration}.
 *
 * @return the boolean result reported by the migration manager
 */
private boolean addActiveMigration() {
    InternalPartitionServiceImpl service = getService();
    MigrationManager manager = service.getMigrationManager();
    boolean added = manager.addActiveMigration(migrationInfo);
    return added;
}

private void runMigrationOperation(Operation op) throws Exception {
prepareOperation(op);
op.beforeRun();
Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.hazelcast.internal.partition.MigrationInfo;
import com.hazelcast.internal.partition.impl.InternalMigrationListener.MigrationParticipant;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.internal.partition.impl.MigrationManager;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.ExceptionAction;
import com.hazelcast.spi.MigrationAwareService;
Expand Down Expand Up @@ -73,22 +72,18 @@ public void run() {
InternalPartition partition = getPartition();
verifySource(nodeEngine.getThisAddress(), partition);

setActiveMigration();

if (!migrationInfo.startProcessing()) {
getLogger().warning("Migration is cancelled -> " + migrationInfo);
setFailed();
return;
}

InternalPartitionServiceImpl partitionService = getService();
MigrationManager migrationManager = partitionService.getMigrationManager();
if (!migrationManager.addActiveMigration(migrationInfo)) {
setFailed();
return;
}

try {
executeBeforeMigrations();
Collection<Operation> tasks = prepareMigrationOperations();
InternalPartitionServiceImpl partitionService = getService();
long[] replicaVersions = partitionService.getPartitionReplicaVersions(migrationInfo.getPartitionId());
invokeMigrationOperation(destination, replicaVersions, tasks);
returnResponse = false;
Expand Down
Expand Up @@ -22,7 +22,9 @@
import com.hazelcast.internal.partition.MigrationCycleOperation;
import com.hazelcast.internal.partition.MigrationInfo;
import com.hazelcast.internal.partition.PartitionRuntimeState;
import com.hazelcast.internal.partition.impl.InternalPartitionImpl;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.internal.partition.impl.PartitionStateManager;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
Expand Down Expand Up @@ -104,6 +106,8 @@ public void afterRun() throws Exception {
private void beforePromotion() {
NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine();
InternalOperationService operationService = nodeEngine.getOperationService();
InternalPartitionServiceImpl partitionService = getService();
PartitionStateManager partitionStateManager = partitionService.getPartitionStateManager();
AtomicInteger tasks = new AtomicInteger(promotions.size());

ILogger logger = getLogger();
Expand All @@ -113,6 +117,8 @@ private void beforePromotion() {
logger.fine("Submitting before promotion tasks for " + promotions.size() + " promotions.");
}
for (MigrationInfo promotion : promotions) {
InternalPartitionImpl partition = partitionStateManager.getPartitionImpl(promotion.getPartitionId());
partition.setMigrating(true);
operationService.execute(new BeforePromotionTask(this, promotion, nodeEngine, tasks));
}
}
Expand Down