From b850e4d0e7c3cf3e37028f20d1d588840599b506 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Thu, 23 May 2024 12:46:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=BF=81=E7=A7=BB=E8=AE=A1?= =?UTF-8?q?=E5=88=92=E6=B5=81=E7=A8=8B=E6=8E=A7=E5=88=B6=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../keeper/infoStats/KeeperFlowCollector.java | 2 +- .../KeeperContainerInfoController.java | 16 +- .../consoleportal/RedisController.java | 2 +- ...oMigrateOverloadKeeperContainerAction.java | 10 +- .../keeper/Command/AbstractKeeperCommand.java | 36 +--- .../Command/CheckKeeperActiveCommand.java | 42 +++++ .../Command/CheckKeeperConnectedCommand.java | 40 +++++ .../keeper/Command/FullSyncJudgeCommand.java | 71 ++------ .../KeeperContainerReplOffsetGetCommand.java | 32 ++++ .../keeper/Command/KeeperResetCommand.java | 37 ++++ .../keeper/Command/SwitchMasterCommand.java | 115 ------------ .../DefaultKeeperContainerAvailablePool.java | 4 - .../MigrationKeeperContainerDetailModel.java | 13 +- .../KeeperContainerMigrationService.java | 4 +- .../service/KeeperContainerService.java | 3 +- ...efaultKeeperContainerMigrationService.java | 108 ++++++------ .../impl/KeeperContainerServiceImpl.java | 6 +- .../service/model/ShardModelService.java | 8 +- .../model/impl/ShardModelServiceImpl.java | 166 ++++++++++++------ .../src/main/resources/static/dist/bundle.js | 4 +- .../controllers/KeepercontainerOverloadCtl.ts | 124 +++++++------ .../services/KeeperContainerService.ts | 16 ++ .../views/index/keepercontainer_overall.html | 2 +- .../views/index/keepercontainer_overload.html | 26 ++- .../ctrip/xpipe/redis/console/AllTests.java | 4 + ...rateOverloadKeeperContainerActionTest.java | 4 +- .../impl/AbstractKeeperCommandTest.java | 76 ++++++++ .../impl/CheckerKeeperActiveCommandTest.java | 26 +++ .../service/ShardModelServiceTest.java | 29 ++- ...ltKeeperContainerMigrationServiceTest.java | 7 +- .../impl/KeeperContainerServiceImplTest.java | 2 +- .../protocal/cmd/InfoResultExtractor.java | 2 +- 32 files changed, 624 insertions(+), 413 deletions(-) create mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java create mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java create mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java create mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java delete mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java delete mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java create mode 100644 redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/AbstractKeeperCommandTest.java create mode 100644 redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/CheckerKeeperActiveCommandTest.java diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java index 5123f952ad..50ac8b131b 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/keeper/infoStats/KeeperFlowCollector.java @@ -29,7 +29,7 @@ public void onAction(KeeperInfoStatsActionContext context) { long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue(); deleteKeeper(info); Map keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new); - keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow); + keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.isKeeperActive(), info.getHostPort().getPort()), keeperFlow); } catch (Throwable throwable) { logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java index 42f2d3e737..efe4c74f4c 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/KeeperContainerInfoController.java @@ -1,6 +1,7 @@ package com.ctrip.xpipe.redis.console.controller.consoleportal; import com.ctrip.xpipe.redis.checker.controller.result.RetMessage; +import com.ctrip.xpipe.redis.checker.model.DcClusterShard; import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel; import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController; import com.ctrip.xpipe.redis.console.keeper.KeeperContainerUsedInfoAnalyzer; @@ -92,16 +93,25 @@ public List getOverloadKeeperContainerMigra @RequestMapping(value = "/keepercontainer/overload/migration/begin", method = RequestMethod.POST) public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List keeperContainerDetailModels) { - logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels); try { - keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels); + if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) { + return RetMessage.createFailMessage("The previous migration tasks are still in progress!"); + } } catch (Throwable th) { - logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage()); + logger.warn("[beginToMigrateOverloadKeeperContainers][fail] {}", keeperContainerDetailModels, th); return RetMessage.createFailMessage(th.getMessage()); } return RetMessage.createSuccessMessage(); } + @RequestMapping(value = "/keepercontainer/overload/migration/terminate", method = RequestMethod.POST) + public RetMessage migrateKeeperTaskTerminate() { + if(keeperContainerMigrationService.stopMigrate()){ + return RetMessage.createSuccessMessage("All migration tasks have been completed"); + } + return RetMessage.createSuccessMessage("No migration tasks in progress"); + } + @RequestMapping(value = "/keepercontainer/overload/info/lasted", method = RequestMethod.GET) public List getLastedAllReadyMigrateKeeperContainers() { return analyzer.getAllDcKeeperContainerUsedInfoModelsList(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java index 189916e888..7cea77edf2 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/consoleportal/RedisController.java @@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) { .getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName()); for (ShardModel shardModel : allShardModel) { - if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(), + if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(), (model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null) ? null : model.getTargetKeeperContainer().getAddr().getHost())) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java index 38d6222bb7..6f585b55f1 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerAction.java @@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt private final List alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS); + public final static String KEEPER_MIGRATION = "keeper_migration"; + public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success"; public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail"; @@ -47,14 +49,12 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail"; - public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success"; - - public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail"; - public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success"; public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail"; + public final static String KEEPER_MIGRATION_ACTIVE_ROLLBACK_ERROR = "keeper_migration_active_rollback_error"; + public final static String KEEPER_MIGRATION_BACKUP_SUCCESS = "keeper_migration_backup_success"; public final static String KEEPER_MIGRATION_BACKUP_FAIL = "keeper_migration_backup_fail"; @@ -87,7 +87,7 @@ void migrateAllKeepers(List readyToMigratio ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), false, null); - if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) { logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard); alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp, diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java index 341d5835f1..ede11fdd53 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java @@ -1,18 +1,14 @@ package com.ctrip.xpipe.redis.console.keeper.Command; import com.ctrip.framework.xpipe.redis.ProxyRegistry; -import com.ctrip.xpipe.api.command.Command; -import com.ctrip.xpipe.api.command.CommandFuture; -import com.ctrip.xpipe.api.command.CommandFutureListener; import com.ctrip.xpipe.api.endpoint.Endpoint; import com.ctrip.xpipe.api.pool.SimpleObjectPool; import com.ctrip.xpipe.command.AbstractCommand; import com.ctrip.xpipe.netty.commands.NettyClient; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; -import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import com.ctrip.xpipe.utils.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,37 +29,9 @@ protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, this.scheduled = scheduled; } - protected InfoCommand generteInfoCommand(Endpoint key) { - if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) { - commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; - } + protected InfoCommand generateInfoReplicationCommand(Endpoint key) { SimpleObjectPool keyPool = keyedObjectPool.getKeyPool(key); return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut); } - protected void addHookAndExecute(Command command, Callbackable callback) { - logger.info("[zyfTest][addHookAndExecute] start execute"); - CommandFuture future = command.execute(); - logger.info("[zyfTest][addHookAndExecute] start addListener"); - future.addListener(new CommandFutureListener() { - @Override - public void operationComplete(CommandFuture commandFuture) throws Exception { - if(!commandFuture.isSuccess()) { - logger.info("[zyfTest][addHookAndExecute] listener fail"); - callback.fail(commandFuture.cause()); - } else { - logger.info("[zyfTest][addHookAndExecute] listener success"); - callback.success(commandFuture.get()); - } - } - }); - try { - logger.info("[zyfTest][addHookAndExecute] before get"); - future.get(); - logger.info("[zyfTest][addHookAndExecute] get over"); - } catch (Exception e){ - throw new RuntimeException(e); - } - } - } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java new file mode 100644 index 0000000000..6179bff81f --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java @@ -0,0 +1,42 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; +import com.ctrip.xpipe.utils.VisibleForTesting; + +import java.util.concurrent.ScheduledExecutorService; + +public class CheckKeeperActiveCommand extends AbstractKeeperCommand{ + + private Endpoint keeper; + + private boolean expectedActive; + + public CheckKeeperActiveCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedActive) { + super(keyedObjectPool, scheduled); + this.keeper = keeper; + this.expectedActive = expectedActive; + } + + @Override + public String getName() { + return "CheckKeeperActiveCommand"; + } + + @Override + protected void doExecute() throws Throwable { + InfoCommand infoCommand = generateInfoReplicationCommand(keeper); + if (new InfoResultExtractor(infoCommand.execute().get()).isKeeperActive() == expectedActive) { + this.future().setSuccess(); + return; + } + this.future().setFailure(new Exception(String.format("keeper: %s is not %s", keeper, expectedActive))); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java new file mode 100644 index 0000000000..7131c9de02 --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java @@ -0,0 +1,40 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand; +import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole; + +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.redis.core.protocal.MASTER_STATE.REDIS_REPL_CONNECTED; + +public class CheckKeeperConnectedCommand extends AbstractKeeperCommand { + + private Endpoint keeper; + + public CheckKeeperConnectedCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) { + super(keyedObjectPool, scheduled); + this.keeper = keeper; + } + + @Override + public String getName() { + return "CheckKeeperConnectedCommand"; + } + + @Override + protected void doExecute() throws Throwable { + SlaveRole role = (SlaveRole)new RoleCommand(keyedObjectPool.getKeyPool(keeper), scheduled).execute().get(); + if (REDIS_REPL_CONNECTED == role.getMasterState()) { + this.future().setSuccess(); + return; + } + this.future().setFailure(new Exception(String.format("ping %s has no pong response", keeper))); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java index 01f5407176..a65d0021b6 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java @@ -1,33 +1,24 @@ package com.ctrip.xpipe.redis.console.keeper.Command; -import com.ctrip.xpipe.api.command.Command; import com.ctrip.xpipe.api.endpoint.Endpoint; -import com.ctrip.xpipe.api.pool.ObjectPoolException; -import com.ctrip.xpipe.command.DefaultRetryCommandFactory; -import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import java.util.concurrent.ScheduledExecutorService; public class FullSyncJudgeCommand extends AbstractKeeperCommand { - private Endpoint active; + private Endpoint activeInstance; - private Endpoint backUp; - - private long intervalTime; + private Endpoint backUpInstance; private long activeMasterReplOffset; - private long backupMasterReplOffset; - - public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) { + public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance, long activeMasterReplOffset) { super(keyedObjectPool, scheduled); - this.active = active; - this.backUp = backUp; - this.intervalTime = intervalTime; + this.activeInstance = activeInstance; + this.backUpInstance = backUpInstance; + this.activeMasterReplOffset = activeMasterReplOffset; } @Override @@ -37,51 +28,13 @@ public String getName() { @Override protected void doExecute() throws Throwable { - try { - RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); - Command activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active)); - Command backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp)); - addHookAndExecute(activeRetryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable); - } - }); - - try { - Thread.sleep(intervalTime); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - addHookAndExecute(backUpRetryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable); - } - }); - - if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) { - this.future().setSuccess(); - } - } finally { - try { - keyedObjectPool.clear(active); - keyedObjectPool.clear(backUp); - } catch (ObjectPoolException e) { - logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e); - } + long backupMasterReplOffset; + backupMasterReplOffset = new InfoResultExtractor(generateInfoReplicationCommand(backUpInstance).execute().get()).getMasterReplOffset(); + if (backupMasterReplOffset > 0 && activeMasterReplOffset > 0 && backupMasterReplOffset > activeMasterReplOffset) { + this.future().setSuccess(); + return; } + this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance))); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java new file mode 100644 index 0000000000..4920caab18 --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java @@ -0,0 +1,32 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; + +import java.util.concurrent.ScheduledExecutorService; + +public class KeeperContainerReplOffsetGetCommand extends AbstractKeeperCommand{ + + private Endpoint keeper; + + public KeeperContainerReplOffsetGetCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) { + super(keyedObjectPool, scheduled); + this.keeper = keeper; + } + + @Override + public String getName() { + return "KeeperContainerReplOffsetGetCommand"; + } + + @Override + protected void doExecute() throws Throwable { + this.future().setSuccess(new InfoResultExtractor(generateInfoReplicationCommand(keeper).execute().get()).getMasterReplOffset()); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java new file mode 100644 index 0000000000..176ed186b0 --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java @@ -0,0 +1,37 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.command.AbstractCommand; +import com.ctrip.xpipe.redis.console.service.KeeperContainerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KeeperResetCommand extends AbstractCommand { + + private String activeKeeperIp; + + private long shardId; + + private KeeperContainerService keeperContainerService; + + public KeeperResetCommand(String activeKeeperIp, long shardId, KeeperContainerService keeperContainerService) { + this.activeKeeperIp = activeKeeperIp; + this.shardId = shardId; + this.keeperContainerService = keeperContainerService; + } + + @Override + public String getName() { + return "KeeperResetCommand"; + } + + @Override + protected void doExecute() throws Throwable { + keeperContainerService.resetKeeper(activeKeeperIp, shardId); + this.future().setSuccess(); + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java deleted file mode 100644 index a2478a9a2d..0000000000 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java +++ /dev/null @@ -1,115 +0,0 @@ -package com.ctrip.xpipe.redis.console.keeper.Command; - -import com.ctrip.xpipe.api.command.Command; -import com.ctrip.xpipe.command.DefaultRetryCommandFactory; -import com.ctrip.xpipe.command.RetryCommandFactory; -import com.ctrip.xpipe.endpoint.DefaultEndPoint; -import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; -import com.ctrip.xpipe.redis.console.model.RedisTbl; -import com.ctrip.xpipe.redis.console.service.KeeperContainerService; -import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; - -import java.util.List; -import java.util.concurrent.ScheduledExecutorService; - -public class SwitchMasterCommand extends AbstractKeeperCommand{ - - private String activeIp; - - private String backupIp; - - private List keepers; - - private KeeperContainerService keeperContainerService; - - public SwitchMasterCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, String activeIp, String backupIp, List keepers, KeeperContainerService keeperContainerService) { - super(keyedObjectPool, scheduled); - this.activeIp = activeIp; - this.backupIp = backupIp; - this.keepers = keepers; - this.keeperContainerService = keeperContainerService; - } - - @Override - public String getName() { - return "SwitchMasterCommand"; - } - - @Override - protected void doExecute() throws Throwable { - try { - logger.info("[zyfTest][SwitchMasterCommand] start"); - if (keepers.size() != 2) { - logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModelKeepers: {}", activeIp, backupIp, keepers); - return; - } - int activeKeeperPort = -1; - String backUpKeeperIp = null; - for (RedisTbl keeper : keepers) { - if (keeper.getRedisIp().equals(activeIp)) { - activeKeeperPort = keeper.getRedisPort(); - } else { - backUpKeeperIp = keeper.getRedisIp(); - } - } - - if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) { - logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModelKeepers: {}, activeKeeperPort: {}, backUpKeeperIp: {}" - , activeIp, backupIp, keepers, activeKeeperPort, backUpKeeperIp); - return; - } - - KeeperTransMeta keeperInstanceMeta = null; - logger.info("[zyfTest][SwitchMasterCommand] start getAllKeepers"); - List allKeepers = keeperContainerService.getAllKeepers(activeIp); - logger.info("[zyfTest][SwitchMasterCommand] over getAllKeepers"); - for (KeeperInstanceMeta keeper : allKeepers) { - if (keeper.getKeeperMeta().getPort() == activeKeeperPort) { - keeperInstanceMeta = keeper; - break; - } - } - - if (keeperInstanceMeta == null) { - logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort); - return; - } - logger.info("[zyfTest][SwitchMasterCommand] start resetKeepers"); - keeperContainerService.resetKeepers(keeperInstanceMeta); - logger.info("[zyfTest][SwitchMasterCommand] over resetKeepers"); - RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000); - Command retryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(new DefaultEndPoint(activeIp, activeKeeperPort))); - logger.info("[zyfTest][SwitchMasterCommand] get retryInfoCommand"); - int finalActiveKeeperPort = activeKeeperPort; - addHookAndExecute(retryInfoCommand, new Callbackable() { - @Override - public void success(String message) { - logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand success"); - if (!new InfoResultExtractor(message).getKeeperActive()) { - future().setSuccess(); - } - } - - @Override - public void fail(Throwable throwable) { - logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand fail"); - logger.error("[SwitchMasterCommand] info keeper: {}:{}", activeIp, finalActiveKeeperPort, throwable); - } - }); - if (retryInfoCommand.future().isSuccess()) { - future().setSuccess(); - logger.info("[zyfTest][SwitchMasterCommand] over success"); - } - } catch (Exception e) { - logger.error("[SwitchMasterCommand] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); - } - } - - @Override - protected void doReset() { - - } -} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java deleted file mode 100644 index 212b2a8391..0000000000 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.ctrip.xpipe.redis.console.keeper.impl; - -public class DefaultKeeperContainerAvailablePool { -} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java index 03a398ba6a..e7c494d74d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/MigrationKeeperContainerDetailModel.java @@ -168,16 +168,7 @@ public int hashCode() { @Override public String toString() { - return "MigrationKeeperContainerDetailModel{" + - "srcKeeperContainer=" + srcKeeperContainer + - ", targetKeeperContainer=" + targetKeeperContainer + - ", migrateKeeperCount=" + migrateKeeperCount + - ", migrateKeeperCompleteCount=" + migrateKeeperCompleteCount + - ", switchActive=" + switchActive + - ", keeperPairOverload=" + keeperPairOverload + - ", cause='" + cause + '\'' + - ", migrateShards=" + migrateShards + - ", updateTime=" + updateTime + - '}'; + String type = switchActive ? "switchActiveKeeper" : (keeperPairOverload ? "migrateBackupKeeper" : "migrateActiveKeeper"); + return String.format("[%s]%s->%s:%s", type, srcKeeperContainer.getKeeperIp(), targetKeeperContainer.getKeeperIp(), migrateShards); } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java index 4fd2f8353f..9bc325b515 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerMigrationService.java @@ -5,7 +5,9 @@ import java.util.List; public interface KeeperContainerMigrationService { - void beginMigrateKeeperContainers(List keeperContainerDetailModels); + boolean beginMigrateKeeperContainers(List keeperContainerDetailModels) throws Throwable; List getMigrationProcess(); + + boolean stopMigrate(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java index 3c84923aa5..159e49af3b 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperContainerService.java @@ -4,7 +4,6 @@ import com.ctrip.xpipe.redis.console.model.KeeperContainerInfoModel; import com.ctrip.xpipe.redis.console.model.KeepercontainerTbl; import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import java.util.List; import java.util.Map; @@ -42,7 +41,7 @@ public interface KeeperContainerService { List getAllKeepers(String keeperContainerIp); - void resetKeepers(KeeperTransMeta keeperInstanceMeta); + void resetKeeper(String activeKeeperIp, Long replId); Map keeperContainerIdDcMap(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java index 1f116dde43..6fc5081f88 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.console.service.impl; +import com.ctrip.xpipe.exception.XpipeRuntimeException; import com.ctrip.xpipe.monitor.CatEventMonitor; import com.ctrip.xpipe.redis.checker.model.DcClusterShard; import com.ctrip.xpipe.redis.console.model.MigrationKeeperContainerDetailModel; @@ -12,9 +13,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.*; @@ -32,67 +31,72 @@ public class DefaultKeeperContainerMigrationService implements KeeperContainerMi private volatile AtomicBoolean isBegin = new AtomicBoolean(false); @Override - public void beginMigrateKeeperContainers(List keeperContainerDetailModels) { + public boolean beginMigrateKeeperContainers(List keeperContainerDetailModels) throws Throwable { if (!isBegin.compareAndSet(false, true)) { - logger.info("[beginMigrateKeeperContainers] has already begin!!"); - return; + throw new Throwable("Migration tasks have already begin!!"); } - logger.debug("[beginMigrateKeeperContainers] begin migrate keeper containers {}", keeperContainerDetailModels); - readyToMigrationKeeperContainers = keeperContainerDetailModels; - Set alreadyMigrateShards = new HashSet<>(); - for (MigrationKeeperContainerDetailModel keeperContainer : readyToMigrationKeeperContainers) { - List migrateShards = keeperContainer.getMigrateShards(); - if (CollectionUtils.isEmpty(migrateShards)) continue; + try { + readyToMigrationKeeperContainers = keeperContainerDetailModels; + for (MigrationKeeperContainerDetailModel keeperContainer : readyToMigrationKeeperContainers) { + if(!isBegin.get()) break; + List migrateShards = keeperContainer.getMigrateShards(); + if (CollectionUtils.isEmpty(migrateShards)) continue; - String srcKeeperContainerIp = keeperContainer.getSrcKeeperContainer().getKeeperIp(); - String targetKeeperContainerIp = keeperContainer.getTargetKeeperContainer().getKeeperIp(); - for (DcClusterShard migrateShard : migrateShards) { - ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), - migrateShard.getClusterId(), migrateShard.getShardId(), false, null); - if (!alreadyMigrateShards.add(migrateShard)) { - logger.info("[beginMigrateKeeperContainers] shard {} has already migrated, should not migrate in the same time", migrateShard); - continue; - } - logger.debug("[beginMigrateKeeperContainers] begin migrate shard {} from srcKeeperContainer:{} to targetKeeperContainer:{}", - migrateShard, srcKeeperContainerIp, targetKeeperContainerIp); - String event; - if (keeperContainer.isSwitchActive()) { - logger.info("[zyfTest] start switchMaster"); - if (shardModelService.switchMaster(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) { - logger.info("[zyfTest] switchMaster success"); - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_SWITCH_MASTER_SUCCESS; - } else { - logger.info("[zyfTest] switchMaster fail"); - event = KEEPER_SWITCH_MASTER_FAIL; - } - }else if (keeperContainer.isKeeperPairOverload()) { - if (shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, - srcKeeperContainerIp, targetKeeperContainerIp)) { - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_MIGRATION_BACKUP_SUCCESS; - } else { - event = KEEPER_MIGRATION_BACKUP_FAIL; - } - }else { - if (shardModelService.migrateAutoBalanceKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, - srcKeeperContainerIp, targetKeeperContainerIp)) { - keeperContainer.migrateKeeperCompleteCountIncrease(); - event = KEEPER_MIGRATION_ACTIVE_START_SUCCESS; - } else { - event = KEEPER_MIGRATION_ACTIVE_START_FAIL; + String srcKeeperContainerIp = keeperContainer.getSrcKeeperContainer().getKeeperIp(); + String targetKeeperContainerIp = keeperContainer.getTargetKeeperContainer().getKeeperIp(); + for (DcClusterShard migrateShard : migrateShards) { + if(!isBegin.get()) break; + ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(), + migrateShard.getClusterId(), migrateShard.getShardId(), false, null); + logger.info("[migrateKeeperContainers][begin][{}-{}-{}][{}->{}]", + migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, targetKeeperContainerIp); + String event; + if (keeperContainer.isSwitchActive()) { + if (shardModelService.switchActiveKeeper(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_SWITCH_MASTER_SUCCESS; + } else { + event = KEEPER_SWITCH_MASTER_FAIL; + } + }else if (keeperContainer.isKeeperPairOverload()) { + if (shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + srcKeeperContainerIp, targetKeeperContainerIp)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_MIGRATION_BACKUP_SUCCESS; + } else { + event = KEEPER_MIGRATION_BACKUP_FAIL; + } + }else { + try { + if (shardModelService.migrateActiveKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel, + srcKeeperContainerIp, targetKeeperContainerIp)) { + keeperContainer.migrateKeeperCompleteCountIncrease(); + event = KEEPER_MIGRATION_ACTIVE_SUCCESS; + } else { + event = KEEPER_MIGRATION_ACTIVE_FAIL; + } + } catch (Throwable th) { + event = KEEPER_MIGRATION_ACTIVE_ROLLBACK_ERROR; + } } + CatEventMonitor.DEFAULT.logEvent(KEEPER_MIGRATION, event); + logger.info("[migrateKeeperContainers][{}-{}-{}][{}->{}] {}", + migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, targetKeeperContainerIp, event); } - CatEventMonitor.DEFAULT.logEvent(event, String.format("dc:%s, cluster:%s, shard:%s, src:%s, target:%s", - migrateShard.getDcId(), migrateShard.getClusterId(), migrateShard.getShardId(), srcKeeperContainerIp, - targetKeeperContainerIp)); } + return true; + } finally { + isBegin.set(false); } - isBegin.set(false); } @Override public List getMigrationProcess() { return readyToMigrationKeeperContainers; } + + @Override + public boolean stopMigrate() { + return isBegin.compareAndSet(true, false); + } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java index 2a2a83f87f..b842708fa2 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImpl.java @@ -455,12 +455,14 @@ public List getAllKeepers(String keeperContainerIp) { } @Override - public void resetKeepers(KeeperTransMeta keeperInstanceMeta) { + public void resetKeeper(String activeKeeperIp, Long replId) { + KeeperTransMeta keeperInstanceMeta = new KeeperTransMeta(); + keeperInstanceMeta.setReplId(replId); getOrCreateRestTemplate(); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntity requestEntity = new HttpEntity<>(keeperInstanceMeta, headers); - restTemplate.exchange(String.format("http://%s:8080/keepers/election/reset", keeperInstanceMeta.getKeeperMeta().getIp()), + restTemplate.exchange(String.format("http://%s:8080/keepers/election/reset", activeKeeperIp), HttpMethod.POST, requestEntity, Void.class); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java index 2becee4d9b..dfb4033791 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/ShardModelService.java @@ -18,11 +18,11 @@ ShardModel getShardModel(String dcName, String clusterName, String shardName, ShardModel getSourceShardModel(String clusterName, String srcDcName, String toDcName, String shardName); - boolean migrateShardKeepers(String dcName, String clusterName, ShardModel shardModel, + boolean migrateBackupKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp); - boolean switchMaster(String srcIp, String targetIp, ShardModel shardModel); + boolean switchActiveKeeper(String srcIp, String targetIp, ShardModel shardModel); - boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, - String srcKeeperContainerIp, String targetKeeperContainerIp); + boolean migrateActiveKeeper(String dcName, String clusterName, ShardModel shardModel, + String srcKeeperContainerIp, String targetKeeperContainerIp) throws Throwable; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index 18de8817ae..b7056ef115 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -1,35 +1,23 @@ package com.ctrip.xpipe.redis.console.service.model.impl; -import com.ctrip.framework.xpipe.redis.ProxyRegistry; import com.ctrip.xpipe.api.command.Command; -import com.ctrip.xpipe.api.command.CommandFuture; -import com.ctrip.xpipe.api.command.CommandFutureListener; import com.ctrip.xpipe.api.endpoint.Endpoint; import com.ctrip.xpipe.api.pool.ObjectPoolException; -import com.ctrip.xpipe.api.pool.SimpleObjectPool; import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.command.DefaultRetryCommandFactory; import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.command.SequenceCommandChain; import com.ctrip.xpipe.endpoint.DefaultEndPoint; -import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.exception.XpipeRuntimeException; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; -import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; import com.ctrip.xpipe.redis.console.constant.XPipeConsoleConstant; import com.ctrip.xpipe.redis.console.exception.DataNotFoundException; import com.ctrip.xpipe.redis.console.exception.ServerException; -import com.ctrip.xpipe.redis.console.keeper.Command.FullSyncJudgeCommand; -import com.ctrip.xpipe.redis.console.keeper.Command.SwitchMasterCommand; +import com.ctrip.xpipe.redis.console.keeper.Command.*; import com.ctrip.xpipe.redis.console.model.*; import com.ctrip.xpipe.redis.console.repository.AzGroupClusterRepository; import com.ctrip.xpipe.redis.console.service.*; import com.ctrip.xpipe.redis.console.service.model.ShardModelService; -import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; -import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; import com.ctrip.xpipe.utils.ObjectUtils; import com.ctrip.xpipe.utils.VisibleForTesting; import com.ctrip.xpipe.utils.XpipeThreadFactory; @@ -40,11 +28,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.*; import static com.ctrip.xpipe.redis.checker.resource.Resource.*; +import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.KEEPER_MIGRATION_ACTIVE_ROLLBACK_ERROR; @Service public class ShardModelServiceImpl implements ShardModelService{ @@ -83,9 +73,15 @@ public class ShardModelServiceImpl implements ShardModelService{ @Resource(name = MIGRATE_KEEPER_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; - private RetryCommandFactory switchMasterCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 3, 1000); + private RetryCommandFactory retryCommandFactory; - private RetryCommandFactory fullSyncCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); + private RetryCommandFactory retryLongCommandFactory; + + @PostConstruct + public void init() { + retryCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 3, 500); + retryLongCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000); + } @Override public List getAllShardModel(String dcName, String clusterName) { @@ -255,7 +251,7 @@ private void addRedisesAndKeepersToNormalShard(ShardModel shardModel, long dcClu } @Override - public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel shardModel, + public boolean migrateBackupKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp); @@ -263,75 +259,145 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel } @Override - public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) { - Command switchMasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeIp, backupIp, shardModel.getKeepers(), keeperContainerService)); + public boolean switchActiveKeeper(String activeIp, String backupIp, ShardModel shardModel) { + List keepers = shardModel.getKeepers(); + if (keepers.size() != 2) { + logger.warn("[switchMaster][keeperSizeMissMatch][{}:{}->{}] {}", + shardModel.getShardTbl().getShardName(), activeIp, backupIp, keepers.size()); + return false; + } + Endpoint activeKeeper = null, backUpKeeper = null; + for (RedisTbl keeper : keepers) { + if (keeper.getRedisIp().equals(activeIp)) { + activeKeeper = new DefaultEndPoint(keeper.getRedisIp(), keeper.getRedisPort()); + } else { + backUpKeeper = new DefaultEndPoint(keeper.getRedisIp(), keeper.getRedisPort()); + } + } + + if (activeKeeper == null || backUpKeeper == null || !backupIp.equals(backUpKeeper.getHost())) { + logger.warn("[switchMaster][keeperActiveMissMatch][{}:{}->{}]keepers1:{}:{},keepers2:{}:{}" + , shardModel.getShardTbl().getShardName(), activeIp, backupIp, + keepers.get(0).getRedisIp(), keepers.get(0).getRedisPort(), keepers.get(1).getRedisIp(), keepers.get(1).getRedisPort()); + return false; + } + Command switchMasterCommand = retryCommandFactory.createRetryCommand(new KeeperResetCommand<>(activeKeeper.getHost(), shardModel.getShardTbl().getId(), keeperContainerService)); + Command checkKeeperRoleCommand = retryCommandFactory.createRetryCommand(new CheckKeeperActiveCommand<>(keyedObjectPool, scheduled, backUpKeeper, true)); + SequenceCommandChain chain = new SequenceCommandChain(false, false); + chain.add(switchMasterCommand); + chain.add(checkKeeperRoleCommand); try { - logger.info("[zyfTest] start switchMasterCommand execute"); - switchMasterCommand.execute().get(); - logger.info("[zyfTest] start switchMasterCommand execute over"); - logger.info("[zyfTest] start switchMasterCommand execute success?:{}",switchMasterCommand.future().isSuccess()); - return switchMasterCommand.future().isSuccess(); + chain.execute().get(); + return chain.future().isSuccess(); } catch (Exception e) { - logger.error("[switchMaster] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); + logger.error("[switchMaster][commandChainError][{}:{}->{}]", shardModel.getShardTbl().getShardName(), activeKeeper, backUpKeeper, e); return false; + } finally { + try { + keyedObjectPool.clear(activeKeeper); + keyedObjectPool.clear(backUpKeeper); + } catch (ObjectPoolException e) { + logger.error("[switchMaster][keyedObjectPoolClearError][{}:{}->{}]", shardModel.getShardTbl().getShardName(), activeKeeper, backUpKeeper, e); + } } } @Override - public boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { + public boolean migrateActiveKeeper(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) throws Throwable { List oldKeepers = shardModel.getKeepers(); List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp); if (!doMigrateKeepers(dcName, clusterName, shardModel, newKeepers)) { - throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); + logger.error("[migrateActiveKeeper][doMigrateKeepersFailed][{}:{}:{}]", dcName, clusterName, shardModel); + return false; } - RedisTbl active = newKeepers.get(0); - RedisTbl backup = newKeepers.get(1); - DefaultEndPoint activeKey = new DefaultEndPoint(active.getRedisIp(), active.getRedisPort()); - DefaultEndPoint backupKey = new DefaultEndPoint(backup.getRedisIp(), backup.getRedisPort()); - Command fullSyncJudgeRetryCommand = fullSyncCommandFactory.createRetryCommand(new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, activeKey, backupKey, 1000)); - Command switchmasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeKey.getHost(), backupKey.getHost(), newKeepers, keeperContainerService)); - SequenceCommandChain chain = new SequenceCommandChain(false, false); - chain.add(fullSyncJudgeRetryCommand); - chain.add(switchmasterCommand); + Endpoint activeKeeper, backUpKeeper; + if (newKeepers.get(0).getRedisIp().equals(targetKeeperContainerIp)) { + activeKeeper = new DefaultEndPoint(newKeepers.get(1).getRedisIp(), newKeepers.get(1).getRedisPort()); + backUpKeeper = new DefaultEndPoint(newKeepers.get(0).getRedisIp(), newKeepers.get(0).getRedisPort()); + } else { + backUpKeeper = new DefaultEndPoint(newKeepers.get(1).getRedisIp(), newKeepers.get(1).getRedisPort()); + activeKeeper = new DefaultEndPoint(newKeepers.get(0).getRedisIp(), newKeepers.get(0).getRedisPort()); + } + SequenceCommandChain chain = null; try { + Command pingNewKeeperCommand = retryLongCommandFactory.createRetryCommand(new CheckKeeperConnectedCommand<>(keyedObjectPool, scheduled, backUpKeeper)); + pingNewKeeperCommand.execute().get(); + if (!pingNewKeeperCommand.future().isSuccess()) { + logger.error("[migrateActiveKeeper][pingNewKeeperCommandFailed][{}:{}:{}]keeper:{}", dcName, clusterName, shardModel, backUpKeeper); + return false; + } + Command replOffsetGetCommand = retryCommandFactory.createRetryCommand(new KeeperContainerReplOffsetGetCommand<>(keyedObjectPool, scheduled, activeKeeper)); + long activeMasterReplOffset = (long)replOffsetGetCommand.execute().get(); + if (!replOffsetGetCommand.future().isSuccess()) { + logger.error("[migrateActiveKeeper][replOffsetGetCommandFailed][{}:{}:{}]keeper:{}", dcName, clusterName, shardModel, activeKeeper); + return false; + } + Command fullSyncJudgeRetryCommand = retryCommandFactory.createRetryCommand(new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, activeKeeper, backUpKeeper, activeMasterReplOffset)); + Command switchmasterCommand = retryCommandFactory.createRetryCommand(new KeeperResetCommand<>(activeKeeper.getHost(), shardModel.getShardTbl().getId(), keeperContainerService)); + Command checkKeeperRoleCommand = retryCommandFactory.createRetryCommand(new CheckKeeperActiveCommand<>(keyedObjectPool, scheduled, backUpKeeper, true)); + chain = new SequenceCommandChain(false, false); + chain.add(fullSyncJudgeRetryCommand); + chain.add(switchmasterCommand); + chain.add(checkKeeperRoleCommand); chain.execute().get(); - return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); - } catch (InterruptedException | ExecutionException e) { - logger.error("[fullSyncJudge] execute fullSyncJudgeRetryCommand fail", e); - return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); + return getMigrateActiveKeeperResult(chain, dcName, clusterName, shardModel, oldKeepers); + } catch (Exception e) { + logger.error("[migrateActiveKeeper][doCommandException][{}:{}:{}]", dcName, clusterName, shardModel, e); + return getMigrateActiveKeeperResult(chain, dcName, clusterName, shardModel, oldKeepers); + } finally { + try { + keyedObjectPool.clear(activeKeeper); + keyedObjectPool.clear(backUpKeeper); + } catch (ObjectPoolException e) { + logger.error("[migrateActiveKeeper][keyedObjectPoolClearError][{}, {}]", activeKeeper, backUpKeeper, e); + } } + } - private boolean getAutoBalanceResult(boolean taskSuccess, String dcName, String clusterName, ShardModel shardModel, List oldKeepers) { - if (taskSuccess) { - return true; + private boolean getMigrateActiveKeeperResult(SequenceCommandChain chain, String dcName, String clusterName, ShardModel shardModel, List oldKeepers) throws Throwable{ + if (chain == null || !chain.future().isSuccess()) { + logger.info("[migrateActiveKeeper][doMigrateActiveKeeperRollback][{}:{}:{}]chain:{}, chain success:{}", dcName, clusterName, shardModel, chain, chain != null && chain.future().isSuccess()); + if (!doMigrateKeepers(dcName, clusterName, shardModel, oldKeepers)) { + throw new Throwable(KEEPER_MIGRATION_ACTIVE_ROLLBACK_ERROR); + } + return false; } - doMigrateKeepers(dcName, clusterName, shardModel, oldKeepers); - return false; + return true; } + private boolean doMigrateKeepers(String dcName, String clusterName, ShardModel shardModel, List newKeepers) { if (newKeepers == null) { - logger.debug("[migrateKeepers] no need to replace keepers"); + logger.error("[doMigrateKeepers][keeperIsNull][{}:{}:{}]", dcName, clusterName, shardModel.getShardTbl().getShardName()); return false; }else if (newKeepers.size() == 2) { try { shardModel.setKeepers(newKeepers); - logger.info("[Update Redises][construct]{},{},{},{}", clusterName, dcName, shardModel.getShardTbl().getShardName(), shardModel); + logger.info("[doMigrateKeepers][construct][{},{},{}]{}", clusterName, dcName, shardModel.getShardTbl().getShardName(), shardModelToString(shardModel)); redisService.updateRedises(dcName, clusterName, shardModel.getShardTbl().getShardName(), shardModel); - logger.info("[Update Redises][success]{},{},{},{}", clusterName, dcName, shardModel.getShardTbl().getShardName(), shardModel); + logger.info("[doMigrateKeepers][success][{},{},{}]{}", clusterName, dcName, shardModel.getShardTbl().getShardName(), shardModelToString(shardModel)); return true; } catch (Exception e) { - logger.error("[Update Redises][failed]{},{},{},{}", clusterName, dcName, shardModel.getShardTbl().getShardName(), shardModel, e); + logger.error("[doMigrateKeepers][failed][{},{},{}]{}", clusterName, dcName, shardModel.getShardTbl().getShardName(), shardModelToString(shardModel), e); return false; } } else { - logger.info("[migrateKeepers] fail to migrate keepers with unexpected newKeepers {}", newKeepers); + logger.error("[doMigrateKeepers][keeperSizeMissMatch][{}:{}:{}]keeper size:{}", dcName, clusterName, shardModel.getShardTbl().getShardName(), newKeepers.size()); return false; } } + private String shardModelToString(ShardModel shardModel) { + StringBuilder builder = new StringBuilder(); + builder.append("redis:"); + shardModel.getRedises().forEach(redis -> builder.append(redis.getRedisIp()).append(":").append(redis.getRedisPort()).append(", ")); + builder.append("keepers:"); + shardModel.getKeepers().forEach(redis -> builder.append(redis.getRedisIp()).append(":").append(redis.getRedisPort()).append(", ")); + return builder.toString(); + } + @VisibleForTesting public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) { this.keyedObjectPool = pool; diff --git a/redis/redis-console/src/main/resources/static/dist/bundle.js b/redis/redis-console/src/main/resources/static/dist/bundle.js index 8fc58a451d..6069ad5f20 100644 --- a/redis/redis-console/src/main/resources/static/dist/bundle.js +++ b/redis/redis-console/src/main/resources/static/dist/bundle.js @@ -9183,7 +9183,7 @@ eval("angular\n .module('index')\n .controller('KeepercontainerOverallCtl' \***********************************************************/ /***/ (() => { -eval("angular\n .module('index')\n .controller('KeepercontainerOverloadCtl', KeepercontainerOverloadCtl);\nKeepercontainerOverloadCtl.$inject = ['$rootScope', '$scope', '$window', '$stateParams', 'KeeperContainerService',\n 'toastr', 'NgTableParams', '$interval'];\nfunction KeepercontainerOverloadCtl($rootScope, $scope, $window, $stateParams, KeeperContainerService, toastr, NgTableParams, $interval) {\n $scope.overloadKeeperContainer = [];\n $scope.tableParams = new NgTableParams({}, {});\n $scope.migratingTableParams = new NgTableParams({}, {});\n $scope.selectAll = false;\n $scope.toggleAll = toggleAll;\n $scope.isChecked = isChecked;\n var OPERATE_TYPE = {\n DETAIL: 'detail',\n MIGRATING: 'migrating',\n STOPPED: 'stopped'\n };\n $scope.operateType = $stateParams.type;\n $scope.migratingKeeperContainers = [];\n $scope.scheduledWork;\n $scope.beginToMigrateOverloadKeeperContainers = beginToMigrateOverloadKeeperContainers;\n KeeperContainerService.getAllOverloadKeepercontainer()\n .then(function (result) {\n if (Array.isArray(result))\n $scope.overloadKeeperContainer = result;\n $scope.overloadKeeperContainer.forEach(function (container) {\n switch (container.cause) {\n case 'BOTH':\n container.cause = '数据量和流量超载';\n break;\n case 'PEER_DATA_OVERLOAD':\n container.cause = '数据量超载';\n break;\n case 'INPUT_FLOW_OVERLOAD':\n container.cause = '流量超载';\n break;\n case 'RESOURCE_LACK':\n container.cause = '资源不足';\n break;\n case 'PAIR_RESOURCE_LACK':\n container.cause = '资源不足(keeper对)';\n break;\n case 'KEEPER_PAIR_BOTH':\n case 'KEEPER_PAIR_PEER_DATA_OVERLOAD':\n case 'KEEPER_PAIR_INPUT_FLOW_OVERLOAD':\n container.cause = 'keeper对超载';\n break;\n }\n if (container.cause == '资源不足' || container.cause == '资源不足(keeper对)') {\n container.result = '';\n }\n else if (!container.switchActive && !container.keeperPairOverload) {\n container.result = '迁移主keeper';\n }\n else if (container.switchActive && !container.keeperPairOverload) {\n container.result = '主备切换';\n }\n else if (!container.switchActive && container.keeperPairOverload) {\n container.result = '迁移备keeper';\n }\n if (container.updateTime != null) {\n container.time = container.updateTime.substring(0, 19).replace(\"T\", \" \");\n }\n });\n $scope.tableParams = new NgTableParams({\n page: 1,\n count: 10,\n }, {\n filterDelay: 100,\n counts: [10, 25, 50],\n dataset: $scope.overloadKeeperContainer\n });\n });\n function beginToMigrateOverloadKeeperContainers() {\n $scope.migratingKeeperContainers = $scope.overloadKeeperContainer.filter(function (keeperContainer) {\n return keeperContainer.selected;\n });\n $scope.tableParams = new NgTableParams({}, {});\n $scope.migratingTableParams = new NgTableParams({\n page: 1,\n count: 10,\n }, {\n filterDelay: 100,\n counts: [10, 25, 50],\n dataset: $scope.migratingKeeperContainers\n });\n $scope.operateType = OPERATE_TYPE.MIGRATING;\n KeeperContainerService.beginToMigrateOverloadKeeperContainers.apply(KeeperContainerService, $scope.migratingKeeperContainers)\n .then(function (result) {\n if (result.message == 'success') {\n toastr.success(\"迁移成功\");\n }\n else {\n toastr.error(result.message, \"迁移失败\");\n }\n getOverloadKeeperContainerMigrationProcess();\n $interval.cancel($scope.scheduledWork);\n });\n }\n function getOverloadKeeperContainerMigrationProcess() {\n if ($scope.operateType == OPERATE_TYPE.MIGRATING) {\n KeeperContainerService.getOverloadKeeperContainerMigrationProcess()\n .then(function (result) {\n if (result == null)\n return;\n $scope.migratingKeeperContainers = result;\n $scope.migratingTableParams = new NgTableParams({\n page: 1,\n count: 10,\n }, {\n filterDelay: 100,\n counts: [10, 25, 50],\n dataset: $scope.migratingKeeperContainers\n });\n });\n }\n }\n $scope.scheduledWork = $interval(getOverloadKeeperContainerMigrationProcess, 1000);\n function toggleAll() {\n $scope.selectAll = !$scope.selectAll;\n $scope.overloadKeeperContainer.forEach(function (keeperContainer) {\n keeperContainer.selected = !keeperContainer.selected;\n });\n }\n function isChecked() {\n return $scope.selectAll;\n }\n}\n\n\n//# sourceURL=webpack://XPipe-Console/./scripts/controllers/KeepercontainerOverloadCtl.ts?"); +eval("angular\n .module('index')\n .controller('KeepercontainerOverloadCtl', KeepercontainerOverloadCtl);\nKeepercontainerOverloadCtl.$inject = ['$rootScope', '$scope', '$window', '$stateParams', 'KeeperContainerService',\n 'toastr', 'NgTableParams', 'AppUtil', '$interval'];\nfunction KeepercontainerOverloadCtl($rootScope, $scope, $window, $stateParams, KeeperContainerService, toastr, NgTableParams, AppUtil, $interval) {\n $scope.overloadKeeperContainer = [];\n $scope.tableParams = new NgTableParams({}, {});\n $scope.migratingTableParams = new NgTableParams({}, {});\n $scope.selectAll = false;\n $scope.toggleAll = toggleAll;\n $scope.isChecked = isChecked;\n var OPERATE_TYPE = {\n DETAIL: 'detail',\n MIGRATING: 'migrating',\n STOPPED: 'stopped'\n };\n $scope.operateType = $stateParams.type;\n $scope.migratingKeeperContainers = [];\n $scope.scheduledWork;\n $scope.beginToMigrateOverloadKeeperContainers = beginToMigrateOverloadKeeperContainers;\n $scope.migrateKeeperTaskTerminate = migrateKeeperTaskTerminate;\n getAllOverloadKeepercontainer();\n function getAllOverloadKeepercontainer() {\n KeeperContainerService.getAllOverloadKeepercontainer()\n .then(function (result) {\n if (Array.isArray(result))\n $scope.overloadKeeperContainer = result;\n $scope.overloadKeeperContainer.forEach(function (container) {\n switch (container.cause) {\n case 'BOTH':\n container.cause = '数据量和流量超载';\n break;\n case 'PEER_DATA_OVERLOAD':\n container.cause = '数据量超载';\n break;\n case 'INPUT_FLOW_OVERLOAD':\n container.cause = '流量超载';\n break;\n case 'RESOURCE_LACK':\n container.cause = '资源不足';\n break;\n case 'PAIR_RESOURCE_LACK':\n container.cause = '资源不足(keeper对)';\n break;\n case 'KEEPER_PAIR_BOTH':\n case 'KEEPER_PAIR_PEER_DATA_OVERLOAD':\n case 'KEEPER_PAIR_INPUT_FLOW_OVERLOAD':\n container.cause = 'keeper对超载';\n break;\n }\n if (container.cause == '资源不足' || container.cause == '资源不足(keeper对)') {\n container.result = '';\n }\n else if (!container.switchActive && !container.keeperPairOverload) {\n container.result = '迁移主keeper';\n }\n else if (container.switchActive && !container.keeperPairOverload) {\n container.result = '主备切换';\n }\n else if (!container.switchActive && container.keeperPairOverload) {\n container.result = '迁移备keeper';\n }\n if (container.updateTime != null) {\n container.time = container.updateTime.substring(0, 19).replace(\"T\", \" \");\n }\n container.showDetail = false;\n });\n $scope.tableParams = new NgTableParams({\n page: 1,\n count: 10,\n }, {\n filterDelay: 100,\n counts: [10, 25, 50],\n dataset: $scope.overloadKeeperContainer\n });\n });\n }\n function beginToMigrateOverloadKeeperContainers() {\n $scope.migratingKeeperContainers = $scope.overloadKeeperContainer.filter(function (keeperContainer) {\n return keeperContainer.selected;\n });\n $scope.tableParams = new NgTableParams({}, {});\n $scope.migratingTableParams = new NgTableParams({\n page: 1,\n count: 10,\n }, {\n filterDelay: 100,\n counts: [10, 25, 50],\n dataset: $scope.migratingKeeperContainers\n });\n $scope.operateType = OPERATE_TYPE.MIGRATING;\n KeeperContainerService.beginToMigrateOverloadKeeperContainers.apply(KeeperContainerService, $scope.migratingKeeperContainers)\n .then(function (result) {\n if (result.message == 'success') {\n toastr.success(\"迁移成功\");\n $scope.operateType = OPERATE_TYPE.DETAIL;\n getAllOverloadKeepercontainer();\n }\n else {\n toastr.error(result.message, \"迁移失败\");\n }\n getOverloadKeeperContainerMigrationProcess();\n $interval.cancel($scope.scheduledWork);\n });\n }\n function migrateKeeperTaskTerminate() {\n KeeperContainerService.migrateKeeperTaskTerminate.apply(KeeperContainerService)\n .then(function (result) {\n if (result.state == 0) {\n toastr.success(result.message);\n }\n else {\n toastr.error(result.message);\n }\n getOverloadKeeperContainerMigrationProcess();\n $interval.cancel($scope.scheduledWork);\n });\n $scope.operateType = OPERATE_TYPE.DETAIL;\n }\n function getOverloadKeeperContainerMigrationProcess() {\n if ($scope.operateType == OPERATE_TYPE.MIGRATING) {\n KeeperContainerService.getOverloadKeeperContainerMigrationProcess()\n .then(function (result) {\n if (result == null)\n return;\n $scope.migratingKeeperContainers = result;\n $scope.migratingTableParams = new NgTableParams({\n page: 1,\n count: 10,\n }, {\n filterDelay: 100,\n counts: [10, 25, 50],\n dataset: $scope.migratingKeeperContainers\n });\n });\n }\n }\n $scope.scheduledWork = $interval(getOverloadKeeperContainerMigrationProcess, 1000);\n function toggleAll() {\n $scope.selectAll = !$scope.selectAll;\n $scope.overloadKeeperContainer.forEach(function (keeperContainer) {\n keeperContainer.selected = !keeperContainer.selected;\n });\n }\n function isChecked() {\n return $scope.selectAll;\n }\n}\n\n\n//# sourceURL=webpack://XPipe-Console/./scripts/controllers/KeepercontainerOverloadCtl.ts?"); /***/ }), @@ -9433,7 +9433,7 @@ eval("angular\n .module('services')\n .service('HealthCheckService', Healt \****************************************************/ /***/ (() => { -eval("angular\n .module('services')\n .service('KeeperContainerService', ['$resource', '$q', function ($resource, $q) {\n var resource = $resource('', {}, {\n find_availablekeepers_by_dc: {\n method: 'POST',\n url: '/console/dcs/:dcName/availablekeepers',\n isArray: true\n },\n find_active_kcs_by_dc_and_cluster: {\n method: 'GET',\n url: '/console/dcs/:dcName/cluster/:clusterName/activekeepercontainers',\n isArray: true\n },\n find_keepercontainer_by_id: {\n method: 'GET',\n url: '/console/keepercontainer/:id',\n },\n find_available_keepers_by_dc_az_and_org: {\n method: 'GET',\n url: '/console/keepercontainers/dc/:dcName/az/:azName/org/:orgName',\n isArray: true\n },\n get_all_infos: {\n method: 'GET',\n url: '/console/keepercontainer/infos/all',\n isArray: true\n },\n get_all_diskTypes: {\n method: 'GET',\n url: '/console/keepercontainer/diskType',\n isArray: true\n },\n get_all_organizations: {\n method: 'GET',\n url: '/console/organizations',\n isArray: true\n },\n add_keepercontainer: {\n method: 'POST',\n url: '/console/keepercontainer'\n },\n update_keepercontainer: {\n method: 'PUT',\n url: '/console/keepercontainer'\n },\n keepercontainer_fullSynchronizationTime: {\n method: 'GET',\n url: '/console/keepercontainer/max/fullSynchronizationTime'\n },\n get_all_overload_keepercontainer: {\n method: 'GET',\n url: '/console/keepercontainers/overload/all',\n isArray: true\n },\n get_all_overload_lasted_used_info: {\n method: 'GET',\n url: '/console/keepercontainer/overload/info/lasted',\n isArray: true\n },\n get_overload_keepercontainer_migration_process: {\n method: 'GET',\n url: '/console/keepercontainer/overload/migration/process',\n isArray: true\n },\n begin_to_migrate_overload_keepercontainer: {\n method: 'POST',\n url: '/console/keepercontainer/overload/migration/begin'\n },\n stop_to_migrate_overload_keepercontainer: {\n method: 'POST',\n url: '/console/keepercontainer/overload/migration/stop'\n }\n });\n function findActiveKeeperContainersByDc(dcName) {\n var d = $q.defer();\n resource.find_activekeepercontainers_by_dc({\n dcName: dcName\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findAvailableKeepersByDc(dcName, shard) {\n var d = $q.defer();\n resource.find_availablekeepers_by_dc({\n dcName: dcName\n }, shard, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findKeepercontainerById(id) {\n var d = $q.defer();\n resource.find_keepercontainer_by_id({\n id: id\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findAvailableKeepersByDcAzAndOrg(dcName, azName, orgName) {\n var d = $q.defer();\n resource.find_available_keepers_by_dc_az_and_org({\n dcName: dcName,\n azName: azName,\n orgName: orgName\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findAvailableKeepersByDcAndCluster(dcName, clusterName) {\n var d = $q.defer();\n resource.find_active_kcs_by_dc_and_cluster({\n dcName: dcName,\n clusterName: clusterName\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllInfos() {\n var d = $q.defer();\n resource.get_all_infos({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllOrganizations() {\n var d = $q.defer();\n resource.get_all_organizations({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllDiskTypes() {\n var d = $q.defer();\n resource.get_all_diskTypes({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function addKeepercontainer(addr, dcName, orgName, azName, active, diskType) {\n var d = $q.defer();\n resource.add_keepercontainer({}, {\n addr: addr,\n dcName: dcName,\n orgName: orgName,\n azName: azName,\n active: active,\n diskType: diskType\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getKeepercontainerFullSynchronizationTime() {\n var d = $q.defer();\n resource.keepercontainer_fullSynchronizationTime({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function updateKeepercontainer(addr, dcName, orgName, azName, active, diskType) {\n var d = $q.defer();\n resource.update_keepercontainer({}, {\n addr: addr,\n dcName: dcName,\n orgName: orgName,\n azName: azName,\n active: active,\n diskType: diskType\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllOverloadKeepercontainer() {\n var d = $q.defer();\n resource.get_all_overload_keepercontainer({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllKeepercontainerUsedInfo() {\n var d = $q.defer();\n resource.get_all_overload_lasted_used_info({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getOverloadKeeperContainerMigrationProcess() {\n var d = $q.defer();\n resource.get_overload_keepercontainer_migration_process({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function beginToMigrateOverloadKeeperContainers() {\n var d = $q.defer();\n resource.begin_to_migrate_overload_keepercontainer(Array.from(arguments), function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n return {\n findAvailableKeepersByDc: findAvailableKeepersByDc,\n findAvailableKeepersByDcAndCluster: findAvailableKeepersByDcAndCluster,\n findKeepercontainerById: findKeepercontainerById,\n findAvailableKeepersByDcAzAndOrg: findAvailableKeepersByDcAzAndOrg,\n getAllInfos: getAllInfos,\n getAllOrganizations: getAllOrganizations,\n getAllDiskTypes: getAllDiskTypes,\n addKeepercontainer: addKeepercontainer,\n updateKeepercontainer: updateKeepercontainer,\n getAllOverloadKeepercontainer: getAllOverloadKeepercontainer,\n getAllKeepercontainerUsedInfo: getAllKeepercontainerUsedInfo,\n getOverloadKeeperContainerMigrationProcess: getOverloadKeeperContainerMigrationProcess,\n beginToMigrateOverloadKeeperContainers: beginToMigrateOverloadKeeperContainers,\n getKeepercontainerFullSynchronizationTime: getKeepercontainerFullSynchronizationTime\n };\n }]);\n\n\n//# sourceURL=webpack://XPipe-Console/./scripts/services/KeeperContainerService.ts?"); +eval("angular\n .module('services')\n .service('KeeperContainerService', ['$resource', '$q', function ($resource, $q) {\n var resource = $resource('', {}, {\n find_availablekeepers_by_dc: {\n method: 'POST',\n url: '/console/dcs/:dcName/availablekeepers',\n isArray: true\n },\n find_active_kcs_by_dc_and_cluster: {\n method: 'GET',\n url: '/console/dcs/:dcName/cluster/:clusterName/activekeepercontainers',\n isArray: true\n },\n find_keepercontainer_by_id: {\n method: 'GET',\n url: '/console/keepercontainer/:id',\n },\n find_available_keepers_by_dc_az_and_org: {\n method: 'GET',\n url: '/console/keepercontainers/dc/:dcName/az/:azName/org/:orgName',\n isArray: true\n },\n get_all_infos: {\n method: 'GET',\n url: '/console/keepercontainer/infos/all',\n isArray: true\n },\n get_all_diskTypes: {\n method: 'GET',\n url: '/console/keepercontainer/diskType',\n isArray: true\n },\n get_all_organizations: {\n method: 'GET',\n url: '/console/organizations',\n isArray: true\n },\n add_keepercontainer: {\n method: 'POST',\n url: '/console/keepercontainer'\n },\n update_keepercontainer: {\n method: 'PUT',\n url: '/console/keepercontainer'\n },\n keepercontainer_fullSynchronizationTime: {\n method: 'GET',\n url: '/console/keepercontainer/max/fullSynchronizationTime'\n },\n get_all_overload_keepercontainer: {\n method: 'GET',\n url: '/console/keepercontainers/overload/all',\n isArray: true\n },\n get_all_overload_lasted_used_info: {\n method: 'GET',\n url: '/console/keepercontainer/overload/info/lasted',\n isArray: true\n },\n get_overload_keepercontainer_migration_process: {\n method: 'GET',\n url: '/console/keepercontainer/overload/migration/process',\n isArray: true\n },\n begin_to_migrate_overload_keepercontainer: {\n method: 'POST',\n url: '/console/keepercontainer/overload/migration/begin'\n },\n migrate_keeper_task_terminate: {\n method: 'POST',\n url: '/console/keepercontainer/overload/migration/terminate'\n },\n stop_to_migrate_overload_keepercontainer: {\n method: 'POST',\n url: '/console/keepercontainer/overload/migration/stop'\n }\n });\n function findActiveKeeperContainersByDc(dcName) {\n var d = $q.defer();\n resource.find_activekeepercontainers_by_dc({\n dcName: dcName\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findAvailableKeepersByDc(dcName, shard) {\n var d = $q.defer();\n resource.find_availablekeepers_by_dc({\n dcName: dcName\n }, shard, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findKeepercontainerById(id) {\n var d = $q.defer();\n resource.find_keepercontainer_by_id({\n id: id\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findAvailableKeepersByDcAzAndOrg(dcName, azName, orgName) {\n var d = $q.defer();\n resource.find_available_keepers_by_dc_az_and_org({\n dcName: dcName,\n azName: azName,\n orgName: orgName\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function findAvailableKeepersByDcAndCluster(dcName, clusterName) {\n var d = $q.defer();\n resource.find_active_kcs_by_dc_and_cluster({\n dcName: dcName,\n clusterName: clusterName\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllInfos() {\n var d = $q.defer();\n resource.get_all_infos({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllOrganizations() {\n var d = $q.defer();\n resource.get_all_organizations({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllDiskTypes() {\n var d = $q.defer();\n resource.get_all_diskTypes({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function addKeepercontainer(addr, dcName, orgName, azName, active, diskType) {\n var d = $q.defer();\n resource.add_keepercontainer({}, {\n addr: addr,\n dcName: dcName,\n orgName: orgName,\n azName: azName,\n active: active,\n diskType: diskType\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getKeepercontainerFullSynchronizationTime() {\n var d = $q.defer();\n resource.keepercontainer_fullSynchronizationTime({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function updateKeepercontainer(addr, dcName, orgName, azName, active, diskType) {\n var d = $q.defer();\n resource.update_keepercontainer({}, {\n addr: addr,\n dcName: dcName,\n orgName: orgName,\n azName: azName,\n active: active,\n diskType: diskType\n }, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllOverloadKeepercontainer() {\n var d = $q.defer();\n resource.get_all_overload_keepercontainer({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getAllKeepercontainerUsedInfo() {\n var d = $q.defer();\n resource.get_all_overload_lasted_used_info({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function getOverloadKeeperContainerMigrationProcess() {\n var d = $q.defer();\n resource.get_overload_keepercontainer_migration_process({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function beginToMigrateOverloadKeeperContainers() {\n var d = $q.defer();\n resource.begin_to_migrate_overload_keepercontainer(Array.from(arguments), function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n function migrateKeeperTaskTerminate() {\n var d = $q.defer();\n resource.migrate_keeper_task_terminate({}, function (result) {\n d.resolve(result);\n }, function (result) {\n d.reject(result);\n });\n return d.promise;\n }\n return {\n findAvailableKeepersByDc: findAvailableKeepersByDc,\n findAvailableKeepersByDcAndCluster: findAvailableKeepersByDcAndCluster,\n findKeepercontainerById: findKeepercontainerById,\n findAvailableKeepersByDcAzAndOrg: findAvailableKeepersByDcAzAndOrg,\n getAllInfos: getAllInfos,\n getAllOrganizations: getAllOrganizations,\n getAllDiskTypes: getAllDiskTypes,\n addKeepercontainer: addKeepercontainer,\n updateKeepercontainer: updateKeepercontainer,\n getAllOverloadKeepercontainer: getAllOverloadKeepercontainer,\n getAllKeepercontainerUsedInfo: getAllKeepercontainerUsedInfo,\n getOverloadKeeperContainerMigrationProcess: getOverloadKeeperContainerMigrationProcess,\n beginToMigrateOverloadKeeperContainers: beginToMigrateOverloadKeeperContainers,\n migrateKeeperTaskTerminate: migrateKeeperTaskTerminate,\n getKeepercontainerFullSynchronizationTime: getKeepercontainerFullSynchronizationTime\n };\n }]);\n\n\n//# sourceURL=webpack://XPipe-Console/./scripts/services/KeeperContainerService.ts?"); /***/ }), diff --git a/redis/redis-console/src/main/resources/static/scripts/controllers/KeepercontainerOverloadCtl.ts b/redis/redis-console/src/main/resources/static/scripts/controllers/KeepercontainerOverloadCtl.ts index af728017b9..6e96230137 100644 --- a/redis/redis-console/src/main/resources/static/scripts/controllers/KeepercontainerOverloadCtl.ts +++ b/redis/redis-console/src/main/resources/static/scripts/controllers/KeepercontainerOverloadCtl.ts @@ -3,10 +3,10 @@ angular .controller('KeepercontainerOverloadCtl', KeepercontainerOverloadCtl); KeepercontainerOverloadCtl.$inject = ['$rootScope', '$scope', '$window', '$stateParams', 'KeeperContainerService', - 'toastr', 'NgTableParams', '$interval']; + 'toastr', 'NgTableParams', 'AppUtil', '$interval']; function KeepercontainerOverloadCtl($rootScope, $scope, $window, $stateParams, KeeperContainerService, - toastr, NgTableParams, $interval) { + toastr, NgTableParams, AppUtil, $interval) { $scope.overloadKeeperContainer = []; $scope.tableParams = new NgTableParams({}, {}); $scope.migratingTableParams = new NgTableParams({}, {}); @@ -24,55 +24,61 @@ function KeepercontainerOverloadCtl($rootScope, $scope, $window, $stateParams, K $scope.scheduledWork; $scope.beginToMigrateOverloadKeeperContainers = beginToMigrateOverloadKeeperContainers; + $scope.migrateKeeperTaskTerminate = migrateKeeperTaskTerminate; - KeeperContainerService.getAllOverloadKeepercontainer() - .then(function (result) { - if (Array.isArray(result)) $scope.overloadKeeperContainer = result; - $scope.overloadKeeperContainer.forEach(function(container) { - switch(container.cause) { - case 'BOTH': - container.cause = '数据量和流量超载'; - break; - case 'PEER_DATA_OVERLOAD': - container.cause = '数据量超载'; - break; - case 'INPUT_FLOW_OVERLOAD': - container.cause = '流量超载'; - break; - case 'RESOURCE_LACK': - container.cause = '资源不足'; - break; - case 'PAIR_RESOURCE_LACK': - container.cause = '资源不足(keeper对)'; - break; - case 'KEEPER_PAIR_BOTH': - case 'KEEPER_PAIR_PEER_DATA_OVERLOAD': - case 'KEEPER_PAIR_INPUT_FLOW_OVERLOAD': - container.cause = 'keeper对超载'; - break; - } - if (container.cause == '资源不足' || container.cause == '资源不足(keeper对)') { - container.result = '' - }else if (!container.switchActive && !container.keeperPairOverload) { - container.result = '迁移主keeper' - } else if (container.switchActive && !container.keeperPairOverload) { - container.result = '主备切换' - } else if (!container.switchActive && container.keeperPairOverload) { - container.result = '迁移备keeper' - } - if (container.updateTime != null) { - container.time = container.updateTime.substring(0, 19).replace("T", " "); - } - }); - $scope.tableParams = new NgTableParams({ - page : 1, - count : 10, - }, { - filterDelay: 100, - counts: [10, 25, 50], - dataset: $scope.overloadKeeperContainer + getAllOverloadKeepercontainer(); + + function getAllOverloadKeepercontainer() { + KeeperContainerService.getAllOverloadKeepercontainer() + .then(function (result) { + if (Array.isArray(result)) $scope.overloadKeeperContainer = result; + $scope.overloadKeeperContainer.forEach(function (container) { + switch (container.cause) { + case 'BOTH': + container.cause = '数据量和流量超载'; + break; + case 'PEER_DATA_OVERLOAD': + container.cause = '数据量超载'; + break; + case 'INPUT_FLOW_OVERLOAD': + container.cause = '流量超载'; + break; + case 'RESOURCE_LACK': + container.cause = '资源不足'; + break; + case 'PAIR_RESOURCE_LACK': + container.cause = '资源不足(keeper对)'; + break; + case 'KEEPER_PAIR_BOTH': + case 'KEEPER_PAIR_PEER_DATA_OVERLOAD': + case 'KEEPER_PAIR_INPUT_FLOW_OVERLOAD': + container.cause = 'keeper对超载'; + break; + } + if (container.cause == '资源不足' || container.cause == '资源不足(keeper对)') { + container.result = '' + } else if (!container.switchActive && !container.keeperPairOverload) { + container.result = '迁移主keeper' + } else if (container.switchActive && !container.keeperPairOverload) { + container.result = '主备切换' + } else if (!container.switchActive && container.keeperPairOverload) { + container.result = '迁移备keeper' + } + if (container.updateTime != null) { + container.time = container.updateTime.substring(0, 19).replace("T", " "); + } + container.showDetail = false; + }); + $scope.tableParams = new NgTableParams({ + page: 1, + count: 10, + }, { + filterDelay: 100, + counts: [10, 25, 50], + dataset: $scope.overloadKeeperContainer + }); }); - }); + } function beginToMigrateOverloadKeeperContainers() { @@ -95,8 +101,10 @@ function KeepercontainerOverloadCtl($rootScope, $scope, $window, $stateParams, K KeeperContainerService.beginToMigrateOverloadKeeperContainers.apply(KeeperContainerService, $scope.migratingKeeperContainers) .then(result => { - if(result.message == 'success' ) { + if(result.message == 'success') { toastr.success("迁移成功"); + $scope.operateType = OPERATE_TYPE.DETAIL; + getAllOverloadKeepercontainer(); } else { toastr.error(result.message, "迁移失败"); } @@ -105,6 +113,22 @@ function KeepercontainerOverloadCtl($rootScope, $scope, $window, $stateParams, K }); } + function migrateKeeperTaskTerminate() { + KeeperContainerService.migrateKeeperTaskTerminate.apply(KeeperContainerService) + .then(result => { + if(result.state == 0) { + toastr.success(result.message); + } else { + toastr.error(result.message); + } + getOverloadKeeperContainerMigrationProcess(); + $interval.cancel($scope.scheduledWork); + }); + + $scope.operateType = OPERATE_TYPE.DETAIL; + + } + function getOverloadKeeperContainerMigrationProcess() { if ($scope.operateType == OPERATE_TYPE.MIGRATING) { KeeperContainerService.getOverloadKeeperContainerMigrationProcess() diff --git a/redis/redis-console/src/main/resources/static/scripts/services/KeeperContainerService.ts b/redis/redis-console/src/main/resources/static/scripts/services/KeeperContainerService.ts index 79b0bcd5da..8db18d33d1 100644 --- a/redis/redis-console/src/main/resources/static/scripts/services/KeeperContainerService.ts +++ b/redis/redis-console/src/main/resources/static/scripts/services/KeeperContainerService.ts @@ -68,6 +68,10 @@ angular method:'POST', url:'/console/keepercontainer/overload/migration/begin' }, + migrate_keeper_task_terminate: { + method:'POST', + url:'/console/keepercontainer/overload/migration/terminate' + }, stop_to_migrate_overload_keepercontainer:{ method:'POST', url:'/console/keepercontainer/overload/migration/stop' @@ -267,6 +271,17 @@ angular return d.promise; } + function migrateKeeperTaskTerminate() { + var d = $q.defer(); + resource.migrate_keeper_task_terminate({}, + function (result) { + d.resolve(result); + }, function (result) { + d.reject(result); + }); + return d.promise; + } + return { findAvailableKeepersByDc : findAvailableKeepersByDc, findAvailableKeepersByDcAndCluster : findAvailableKeepersByDcAndCluster, @@ -281,6 +296,7 @@ angular getAllKeepercontainerUsedInfo : getAllKeepercontainerUsedInfo, getOverloadKeeperContainerMigrationProcess : getOverloadKeeperContainerMigrationProcess, beginToMigrateOverloadKeeperContainers : beginToMigrateOverloadKeeperContainers, + migrateKeeperTaskTerminate : migrateKeeperTaskTerminate, getKeepercontainerFullSynchronizationTime : getKeepercontainerFullSynchronizationTime } }]); diff --git a/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overall.html b/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overall.html index f42509a440..41282fd22f 100644 --- a/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overall.html +++ b/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overall.html @@ -14,7 +14,7 @@ - + diff --git a/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overload.html b/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overload.html index d6e059fdcc..51c00f9cea 100644 --- a/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overload.html +++ b/redis/redis-console/src/main/resources/static/views/index/keepercontainer_overload.html @@ -16,6 +16,9 @@

取消全选 全选
+ + 终止任务 + 迁移 @@ -25,7 +28,7 @@

{{info.key.port}}{{info.key.port}} {{ info.key.clusterName }}
- + - + + + + +
@@ -35,13 +38,32 @@

{{info.srcKeeperContainer.dcName}} {{info.srcKeeperContainer.activeRedisUsedMemory / 1024 | number:2}} {{info.srcKeeperContainer.activeInputFlow}}{{info.migrateKeeperCount}}{{info.migrateKeeperCount}} {{info.targetKeeperContainer.keeperIp}} {{info.targetKeeperContainer.activeRedisUsedMemory / 1024 | number:2}} {{info.targetKeeperContainer.activeInputFlow}} {{info.cause}} {{info.result}} {{info.time}} + +
+
    +
  • + + {{shard.dcId}}:{{shard.clusterId}}:{{shard.shardId}} +
  • +
+
diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java index 8894ae9f40..62eaf9f949 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/AllTests.java @@ -40,6 +40,8 @@ import com.ctrip.xpipe.redis.console.healthcheck.nonredis.sentinelconfig.SentinelConfigCheckTest; import com.ctrip.xpipe.redis.console.healthcheck.nonredis.unhealthycluster.UnhealthyClusterCheckerTest; import com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerActionTest; +import com.ctrip.xpipe.redis.console.keeper.impl.AbstractKeeperCommandTest; +import com.ctrip.xpipe.redis.console.keeper.impl.CheckerKeeperActiveCommandTest; import com.ctrip.xpipe.redis.console.keeper.impl.DefaultKeeperUsedInfoAnalyzerTest; import com.ctrip.xpipe.redis.console.keeper.impl.GetAllDcCommandTest; import com.ctrip.xpipe.redis.console.migration.MigrationShardRollbackTest; @@ -235,6 +237,8 @@ AutoMigrateOverloadKeeperContainerActionTest.class, DefaultKeeperUsedInfoAnalyzerTest.class, GetAllDcCommandTest.class, + AbstractKeeperCommandTest.class, + CheckerKeeperActiveCommandTest.class, RouteInfoControllerTest.class, RedisControllerTest.class, diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java index ac2c89345d..d5f6e1cd19 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/AutoMigrateOverloadKeeperContainerActionTest.java @@ -40,7 +40,7 @@ public void beforeAutoMigrateOverloadKeeperContainerActionTest() { ShardModel shardModel = new ShardModel(); Mockito.when(shardModelService.getShardModel(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyObject())) .thenReturn(shardModel); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) .thenReturn(true); } @@ -100,7 +100,7 @@ public void testMigrateAllKeepersFail() { .setSrcKeeperContainer(model2).setTargetKeeperContainer(model4).setMigrateKeeperCount(4).setMigrateShards(migrationShards2); readyToMigrationKeeperContainers.add(migrationKeeperContainerDetailModel2); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())).thenReturn(false); + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())).thenReturn(false); action.migrateAllKeepers(readyToMigrationKeeperContainers); Assert.assertEquals(0, migrationKeeperContainerDetailModel1.getMigrateKeeperCompleteCount()); diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/AbstractKeeperCommandTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/AbstractKeeperCommandTest.java new file mode 100644 index 0000000000..28b7fa47fc --- /dev/null +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/AbstractKeeperCommandTest.java @@ -0,0 +1,76 @@ +package com.ctrip.xpipe.redis.console.keeper.impl; + +import com.ctrip.xpipe.api.command.CommandFuture; +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.console.keeper.Command.AbstractKeeperCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; + +import javax.annotation.PostConstruct; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; + +import static org.mockito.Mockito.when; + + +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) +public class AbstractKeeperCommandTest { + + @Mock + XpipeNettyClientKeyedObjectPool keyedObjectPool; + + @Mock + ScheduledExecutorService scheduled; + + @Mock + SimpleObjectPool keyPool; + + @Mock + InfoCommand infoCommand; + + @Mock + CommandFuture infoCommandFuture; + + static Endpoint key = new DefaultEndPoint("10.10.10.10", 6379); + + @PostConstruct + public void post() { + when(infoCommand.execute()).thenReturn(infoCommandFuture); + when(keyedObjectPool.getKeyPool(key)).thenReturn(keyPool); + } + + @Test + public void testGetKeeperCommandName() throws Throwable { + TestAbstractKeeperCommandTest test = new TestAbstractKeeperCommandTest(keyedObjectPool, scheduled); + test.doExecute(); + } + + private static class TestAbstractKeeperCommandTest extends AbstractKeeperCommand{ + + protected TestAbstractKeeperCommandTest(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled) { + super(keyedObjectPool, scheduled); + } + + @Override + public String getName() { + return null; + } + + @Override + protected void doExecute() throws Throwable { + this.generateInfoReplicationCommand(key); + } + + @Override + protected void doReset() { + + } + } + +} diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/CheckerKeeperActiveCommandTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/CheckerKeeperActiveCommandTest.java new file mode 100644 index 0000000000..bff28f98be --- /dev/null +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/CheckerKeeperActiveCommandTest.java @@ -0,0 +1,26 @@ +package com.ctrip.xpipe.redis.console.keeper.impl; + +import com.ctrip.xpipe.redis.console.keeper.Command.CheckKeeperActiveCommand; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; + +import java.util.concurrent.ExecutionException; + +import static org.mockito.Mockito.when; + + +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) +public class CheckerKeeperActiveCommandTest extends AbstractKeeperCommandTest{ + + @Test + public void checkerKeeperActiveCommandTest() { + CheckKeeperActiveCommand command = new CheckKeeperActiveCommand(keyedObjectPool, scheduled, key, true); + command.execute(); + Assert.assertFalse(command.future().isSuccess()); + } + + +} diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java index fb0e585003..1e273521d9 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java @@ -1,6 +1,6 @@ package com.ctrip.xpipe.redis.console.service; -import com.ctrip.xpipe.command.DefaultCommandFuture; +import com.ctrip.xpipe.command.RetryCommandFactory; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; import com.ctrip.xpipe.redis.console.model.RedisTbl; @@ -8,7 +8,6 @@ import com.ctrip.xpipe.redis.console.model.ShardTbl; import com.ctrip.xpipe.redis.console.service.impl.DefaultKeeperAdvancedService; import com.ctrip.xpipe.redis.console.service.model.impl.ShardModelServiceImpl; -import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -20,6 +19,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -37,12 +37,18 @@ public class ShardModelServiceTest extends ShardModelServiceImpl{ @Mock private KeeperAdvancedService keeperAdvancedService; + @Mock + private KeeperContainerService keeperContainerService; + @Mock private RedisService redisService; @Mock private RedisSession redisSession; + @Mock + private RetryCommandFactory retryCommandFactory; + private final String dcName = "dc1"; private final String clusterName = "cluster1"; @@ -67,15 +73,15 @@ public void initMockData() { } @Test - public void testMigrateAutoBalanceKeepers() throws Exception { + public void testMigrateAutoBalanceKeepers() throws ExecutionException, InterruptedException { ScheduledThreadPoolExecutor executor = Mockito.mock(ScheduledThreadPoolExecutor.class); ScheduledFuture future = Mockito.mock(ScheduledFuture.class); Mockito.when(executor.scheduleWithFixedDelay(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenReturn(future); Mockito.when(future.get()).thenReturn(null); try { - shardModelService.migrateAutoBalanceKeepers(dcName, clusterName, shardModel, srcIp, targetIp); - } catch (Exception e) { - Assert.assertEquals(e.getClass(), RuntimeException.class); + shardModelService.migrateActiveKeeper(dcName, clusterName, shardModel, srcIp, targetIp); + } catch (Throwable th) { + Assert.assertEquals(th.getMessage(), "keeper_migration_active_rollback_error"); } } @@ -90,4 +96,15 @@ public void testGetSwitchMaterNewKeepers() { Assert.assertFalse(switchMaterNewKeepers.get(0).isMaster()); } + @Test + public void testSwitchActiveKeeper() { + shardModelService.switchActiveKeeper("ip1","ip2",shardModel); + List keepers = new ArrayList<>(); + keepers.add(new RedisTbl().setRedisIp("ip1").setRedisPort(6380)); + keepers.add(new RedisTbl().setRedisIp("ip2").setRedisPort(6381)); + shardModel.setKeepers(keepers); + Assert.assertFalse(shardModelService.switchActiveKeeper("ip1","ip3",shardModel)); + Assert.assertTrue(shardModelService.switchActiveKeeper("ip1","ip2",shardModel)); + } + } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java index 8bb7ad0821..dcdda4ac44 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java @@ -9,7 +9,6 @@ import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -40,14 +39,14 @@ public void before() { ShardModel shardModel = new ShardModel(); Mockito.when(shardModelService.getShardModel(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyObject())) .thenReturn(shardModel); - Mockito.when(shardModelService.migrateShardKeepers(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(shardModelService.migrateBackupKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.anyString(), Mockito.anyString())) .thenReturn(true); - Mockito.when(shardModelService.switchMaster(Mockito.anyString(), Mockito.anyString(), Mockito.any())) + Mockito.when(shardModelService.switchActiveKeeper(Mockito.anyString(), Mockito.anyString(), Mockito.any())) .thenReturn(true); } @Test - public void testMigrationKeeperContainer() { + public void testMigrationKeeperContainer() throws Throwable { List models = new ArrayList<>(); MigrationKeeperContainerDetailModel model = new MigrationKeeperContainerDetailModel(); diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java index 7dcb8e6d10..b4557eff3f 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java @@ -325,7 +325,7 @@ public void resetKeepersTest() { HttpEntity requestEntity = new HttpEntity<>(keeperInstanceMeta, headers); Mockito.when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), eq(requestEntity), eq(Void.class))).thenReturn(null); - keeperContainerService.resetKeepers(keeperInstanceMeta); + keeperContainerService.resetKeeper(keeperInstanceMeta.getKeeperMeta().getIp(), keeperInstanceMeta.getReplId()); } @Test diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java index 3d8043d9d5..bafe13aacf 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/InfoResultExtractor.java @@ -120,7 +120,7 @@ public long getSyncPartialErr() { public Long getSwapUsedDbSize() { return extractAsLong(KEY_SWAP_USED_DB_SIZE);} - public boolean getKeeperActive() { return "ACTIVE".equals(extract(KEY_KEEPER_ACTIVE)); } + public boolean isKeeperActive() { return "ACTIVE".equals(extract(KEY_KEEPER_ACTIVE)); } public long getMasterReplOffset() { Long result = extractAsLong(KEY_MASTER_REPL_OFFSET);