
Commit

Optimize flow control for executing the migration plan
yifuzhou committed May 23, 2024
1 parent a6c5302 commit b850e4d
Showing 32 changed files with 624 additions and 413 deletions.
@@ -29,7 +29,7 @@ public void onAction(KeeperInfoStatsActionContext context) {
long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue();
deleteKeeper(info);
Map<DcClusterShardKeeper, Long> keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new);
keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow);
keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.isKeeperActive(), info.getHostPort().getPort()), keeperFlow);
} catch (Throwable throwable) {
logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable);
}
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.console.controller.consoleportal;

import com.ctrip.xpipe.redis.checker.controller.result.RetMessage;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.keeper.KeeperContainerUsedInfoAnalyzer;
@@ -92,16 +93,25 @@ public List<MigrationKeeperContainerDetailModel> getOverloadKeeperContainerMigra

@RequestMapping(value = "/keepercontainer/overload/migration/begin", method = RequestMethod.POST)
public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels) {
logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels);
try {
keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) {
return RetMessage.createFailMessage("The previous migration tasks are still in progress!");
}
} catch (Throwable th) {
logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage());
logger.warn("[beginToMigrateOverloadKeeperContainers][fail] {}", keeperContainerDetailModels, th);
return RetMessage.createFailMessage(th.getMessage());
}
return RetMessage.createSuccessMessage();
}

@RequestMapping(value = "/keepercontainer/overload/migration/terminate", method = RequestMethod.POST)
public RetMessage migrateKeeperTaskTerminate() {
if(keeperContainerMigrationService.stopMigrate()){
return RetMessage.createSuccessMessage("All migration tasks have been completed");
}
return RetMessage.createSuccessMessage("No migration tasks in progress");
}

@RequestMapping(value = "/keepercontainer/overload/info/lasted", method = RequestMethod.GET)
public List<KeeperContainerUsedInfoModel> getLastedAllReadyMigrateKeeperContainers() {
return analyzer.getAllDcKeeperContainerUsedInfoModelsList();
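The new begin/terminate endpoints above only show the controller side of the flow control. As a rough illustration, the service they call could guard concurrent migration batches with a single flag; the sketch below is an assumption, not code from this commit — only the method names beginMigrateKeeperContainers and stopMigrate appear in the diff.

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the guard implied by the controller changes above.
// The AtomicBoolean flag and the class name are assumptions; only the two
// method names come from this commit.
public class KeeperContainerMigrationServiceSketch {

    private final AtomicBoolean migrating = new AtomicBoolean(false);

    // Returns false when a previous migration batch is still in progress.
    public boolean beginMigrateKeeperContainers(List<?> keeperContainerDetailModels) {
        if (!migrating.compareAndSet(false, true)) {
            return false;
        }
        // ... submit the migration tasks and clear the flag once they all finish ...
        return true;
    }

    // Returns true if a running batch was stopped, false if nothing was in progress.
    public boolean stopMigrate() {
        return migrating.compareAndSet(true, false);
    }
}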
@@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) {
.getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName());

for (ShardModel shardModel : allShardModel) {
if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(),
if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(),
clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(),
(model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null)
? null : model.getTargetKeeperContainer().getAddr().getHost())) {
@@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

private final List<ALERT_TYPE> alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS);

public final static String KEEPER_MIGRATION = "keeper_migration";

public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success";

public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail";
@@ -47,14 +49,12 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail";

public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success";

public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail";

public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success";

public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail";

public final static String KEEPER_MIGRATION_ACTIVE_ROLLBACK_ERROR = "keeper_migration_active_rollback_error";

public final static String KEEPER_MIGRATION_BACKUP_SUCCESS = "keeper_migration_backup_success";

public final static String KEEPER_MIGRATION_BACKUP_FAIL = "keeper_migration_backup_fail";
@@ -87,7 +87,7 @@ void migrateAllKeepers(List<MigrationKeeperContainerDetailModel> readyToMigratio
ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(),
migrateShard.getClusterId(), migrateShard.getShardId(), false, null);

if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) {
logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard);
alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp,
@@ -1,18 +1,14 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.framework.xpipe.redis.ProxyRegistry;
import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.api.command.CommandFutureListener;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.SimpleObjectPool;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -33,37 +29,9 @@ protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool,
this.scheduled = scheduled;
}

protected InfoCommand generteInfoCommand(Endpoint key) {
if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) {
commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI;
}
protected InfoCommand generateInfoReplicationCommand(Endpoint key) {
SimpleObjectPool<NettyClient> keyPool = keyedObjectPool.getKeyPool(key);
return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut);
}

protected <V> void addHookAndExecute(Command<V> command, Callbackable<V> callback) {
logger.info("[zyfTest][addHookAndExecute] start execute");
CommandFuture<V> future = command.execute();
logger.info("[zyfTest][addHookAndExecute] start addListener");
future.addListener(new CommandFutureListener<V>() {
@Override
public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
if(!commandFuture.isSuccess()) {
logger.info("[zyfTest][addHookAndExecute] listener fail");
callback.fail(commandFuture.cause());
} else {
logger.info("[zyfTest][addHookAndExecute] listener success");
callback.success(commandFuture.get());
}
}
});
try {
logger.info("[zyfTest][addHookAndExecute] before get");
future.get();
logger.info("[zyfTest][addHookAndExecute] get over");
} catch (Exception e){
throw new RuntimeException(e);
}
}

}
@@ -0,0 +1,42 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
import com.ctrip.xpipe.utils.VisibleForTesting;

import java.util.concurrent.ScheduledExecutorService;

public class CheckKeeperActiveCommand<T> extends AbstractKeeperCommand<T>{

private Endpoint keeper;

private boolean expectedActive;

public CheckKeeperActiveCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedActive) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
this.expectedActive = expectedActive;
}

@Override
public String getName() {
return "CheckKeeperActiveCommand";
}

@Override
protected void doExecute() throws Throwable {
InfoCommand infoCommand = generateInfoReplicationCommand(keeper);
if (new InfoResultExtractor(infoCommand.execute().get()).isKeeperActive() == expectedActive) {
this.future().setSuccess();
return;
}
this.future().setFailure(new Exception(String.format("keeper: %s is not %s", keeper, expectedActive)));
}

@Override
protected void doReset() {

}
}
@@ -0,0 +1,40 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;

import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.redis.core.protocal.MASTER_STATE.REDIS_REPL_CONNECTED;

public class CheckKeeperConnectedCommand<T> extends AbstractKeeperCommand<T> {

private Endpoint keeper;

public CheckKeeperConnectedCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
}

@Override
public String getName() {
return "CheckKeeperConnectedCommand";
}

@Override
protected void doExecute() throws Throwable {
SlaveRole role = (SlaveRole)new RoleCommand(keyedObjectPool.getKeyPool(keeper), scheduled).execute().get();
if (REDIS_REPL_CONNECTED == role.getMasterState()) {
this.future().setSuccess();
return;
}
this.future().setFailure(new Exception(String.format("ping %s has no pong response", keeper)));
}

@Override
protected void doReset() {

}
}
@@ -1,33 +1,24 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.ObjectPoolException;
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
import com.ctrip.xpipe.command.RetryCommandFactory;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class FullSyncJudgeCommand<T> extends AbstractKeeperCommand<T> {

private Endpoint active;
private Endpoint activeInstance;

private Endpoint backUp;

private long intervalTime;
private Endpoint backUpInstance;

private long activeMasterReplOffset;

private long backupMasterReplOffset;

public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) {
public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance, long activeMasterReplOffset) {
super(keyedObjectPool, scheduled);
this.active = active;
this.backUp = backUp;
this.intervalTime = intervalTime;
this.activeInstance = activeInstance;
this.backUpInstance = backUpInstance;
this.activeMasterReplOffset = activeMasterReplOffset;
}

@Override
@@ -37,51 +28,13 @@ public String getName() {

@Override
protected void doExecute() throws Throwable {
try {
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000);
Command<String> activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active));
Command<String> backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp));
addHookAndExecute(activeRetryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
}

@Override
public void fail(Throwable throwable) {
logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable);
}
});

try {
Thread.sleep(intervalTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

addHookAndExecute(backUpRetryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
}

@Override
public void fail(Throwable throwable) {
logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable);
}
});

if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
this.future().setSuccess();
}
} finally {
try {
keyedObjectPool.clear(active);
keyedObjectPool.clear(backUp);
} catch (ObjectPoolException e) {
logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e);
}
long backupMasterReplOffset;
backupMasterReplOffset = new InfoResultExtractor(generateInfoReplicationCommand(backUpInstance).execute().get()).getMasterReplOffset();
if (backupMasterReplOffset > 0 && activeMasterReplOffset > 0 && backupMasterReplOffset > activeMasterReplOffset) {
this.future().setSuccess();
return;
}
this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance)));
}

@Override
@@ -0,0 +1,32 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class KeeperContainerReplOffsetGetCommand<V> extends AbstractKeeperCommand<Object>{

private Endpoint keeper;

public KeeperContainerReplOffsetGetCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
}

@Override
public String getName() {
return "KeeperContainerReplOffsetGetCommand";
}

@Override
protected void doExecute() throws Throwable {
this.future().setSuccess(new InfoResultExtractor(generateInfoReplicationCommand(keeper).execute().get()).getMasterReplOffset());
}

@Override
protected void doReset() {

}
}
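Each of the new command classes above reports its result through the command future. As a hedged illustration of how they relate, here is one possible ordering during a keeper migration check — the sequencing, the class KeeperMigrationStepSketch, and the expectedActive value are assumptions, not code from the diff; only the four command constructors come from this commit. It also assumes the sketch sits in the same package as the new commands.

package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;

import java.util.concurrent.ScheduledExecutorService;

// Hypothetical ordering of the commands added in this commit; not taken from the diff.
public class KeeperMigrationStepSketch {

    public void verifyMigratedKeeper(XpipeNettyClientKeyedObjectPool pool, ScheduledExecutorService scheduled,
                                     Endpoint activeKeeper, Endpoint backupKeeper) throws Exception {
        // record the active keeper's replication offset as the baseline
        long activeOffset = (Long) new KeeperContainerReplOffsetGetCommand<>(pool, scheduled, activeKeeper)
                .execute().get();

        // the migrated keeper should report the expected active flag
        // (false here is only an example value)
        new CheckKeeperActiveCommand<>(pool, scheduled, backupKeeper, false).execute().get();

        // it should also be connected to its upstream (master_state == connected)
        new CheckKeeperConnectedCommand<>(pool, scheduled, backupKeeper).execute().get();

        // full sync is judged complete once the backup offset passes the recorded baseline
        new FullSyncJudgeCommand<>(pool, scheduled, activeKeeper, backupKeeper, activeOffset).execute().get();
    }
}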
