Skip to content

Commit

Permalink
执行迁移计划流程控制优化
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Apr 25, 2024
1 parent e72a362 commit c3e3271
Show file tree
Hide file tree
Showing 19 changed files with 198 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ public List<MigrationKeeperContainerDetailModel> getOverloadKeeperContainerMigra
public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels) {
logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels);
try {
keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) {
return RetMessage.createFailMessage("has unfinished migration task!");
}
} catch (Throwable th) {
logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage());
return RetMessage.createFailMessage(th.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) {
.getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName());

for (ShardModel shardModel : allShardModel) {
if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(),
if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(),
clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(),
(model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null)
? null : model.getTargetKeeperContainer().getAddr().getHost())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

private final List<ALERT_TYPE> alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS);

public final static String KEEPER_MIGRATION = "keeper_migration";

public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success";

public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail";
Expand All @@ -47,10 +49,6 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail";

public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success";

public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail";

public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success";

public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail";
Expand Down Expand Up @@ -87,7 +85,7 @@ void migrateAllKeepers(List<MigrationKeeperContainerDetailModel> readyToMigratio
ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(),
migrateShard.getClusterId(), migrateShard.getShardId(), false, null);

if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) {
logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard);
alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,4 @@ protected InfoCommand generteInfoCommand(Endpoint key) {
return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut);
}

protected <V> void addHookAndExecute(Command<V> command, Callbackable<V> callback) {
logger.info("[zyfTest][addHookAndExecute] start execute");
CommandFuture<V> future = command.execute();
logger.info("[zyfTest][addHookAndExecute] start addListener");
future.addListener(new CommandFutureListener<V>() {
@Override
public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
if(!commandFuture.isSuccess()) {
logger.info("[zyfTest][addHookAndExecute] listener fail");
callback.fail(commandFuture.cause());
} else {
logger.info("[zyfTest][addHookAndExecute] listener success");
callback.success(commandFuture.get());
}
}
});
try {
logger.info("[zyfTest][addHookAndExecute] before get");
future.get();
logger.info("[zyfTest][addHookAndExecute] get over");
} catch (Exception e){
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class CheckKeeperRoleCommand<T> extends AbstractKeeperCommand<T>{

private Endpoint keeper;

private boolean expectedRole;

public CheckKeeperRoleCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedRole) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
this.expectedRole = expectedRole;
}

@Override
public String getName() {
return "CheckKeeperRoleCommand";
}

@Override
protected void doExecute() throws Throwable {
InfoCommand infoCommand = generteInfoCommand(keeper);
if (new InfoResultExtractor(infoCommand.execute().get()).getKeeperActive() == expectedRole) {
this.future().setSuccess();
}
this.future().setFailure(new Exception(String.format("keeper: %s is not %s", keeper, expectedRole)));
}

@Override
protected void doReset() {

}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,21 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.ObjectPoolException;
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
import com.ctrip.xpipe.command.RetryCommandFactory;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class FullSyncJudgeCommand<T> extends AbstractKeeperCommand<T> {

private Endpoint active;
private Endpoint activeInstance;

private Endpoint backUp;
private Endpoint backUpInstance;

private long intervalTime;

private long activeMasterReplOffset;

private long backupMasterReplOffset;

public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) {
public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance) {
super(keyedObjectPool, scheduled);
this.active = active;
this.backUp = backUp;
this.intervalTime = intervalTime;
this.activeInstance = activeInstance;
this.backUpInstance = backUpInstance;
}

@Override
Expand All @@ -37,51 +25,17 @@ public String getName() {

@Override
protected void doExecute() throws Throwable {
try {
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000);
Command<String> activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active));
Command<String> backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp));
addHookAndExecute(activeRetryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
}

@Override
public void fail(Throwable throwable) {
logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable);
}
});

try {
Thread.sleep(intervalTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

addHookAndExecute(backUpRetryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
}

@Override
public void fail(Throwable throwable) {
logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable);
}
});

if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
this.future().setSuccess();
}
} finally {
try {
keyedObjectPool.clear(active);
keyedObjectPool.clear(backUp);
} catch (ObjectPoolException e) {
logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e);
}
long activeMasterReplOffset, backupMasterReplOffset;
activeMasterReplOffset = new InfoResultExtractor(generteInfoCommand(activeInstance).execute().get()).getMasterReplOffset();
Thread.sleep(1000);
backupMasterReplOffset = new InfoResultExtractor(generteInfoCommand(backUpInstance).execute().get()).getMasterReplOffset();

logger.debug("[FullSyncJudgeCommand] activeInstance: {}, backUpInstance: {}, activeMasterReplOffset: {}, backupMasterReplOffset:{}",
activeInstance, backUpInstance, activeMasterReplOffset, backupMasterReplOffset);
if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
this.future().setSuccess();
}
this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
import com.ctrip.xpipe.command.RetryCommandFactory;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.console.model.RedisTbl;
import com.ctrip.xpipe.redis.console.service.KeeperContainerService;
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

