diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java index 42f2d3e73..32ad00f8e 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java @@ -94,7 +94,9 @@ public List getOverloadKeeperContainerMigra public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List keeperContainerDetailModels) { logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels); try { - keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels); + if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) { + return RetMessage.createFailMessage("has unfinished migration task!"); + } } catch (Throwable th) { logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage()); return RetMessage.createFailMessage(th.getMessage()); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java index 189916e88..7cea77edf 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java @@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) { .getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName()); for (ShardModel shardModel : allShardModel) { - if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(), + if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(), (model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null) ? null : model.getTargetKeeperContainer().getAddr().getHost())) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java index 38d6222bb..9791c8156 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java @@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt private final List alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS); + public final static String KEEPER_MIGRATION = "keeper_migration"; + public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success"; public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail"; @@ -47,10 +49,6 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail"; - public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success"; - - public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail"; - public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success"; public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail"; @@ -87,7 +85,7 @@ void migrateAllKeepers(List readyToMigratio ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), false, null); - if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) { logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard); alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp, diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java index 341d5835f..24759f6fe 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java @@ -41,29 +41,4 @@ protected InfoCommand generteInfoCommand(Endpoint key) { return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut); } - protected void addHookAndExecute(Command command, Callbackable callback) { - logger.info("[zyfTest][addHookAndExecute] start execute"); - CommandFuture future = command.execute(); - logger.info("[zyfTest][addHookAndExecute] start addListener"); - future.addListener(new CommandFutureListener() { - @Override - public void operationComplete(CommandFuture commandFuture) throws Exception { - if(!commandFuture.isSuccess()) { - logger.info("[zyfTest][addHookAndExecute] listener fail"); - callback.fail(commandFuture.cause()); - } else { - logger.info("[zyfTest][addHookAndExecute] listener success"); - callback.success(commandFuture.get()); - } - } - }); - try { - logger.info("[zyfTest][addHookAndExecute] before get"); - future.get(); - logger.info("[zyfTest][addHookAndExecute] get over"); - } catch (Exception e){ - throw new RuntimeException(e); - } - } - } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperRoleCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperRoleCommand.java new file mode 100644 index 000000000..56159268d --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperRoleCommand.java @@ -0,0 +1,40 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; + +import java.util.concurrent.ScheduledExecutorService; + +public class CheckKeeperRoleCommand extends AbstractKeeperCommand{ + + private Endpoint keeper; + + private boolean expectedRole; + + public CheckKeeperRoleCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedRole) { + super(keyedObjectPool, scheduled); + this.keeper = keeper; + this.expectedRole = expectedRole; + } + + @Override + public String getName() { + return "CheckKeeperRoleCommand"; + } + + @Override + protected void doExecute() throws Throwable { + InfoCommand infoCommand = generteInfoCommand(keeper); + if (new InfoResultExtractor(infoCommand.execute().get()).getKeeperActive() == expectedRole) { + this.future().setSuccess(); + } + this.future().setFailure(new Exception(String.format("keeper: %s is not %s", keeper, expectedRole))); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java index 01f540717..c81da01e7 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java @@ -1,33 +1,21 @@ package com.ctrip.xpipe.redis.console.keeper.Command; -import com.ctrip.xpipe.api.command.Command; import com.ctrip.xpipe.api.endpoint.Endpoint; -import com.ctrip.xpipe.api.pool.ObjectPoolException; -import com.ctrip.xpipe.command.DefaultRetryCommandFactory; -import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import java.util.concurrent.ScheduledExecutorService; public class FullSyncJudgeCommand extends AbstractKeeperCommand { - private Endpoint active; + private Endpoint activeInstance; - private Endpoint backUp; + private Endpoint backUpInstance; - private long intervalTime; - - private long activeMasterReplOffset; - - private long backupMasterReplOffset; - - public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) { + public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance) { super(keyedObjectPool, scheduled); - this.active = active; - this.backUp = backUp; - this.intervalTime = intervalTime; + this.activeInstance = activeInstance; + this.backUpInstance = backUpInstance; } @Override @@ -37,51 +25,17 @@ public String getName() { @Override protected void doExecute() throws Throwable { - try { - RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); - Command activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active)); - Command backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp)); - addHookAndExecute(activeRetryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable); - } - }); - - try { - Thread.sleep(intervalTime); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - addHookAndExecute(backUpRetryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable); - } - }); - - if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) { - this.future().setSuccess(); - } - } finally { - try { - keyedObjectPool.clear(active); - keyedObjectPool.clear(backUp); - } catch (ObjectPoolException e) { - logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e); - } + long activeMasterReplOffset, backupMasterReplOffset; + activeMasterReplOffset = new InfoResultExtractor(generteInfoCommand(activeInstance).execute().get()).getMasterReplOffset(); + Thread.sleep(1000); + backupMasterReplOffset = new InfoResultExtractor(generteInfoCommand(backUpInstance).execute().get()).getMasterReplOffset(); + + logger.debug("[FullSyncJudgeCommand] activeInstance: {}, backUpInstance: {}, activeMasterReplOffset: {}, backupMasterReplOffset:{}", + activeInstance, backUpInstance, activeMasterReplOffset, backupMasterReplOffset); + if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) { + this.future().setSuccess(); } + this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance))); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java index a2478a9a2..bc3fb40d9 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java @@ -1,35 +1,30 @@ package com.ctrip.xpipe.redis.console.keeper.Command; import com.ctrip.xpipe.api.command.Command; +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.command.AbstractCommand; import com.ctrip.xpipe.command.DefaultRetryCommandFactory; import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; -import com.ctrip.xpipe.redis.console.model.RedisTbl; import com.ctrip.xpipe.redis.console.service.KeeperContainerService; -import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; -import java.util.List; import java.util.concurrent.ScheduledExecutorService; -public class SwitchMasterCommand extends AbstractKeeperCommand{ +public class SwitchMasterCommand extends AbstractCommand { - private String activeIp; + private String activeKeeperIp; - private String backupIp; - - private List keepers; + private long shardId; private KeeperContainerService keeperContainerService; - public SwitchMasterCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, String activeIp, String backupIp, List keepers, KeeperContainerService keeperContainerService) { - super(keyedObjectPool, scheduled); - this.activeIp = activeIp; - this.backupIp = backupIp; - this.keepers = keepers; + public SwitchMasterCommand(String activeKeeperIp, long shardId, KeeperContainerService keeperContainerService) { + this.activeKeeperIp = activeKeeperIp; + this.shardId = shardId; this.keeperContainerService = keeperContainerService; } @@ -40,72 +35,8 @@ public String getName() { @Override protected void doExecute() throws Throwable { - try { - logger.info("[zyfTest][SwitchMasterCommand] start"); - if (keepers.size() != 2) { - logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModelKeepers: {}", activeIp, backupIp, keepers); - return; - } - int activeKeeperPort = -1; - String backUpKeeperIp = null; - for (RedisTbl keeper : keepers) { - if (keeper.getRedisIp().equals(activeIp)) { - activeKeeperPort = keeper.getRedisPort(); - } else { - backUpKeeperIp = keeper.getRedisIp(); - } - } - - if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) { - logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModelKeepers: {}, activeKeeperPort: {}, backUpKeeperIp: {}" - , activeIp, backupIp, keepers, activeKeeperPort, backUpKeeperIp); - return; - } - - KeeperTransMeta keeperInstanceMeta = null; - logger.info("[zyfTest][SwitchMasterCommand] start getAllKeepers"); - List allKeepers = keeperContainerService.getAllKeepers(activeIp); - logger.info("[zyfTest][SwitchMasterCommand] over getAllKeepers"); - for (KeeperInstanceMeta keeper : allKeepers) { - if (keeper.getKeeperMeta().getPort() == activeKeeperPort) { - keeperInstanceMeta = keeper; - break; - } - } - - if (keeperInstanceMeta == null) { - logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort); - return; - } - logger.info("[zyfTest][SwitchMasterCommand] start resetKeepers"); - keeperContainerService.resetKeepers(keeperInstanceMeta); - logger.info("[zyfTest][SwitchMasterCommand] over resetKeepers"); - RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000); - Command retryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(new DefaultEndPoint(activeIp, activeKeeperPort))); - logger.info("[zyfTest][SwitchMasterCommand] get retryInfoCommand"); - int finalActiveKeeperPort = activeKeeperPort; - addHookAndExecute(retryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand success"); - if (!new InfoResultExtractor(message).getKeeperActive()) { - future().setSuccess(); - } - } - - @Override - public void fail(Throwable throwable) { - logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand fail"); - logger.error("[SwitchMasterCommand] info keeper: {}:{}", activeIp, finalActiveKeeperPort, throwable); - } - }); - if (retryInfoCommand.future().isSuccess()) { - future().setSuccess(); - logger.info("[zyfTest][SwitchMasterCommand] over success"); - } - } catch (Exception e) { - logger.error("[SwitchMasterCommand] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); - } + keeperContainerService.resetKeepers(activeKeeperIp, shardId); + this.future().setSuccess(); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java deleted file mode 100644 index 212b2a839..000000000 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.ctrip.xpipe.redis.console.keeper.impl; - -public class DefaultKeeperContainerAvailablePool { -} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java index 03a398ba6..2a81dfeaf 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java @@ -169,8 +169,8 @@ public int hashCode() { @Override public String toString() { return "MigrationKeeperContainerDetailModel{" + - "srcKeeperContainer=" + srcKeeperContainer + - ", targetKeeperContainer=" + targetKeeperContainer + + "srcKeeperContainer=" + srcKeeperContainer.getKeeperIp() + + ", targetKeeperContainer=" + targetKeeperContainer.getKeeperIp() + ", migrateKeeperCount=" + migrateKeeperCount + ", migrateKeeperCompleteCount=" + migrateKeeperCompleteCount + ", switchActive=" + switchActive + diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java index 4fd2f8353..e6735623b 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java @@ -5,7 +5,7 @@ import java.util.List; public interface KeeperContainerMigrationService { - void beginMigrateKeeperContainers(List keeperContainerDetailModels); + boolean beginMigrateKeeperContainers(List keeperContainerDetailModels); List getMigrationProcess(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java index 3c84923aa..6a1d80cec 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java @@ -42,7 +42,7 @@ public interface KeeperContainerService { List getAllKeepers(String keeperContainerIp); - void resetKeepers(KeeperTransMeta keeperInstanceMeta); + void resetKeepers(String activeKeeperIp, Long replId); Map keeperContainerIdDcMap(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java index 1f116dde4..083af7f6c 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java @@ -12,9 +12,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.*; @@ -32,63 +30,58 @@ public class DefaultKeeperContainerMigrationService implements KeeperContainerMi private volatile AtomicBoolean isBegin = new AtomicBoolean(false); @Override - public void beginMigrateKeeperContainers(List keeperContainerDetailModels) { - if (!isBegin.compareAndSet(false, true)) { - logger.info("[beginMigrateKeeperContainers] has already begin!!"); - return; - } - logger.debug("[beginMigrateKeeperContainers] begin migrate keeper containers {}", keeperContainerDetailModels); - readyToMigrationKeeperContainers = keeperContainerDetailModels; - Set alreadyMigrateShards = new HashSet<>(); - for (MigrationKeeperContainerDetailModel keeperContainer : readyToMigrationKeeperContainers) { - List migrateShards = keeperContainer.getMigrateShards(); - if (CollectionUtils.isEmpty(migrateShards)) continue; + public boolean beginMigrateKeeperContainers(List keeperContainerDetailModels) { + try { + if (!isBegin.compareAndSet(false, true)) { + logger.info("[beginMigrateKeeperContainers] has already begin!!"); + return false; + } + readyToMigrationKeeperContainers = keeperContainerDetailModels; + for (MigrationKeeperContainerDetailModel keeperContainer : readyToMigrationKeeperContainers) { + List migrateShards = keeperContainer.getMigrateShards(); + if (CollectionUtils.isEmpty(migrateShards)) continue; - String srcKeeperContainerIp = keeperContainer.getSrcKeeperContainer().getKeeperIp(); - String targetKeeperContainerIp = keeperContainer.getTargetKeeperContainer().getKeeperIp(); - for (DcClusterShard migrateShard : migrateShards) { - ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), - migrateShard.getClusterId(), migrateShard.getShardId(), false, null); - if (!alreadyMigrateShards.add(migrateShard)) { - logger.info("[beginMigrateKeeperContainers] shard {} has already migrated, should not migrate in the same time", migrateShard); - continue; - } - logger.debug("[beginMigrateKeeperContainers] begin migrate shard {} from srcKeeperContainer:{} to targetKeeperContainer:{}", - migrateShard, srcKeeperContainerIp, targetKeeperContainerIp); - String event; - if (keeperContainer.isSwitchActive()) { - logger.info("[zyfTest] start switchMaster"); - if (shardModelService.switchMaster(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) { - logger.info("[zyfTest] switchMaster success"); - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_SWITCH_MASTER_SUCCESS; - } else { - logger.info("[zyfTest] switchMaster fail"); - event = KEEPER_SWITCH_MASTER_FAIL; - } - }else if (keeperContainer.isKeeperPairOverload()) { - if (shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, - srcKeeperContainerIp, targetKeeperContainerIp)) { - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_MIGRATION_BACKUP_SUCCESS; - } else { - event = KEEPER_MIGRATION_BACKUP_FAIL; - } - }else { - if (shardModelService.migrateAutoBalanceKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, - srcKeeperContainerIp, targetKeeperContainerIp)) { - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_MIGRATION_ACTIVE_START_SUCCESS; - } else { - event = KEEPER_MIGRATION_ACTIVE_START_FAIL; + String srcKeeperContainerIp = keeperContainer.getSrcKeeperContainer().getKeeperIp(); + String targetKeeperContainerIp = keeperContainer.getTargetKeeperContainer().getKeeperIp(); + for (DcClusterShard migrateShard : migrateShards) { + ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), + migrateShard.getClusterId(), migrateShard.getShardId(), false, null); + logger.debug("[beginMigrateKeeperContainers] begin migrate shard {} from srcKeeperContainer:{} to targetKeeperContainer:{}", + migrateShard, srcKeeperContainerIp, targetKeeperContainerIp); + String event; + if (keeperContainer.isSwitchActive()) { + if (shardModelService.switchActiveKeeper(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_SWITCH_MASTER_SUCCESS; + } else { + event = KEEPER_SWITCH_MASTER_FAIL; + } + }else if (keeperContainer.isKeeperPairOverload()) { + if (shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + srcKeeperContainerIp, targetKeeperContainerIp)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_MIGRATION_BACKUP_SUCCESS; + } else { + event = KEEPER_MIGRATION_BACKUP_FAIL; + } + }else { + if (shardModelService.migrateActiveKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + srcKeeperContainerIp, targetKeeperContainerIp)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_MIGRATION_ACTIVE_SUCCESS; + } else { + event = KEEPER_MIGRATION_ACTIVE_FAIL; + } } + CatEventMonitor.DEFAULT.logEvent(KEEPER_MIGRATION, event); + logger.info("migrate keeper dc:{}, cluster:{}, shard:{}, src:{}, target:{}", + migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, targetKeeperContainerIp); } - CatEventMonitor.DEFAULT.logEvent(event, String.format("dc:%s, cluster:%s, shard:%s, src:%s, target:%s", - migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, - targetKeeperContainerIp)); } + return true; + } finally { + isBegin.set(false); } - isBegin.set(false); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java index 2a2a83f87..583132ace 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java @@ -455,12 +455,14 @@ public List getAllKeepers(String keeperContainerIp) { } @Override - public void resetKeepers(KeeperTransMeta keeperInstanceMeta) { + public void resetKeepers(String activeKeeperIp, Long replId) { + KeeperTransMeta keeperInstanceMeta = new KeeperTransMeta(); + keeperInstanceMeta.setReplId(replId); getOrCreateRestTemplate(); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity requestEntity = new HttpEntity<>(keeperInstanceMeta, headers); - restTemplate.exchange(String.format("http://%s:8080/keepers/election/reset", keeperInstanceMeta.getKeeperMeta().getIp()), + restTemplate.exchange(String.format("http://%s:8080/keepers/election/reset", activeKeeperIp), HttpMethod.POST, requestEntity, Void.class); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java index 2becee4d9..c645d620c 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java @@ -18,11 +18,11 @@ ShardModel getShardModel(String dcName, String clusterName, String shardName, ShardModel getSourceShardModel(String clusterName, String srcDcName, String toDcName, String shardName); - boolean migrateShardKeepers(String dcName, String clusterName, ShardModel shardModel, + boolean migrateBackupKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp); - boolean switchMaster(String srcIp, String targetIp, ShardModel shardModel); + boolean switchActiveKeeper(String srcIp, String targetIp, ShardModel shardModel); - boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, - String srcKeeperContainerIp, String targetKeeperContainerIp); + boolean migrateActiveKeeper(String dcName, String clusterName, ShardModel shardModel, + String srcKeeperContainerIp, String targetKeeperContainerIp); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index 18de8817a..8d36a19ea 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -1,35 +1,24 @@ package com.ctrip.xpipe.redis.console.service.model.impl; -import com.ctrip.framework.xpipe.redis.ProxyRegistry; import com.ctrip.xpipe.api.command.Command; -import com.ctrip.xpipe.api.command.CommandFuture; -import com.ctrip.xpipe.api.command.CommandFutureListener; import com.ctrip.xpipe.api.endpoint.Endpoint; import com.ctrip.xpipe.api.pool.ObjectPoolException; -import com.ctrip.xpipe.api.pool.SimpleObjectPool; import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.command.DefaultRetryCommandFactory; import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.command.SequenceCommandChain; import com.ctrip.xpipe.endpoint.DefaultEndPoint; -import com.ctrip.xpipe.netty.commands.NettyClient; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; import com.ctrip.xpipe.redis.console.constant.XPipeConsoleConstant; import com.ctrip.xpipe.redis.console.exception.DataNotFoundException; import com.ctrip.xpipe.redis.console.exception.ServerException; +import com.ctrip.xpipe.redis.console.keeper.Command.CheckKeeperRoleCommand; import com.ctrip.xpipe.redis.console.keeper.Command.FullSyncJudgeCommand; import com.ctrip.xpipe.redis.console.keeper.Command.SwitchMasterCommand; import com.ctrip.xpipe.redis.console.model.*; import com.ctrip.xpipe.redis.console.repository.AzGroupClusterRepository; import com.ctrip.xpipe.redis.console.service.*; import com.ctrip.xpipe.redis.console.service.model.ShardModelService; -import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; -import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import com.ctrip.xpipe.utils.ObjectUtils; import com.ctrip.xpipe.utils.VisibleForTesting; import com.ctrip.xpipe.utils.XpipeThreadFactory; @@ -83,9 +72,7 @@ public class ShardModelServiceImpl implements ShardModelService{ @Resource(name = MIGRATE_KEEPER_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; - private RetryCommandFactory switchMasterCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 3, 1000); - - private RetryCommandFactory fullSyncCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); + private RetryCommandFactory retryCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000); @Override public List getAllShardModel(String dcName, String clusterName) { @@ -255,7 +242,7 @@ private void addRedisesAndKeepersToNormalShard(ShardModel shardModel, long dcClu } @Override - public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel shardModel, + public boolean migrateBackupKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp); @@ -263,43 +250,83 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel } @Override - public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) { - Command switchMasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeIp, backupIp, shardModel.getKeepers(), keeperContainerService)); + public boolean switchActiveKeeper(String activeIp, String backupIp, ShardModel shardModel) { + List keepers = shardModel.getKeepers(); + if (keepers.size() != 2) { + logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardName: {}, shardKeepersNum: {}", activeIp, backupIp, shardModel.getShardTbl().getShardName(), keepers.size()); + return false; + } + Endpoint activeKeeper = null, backUpKeeper = null; + for (RedisTbl keeper : keepers) { + if (keeper.getRedisIp().equals(activeIp)) { + activeKeeper = new DefaultEndPoint(keeper.getRedisIp(), keeper.getRedisPort()); + } else { + backUpKeeper = new DefaultEndPoint(keeper.getRedisIp(), keeper.getRedisPort()); + } + } + + if (activeKeeper == null || backUpKeeper == null || !backupIp.equals(backUpKeeper.getHost())) { + logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeKeeper: {}, backUpKeeper: {}, shardModelKeepers1: {}, shardModelKeepers2: {}" + , activeKeeper, backUpKeeper, new DefaultEndPoint(keepers.get(0).getRedisIp(), keepers.get(0).getRedisPort()), new DefaultEndPoint(keepers.get(1).getRedisIp(), keepers.get(1).getRedisPort())); + return false; + } + Command switchMasterCommand = retryCommandFactory.createRetryCommand(new SwitchMasterCommand<>(activeKeeper.getHost(), shardModel.getShardTbl().getId(), keeperContainerService)); + Command checkKeeperRoleCommand = retryCommandFactory.createRetryCommand(new CheckKeeperRoleCommand<>(keyedObjectPool, scheduled, backUpKeeper, true)); + SequenceCommandChain chain = new SequenceCommandChain(false, false); + chain.add(switchMasterCommand); + chain.add(checkKeeperRoleCommand); try { - logger.info("[zyfTest] start switchMasterCommand execute"); - switchMasterCommand.execute().get(); - logger.info("[zyfTest] start switchMasterCommand execute over"); - logger.info("[zyfTest] start switchMasterCommand execute success?:{}",switchMasterCommand.future().isSuccess()); - return switchMasterCommand.future().isSuccess(); + chain.execute().get(); + return chain.future().isSuccess(); } catch (Exception e) { - logger.error("[switchMaster] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); + logger.error("[switchMaster] switch master failed, activeKeeper: {}, backUpKeeper: {}", activeKeeper, backUpKeeper, e); return false; + } finally { + try { + keyedObjectPool.clear(activeKeeper); + keyedObjectPool.clear(backUpKeeper); + } catch (ObjectPoolException e) { + logger.error("[clear] clear keyed object pool error, activeKeeper:{}, backUpKeeper:{}", activeKeeper, backUpKeeper, e); + } } } @Override - public boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { + public boolean migrateActiveKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { List oldKeepers = shardModel.getKeepers(); List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp); if (!doMigrateKeepers(dcName, clusterName, shardModel, newKeepers)) { - throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); + logger.error(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); + return false; } - RedisTbl active = newKeepers.get(0); - RedisTbl backup = newKeepers.get(1); - DefaultEndPoint activeKey = new DefaultEndPoint(active.getRedisIp(), active.getRedisPort()); - DefaultEndPoint backupKey = new DefaultEndPoint(backup.getRedisIp(), backup.getRedisPort()); - Command fullSyncJudgeRetryCommand = fullSyncCommandFactory.createRetryCommand(new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, activeKey, backupKey, 1000)); - Command switchmasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeKey.getHost(), backupKey.getHost(), newKeepers, keeperContainerService)); + Endpoint activeKeeper, backUpKeeper; + if (newKeepers.get(0).getRedisIp().equals(targetKeeperContainerIp)) { + activeKeeper = new DefaultEndPoint(newKeepers.get(1).getRedisIp(), newKeepers.get(1).getRedisPort()); + backUpKeeper = new DefaultEndPoint(newKeepers.get(0).getRedisIp(), newKeepers.get(0).getRedisPort()); + } else { + backUpKeeper = new DefaultEndPoint(newKeepers.get(1).getRedisIp(), newKeepers.get(1).getRedisPort()); + activeKeeper = new DefaultEndPoint(newKeepers.get(0).getRedisIp(), newKeepers.get(0).getRedisPort()); + } + Command fullSyncJudgeRetryCommand = retryCommandFactory.createRetryCommand(new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, activeKeeper, backUpKeeper)); + Command switchmasterCommand = retryCommandFactory.createRetryCommand(new SwitchMasterCommand<>(activeKeeper.getHost(), shardModel.getShardTbl().getId(), keeperContainerService)); + Command checkKeeperRoleCommand = retryCommandFactory.createRetryCommand(new CheckKeeperRoleCommand<>(keyedObjectPool, scheduled, backUpKeeper, true)); SequenceCommandChain chain = new SequenceCommandChain(false, false); chain.add(fullSyncJudgeRetryCommand); chain.add(switchmasterCommand); + chain.add(checkKeeperRoleCommand); try { chain.execute().get(); return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); } catch (InterruptedException | ExecutionException e) { - logger.error("[fullSyncJudge] execute fullSyncJudgeRetryCommand fail", e); return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); + } finally { + try { + keyedObjectPool.clear(activeKeeper); + keyedObjectPool.clear(backUpKeeper); + } catch (ObjectPoolException e) { + logger.error("[clear] clear keyed object pool error, activeKeeper:{}, backUpKeeper:{}", activeKeeper, backUpKeeper, e); + } } } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java index ac2c89345..d5f6e1cd1 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java @@ -40,7 +40,7 @@ public void beforeAutoMigrateOverloadKeeperContainerActionTest() { ShardModel shardModel = new ShardModel(); Mockito.when(shardModelService.getShardModel(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyObject())) .thenReturn(shardModel); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) .thenReturn(true); } @@ -100,7 +100,7 @@ public void testMigrateAllKeepersFail() { .setSrcKeeperContainer(model2).setTargetKeeperContainer(model4).setMigrateKeeperCount(4).setMigrateShards(migrationShards2); readyToMigrationKeeperContainers.add(migrationKeeperContainerDetailModel2); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())).thenReturn(false); + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())).thenReturn(false); action.migrateAllKeepers(readyToMigrationKeeperContainers); Assert.assertEquals(0, migrationKeeperContainerDetailModel1.getMigrateKeeperCompleteCount()); diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java index fb0e58500..36e241bd7 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java @@ -1,6 +1,5 @@ package com.ctrip.xpipe.redis.console.service; -import com.ctrip.xpipe.command.DefaultCommandFuture; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; import com.ctrip.xpipe.redis.console.model.RedisTbl; @@ -8,7 +7,6 @@ import com.ctrip.xpipe.redis.console.model.ShardTbl; import com.ctrip.xpipe.redis.console.service.impl.DefaultKeeperAdvancedService; import com.ctrip.xpipe.redis.console.service.model.impl.ShardModelServiceImpl; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -73,7 +71,7 @@ public void testMigrateAutoBalanceKeepers() throws Exception { Mockito.when(executor.scheduleWithFixedDelay(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenReturn(future); Mockito.when(future.get()).thenReturn(null); try { - shardModelService.migrateAutoBalanceKeepers(dcName, clusterName, shardModel, srcIp, targetIp); + shardModelService.migrateActiveKeeper(dcName, clusterName, shardModel, srcIp, targetIp); } catch (Exception e) { Assert.assertEquals(e.getClass(), RuntimeException.class); } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java index 8bb7ad082..fac19b8b4 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java @@ -9,7 +9,6 @@ import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -40,9 +39,9 @@ public void before() { ShardModel shardModel = new ShardModel(); Mockito.when(shardModelService.getShardModel(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyObject())) .thenReturn(shardModel); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) .thenReturn(true); - Mockito.when(shardModelService.switchMaster(Mockito.anyString(), Mockito.anyString(), Mockito.any())) + Mockito.when(shardModelService.switchActiveKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any())) .thenReturn(true); } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java index 7dcb8e6d1..e5dd83752 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java @@ -325,7 +325,7 @@ public void resetKeepersTest() { HttpEntity requestEntity = new HttpEntity<>(keeperInstanceMeta, headers); Mockito.when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), eq(requestEntity), eq(Void.class))).thenReturn(null); - keeperContainerService.resetKeepers(keeperInstanceMeta); + keeperContainerService.resetKeepers(keeperInstanceMeta.getKeeperMeta().getIp(), keeperInstanceMeta.getReplId()); } @Test