public class SwitchMasterCommand<T> extends AbstractKeeperCommand<T>{
public class SwitchMasterCommand<T> extends AbstractCommand<T> {

private String activeIp;
private String activeKeeperIp;

private String backupIp;

private List<RedisTbl> keepers;
private long shardId;

private KeeperContainerService keeperContainerService;

public SwitchMasterCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, String activeIp, String backupIp, List<RedisTbl> keepers, KeeperContainerService keeperContainerService) {
super(keyedObjectPool, scheduled);
this.activeIp = activeIp;
this.backupIp = backupIp;
this.keepers = keepers;
public SwitchMasterCommand(String activeKeeperIp, long shardId, KeeperContainerService keeperContainerService) {
this.activeKeeperIp = activeKeeperIp;
this.shardId = shardId;
this.keeperContainerService = keeperContainerService;
}

Expand All @@ -40,72 +35,8 @@ public String getName() {

@Override
protected void doExecute() throws Throwable {
try {
logger.info("[zyfTest][SwitchMasterCommand] start");
if (keepers.size() != 2) {
logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModelKeepers: {}", activeIp, backupIp, keepers);
return;
}
int activeKeeperPort = -1;
String backUpKeeperIp = null;
for (RedisTbl keeper : keepers) {
if (keeper.getRedisIp().equals(activeIp)) {
activeKeeperPort = keeper.getRedisPort();
} else {
backUpKeeperIp = keeper.getRedisIp();
}
}

if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) {
logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModelKeepers: {}, activeKeeperPort: {}, backUpKeeperIp: {}"
, activeIp, backupIp, keepers, activeKeeperPort, backUpKeeperIp);
return;
}

KeeperTransMeta keeperInstanceMeta = null;
logger.info("[zyfTest][SwitchMasterCommand] start getAllKeepers");
List<KeeperInstanceMeta> allKeepers = keeperContainerService.getAllKeepers(activeIp);
logger.info("[zyfTest][SwitchMasterCommand] over getAllKeepers");
for (KeeperInstanceMeta keeper : allKeepers) {
if (keeper.getKeeperMeta().getPort() == activeKeeperPort) {
keeperInstanceMeta = keeper;
break;
}
}

if (keeperInstanceMeta == null) {
logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort);
return;
}
logger.info("[zyfTest][SwitchMasterCommand] start resetKeepers");
keeperContainerService.resetKeepers(keeperInstanceMeta);
logger.info("[zyfTest][SwitchMasterCommand] over resetKeepers");
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000);
Command<String> retryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(new DefaultEndPoint(activeIp, activeKeeperPort)));
logger.info("[zyfTest][SwitchMasterCommand] get retryInfoCommand");
int finalActiveKeeperPort = activeKeeperPort;
addHookAndExecute(retryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand success");
if (!new InfoResultExtractor(message).getKeeperActive()) {
future().setSuccess();
}
}

@Override
public void fail(Throwable throwable) {
logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand fail");
logger.error("[SwitchMasterCommand] info keeper: {}:{}", activeIp, finalActiveKeeperPort, throwable);
}
});
if (retryInfoCommand.future().isSuccess()) {
future().setSuccess();
logger.info("[zyfTest][SwitchMasterCommand] over success");
}
} catch (Exception e) {
logger.error("[SwitchMasterCommand] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e);
}
keeperContainerService.resetKeepers(activeKeeperIp, shardId);
this.future().setSuccess();
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ public int hashCode() {
@Override
public String toString() {
return "MigrationKeeperContainerDetailModel{" +
"srcKeeperContainer=" + srcKeeperContainer +
", targetKeeperContainer=" + targetKeeperContainer +
"srcKeeperContainer=" + srcKeeperContainer.getKeeperIp() +
", targetKeeperContainer=" + targetKeeperContainer.getKeeperIp() +
", migrateKeeperCount=" + migrateKeeperCount +
", migrateKeeperCompleteCount=" + migrateKeeperCompleteCount +
", switchActive=" + switchActive +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.List;

public interface KeeperContainerMigrationService {
void beginMigrateKeeperContainers(List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels);
boolean beginMigrateKeeperContainers(List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels);

List<MigrationKeeperContainerDetailModel> getMigrationProcess();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface KeeperContainerService {

List<KeeperInstanceMeta> getAllKeepers(String keeperContainerIp);

void resetKeepers(KeeperTransMeta keeperInstanceMeta);
void resetKeepers(String activeKeeperIp, Long replId);

Map<Long, Long> keeperContainerIdDcMap();
}

0 comments on commit c3e3271

Please sign in to comment.