From fd90a6e5b83385bfa5da770a79aa3d208e559c73 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Thu, 28 Mar 2024 21:18:44 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E5=8C=80keeeper-=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E8=BF=81=E7=A7=BB=E8=AE=A1=E5=88=92=EF=BC=9A=E4=B8=BB?= =?UTF-8?q?=E5=A4=87=E5=88=87=E6=8D=A2=E6=B7=BB=E5=8A=A0=E9=AA=8C=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...faultKeeperContainerMigrationAnalyzer.java | 11 +- ...eeperContainerUsedInfoAnalyzerContext.java | 15 +- ...eeperContainerUsedInfoAnalyzerContext.java | 2 +- ...efaultKeeperContainerMigrationService.java | 7 +- .../service/model/ShardModelService.java | 2 +- .../model/impl/ShardModelServiceImpl.java | 150 +++++++++++++----- .../service/ShardModelServiceTest.java | 35 ++-- 7 files changed, 163 insertions(+), 59 deletions(-) diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java index 8b423c6b22..d181fba9f7 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java @@ -14,6 +14,7 @@ import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel; import com.ctrip.xpipe.redis.console.model.MigrationKeeperContainerDetailModel; import com.ctrip.xpipe.redis.console.service.KeeperContainerAnalyzerService; +import com.ctrip.xpipe.redis.core.entity.DcMeta; import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.utils.VisibleForTesting; import org.slf4j.Logger; @@ -55,7 +56,11 @@ public List getMigrationPlans(Map modelsWithoutResource = new ArrayList<>(); models.forEach(a -> modelsWithoutResource.add(KeeperContainerUsedInfoModel.cloneKeeperContainerUsedInfoModel(a))); - analyzerContext = new DefaultKeeperContainerUsedInfoAnalyzerContext(filterChain, metaCache.getXpipeMeta().getDcs().get(currentDc)); + DcMeta dcMeta = null; + if (metaCache.getXpipeMeta() != null) { + dcMeta = metaCache.getXpipeMeta().getDcs().get(currentDc); + } + analyzerContext = new DefaultKeeperContainerUsedInfoAnalyzerContext(filterChain, dcMeta); analyzerContext.initKeeperPairData(modelsWithoutResource, modelsMap); analyzerContext.initAvailablePool(modelsWithoutResource); for (KeeperContainerUsedInfoModel model : modelsWithoutResource) { @@ -82,7 +87,7 @@ private void generateDataOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod } KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(model, dcClusterShard, backUpKeeper, (Boolean) cause[1], false); if (bestKeeperContainer == null) { - break; + continue; } analyzerContext.addMigrationPlan(model, bestKeeperContainer, false, false, (String) cause[0], dcClusterShard, backUpKeeper); analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]); @@ -105,7 +110,7 @@ private void generatePairOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod KeeperContainerUsedInfoModel backUpKeeperContainer = dcClusterShard.getValue().getKeeperIP().equals(modelA.getKeeperIp()) ? modelB : modelA; KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(backUpKeeperContainer, dcClusterShard, activeKeeperContainer, (Boolean) cause[1], true); if (bestKeeperContainer == null) { - break; + continue; } if (!filterChain.isMigrateKeeperPairOverload(dcClusterShard, backUpKeeperContainer, bestKeeperContainer, analyzerContext)) { analyzerContext.addMigrationPlan(backUpKeeperContainer, bestKeeperContainer, false, true, (String) cause[0], dcClusterShard, activeKeeperContainer); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java index 7a2889e337..ced53e9300 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java @@ -43,6 +43,9 @@ public class DefaultKeeperContainerUsedInfoAnalyzerContext implements KeeperCont public DefaultKeeperContainerUsedInfoAnalyzerContext(KeeperContainerFilterChain filterChain, DcMeta dcMeta) { this.filterChain = filterChain; this.dcMeta = dcMeta; + if (dcMeta == null) { + logger.warn("[DefaultKeeperContainerUsedInfoAnalyzerContext dcMeta is null!"); + } } @Override @@ -120,7 +123,10 @@ public void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer, } @Override - public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel srcKeeper, Map.Entry dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isActiveEntry) { + public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel srcKeeper, Map.Entry dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isMigrateShardBackUp) { + if (srcKeeper == null || srcKeeperPair == null) { + return null; + } String org = srcKeeper.getOrg(); String az = srcKeeper.getAz(); PriorityQueue queue = isPeerDataOverload ? minPeerDataKeeperContainers : minInputFlowKeeperContainers; @@ -130,8 +136,8 @@ public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedIn if ((org == null || org.equals(target.getOrg())) && (az == null || az.equals(target.getAz())) && !Objects.equals(target.getKeeperIp(), srcKeeperPair.getKeeperIp()) - && ((!isActiveEntry && filterChain.canMigrate(dcClusterShard, srcKeeperPair, target, this)) - || (isActiveEntry && !filterChain.isMigrateKeeperPairOverload(dcClusterShard, srcKeeperPair, target, this)))) { + && ((!isMigrateShardBackUp && filterChain.canMigrate(dcClusterShard, srcKeeperPair, target, this)) + || (isMigrateShardBackUp && !filterChain.isMigrateKeeperPairOverload(dcClusterShard, srcKeeperPair, target, this)))) { return target; } temp.add(target); @@ -165,6 +171,9 @@ public void initKeeperPairData(List usedInfoMap, M } private void getProblemKeeperContainer(Map.Entry entry) { + if (dcMeta == null) { + return; + } dcMeta.getClusters().values().forEach(clusterMeta -> { if (entry.getKey().getClusterId().equals(clusterMeta.getId())) { clusterMeta.getShards().values().forEach(shardMeta -> { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java index f49a5517d8..962252182d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java @@ -18,7 +18,7 @@ public interface KeeperContainerUsedInfoAnalyzerContext { void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer, boolean isPeerDataOverload); - KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isActiveEntry); + KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isMigrateShardBackUp); String getBackUpKeeperIp(DcClusterShard activeKeeper); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java index 9a2c1e7425..6ce24aed4b 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java @@ -72,12 +72,11 @@ public void beginMigrateKeeperContainers(List getAllShardModel(String dcName, String clusterName) { @@ -254,35 +262,71 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel } @Override - public boolean switchMaster(String srcIp, String targetIp, ShardModel shardModel) { + public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) { try { List keepers = shardModel.getKeepers(); int srcKeeperPort = keepers.stream() - .filter(r -> r.getRedisIp().equals(srcIp)) + .filter(r -> r.getRedisIp().equals(activeIp)) .findFirst() .map(RedisTbl::getRedisPort) .orElseThrow(() -> new RuntimeException("No source keeper found")); String targetKeeperIp = keepers.stream() - .filter(r -> !r.getRedisIp().equals(srcIp)) + .filter(r -> !r.getRedisIp().equals(activeIp)) .findFirst() .map(RedisTbl::getRedisIp) .orElseThrow(() -> new RuntimeException("No target keeper found")); - if (!targetKeeperIp.equals(targetIp)) { + if (!targetKeeperIp.equals(backupIp)) { return false; } - KeeperTransMeta keeperInstanceMeta = keeperContainerService.getAllKeepers(srcIp).stream() + KeeperTransMeta keeperInstanceMeta = keeperContainerService.getAllKeepers(activeIp).stream() .filter(k -> k.getKeeperMeta().getPort() == srcKeeperPort) .findFirst() .orElseThrow(() -> new RuntimeException("No keeper instance found")); keeperContainerService.resetKeepers(keeperInstanceMeta); + return checkKeeperActive(activeIp, srcKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES); + } catch (Exception e) { return false; } - return true; + } + + private boolean checkKeeperActive(String ip, int port, boolean expectActive, long interval, int maxRetryTimes) { + DefaultEndPoint activeKey = new DefaultEndPoint(ip, port); + InfoCommand infoCommand = generteInfoCommand(activeKey); + final boolean[] isMaster = new boolean[1]; + try { + int time = 0; + while (time < maxRetryTimes){ + time ++; + addHookAndExecute(infoCommand, new Callbackable() { + @Override + public void success(String message) { + isMaster[0] = new InfoResultExtractor(message).getKeeperActive(); + } + + @Override + public void fail(Throwable throwable) { + logger.error("[switchMaster] ", throwable); + } + }); + if (isMaster[0] == expectActive) break; + Thread.sleep(interval); + } + return !expectActive ^ isMaster[0]; + } catch (Exception e) { + logger.error("[switchMaster] check keeper active error", e); + return false; + } finally { + try { + keyedObjectPool.clear(activeKey); + } catch (ObjectPoolException e) { + logger.error("[clear] clear keyed object pool error", e); + } + } } @Resource(name = REDIS_COMMAND_EXECUTOR) @@ -308,26 +352,30 @@ public InfoCommand generteInfoCommand(Endpoint key) { } @Override - public void migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { + public boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { + List oldKeepers = shardModel.getKeepers(); List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp, true); if (!doMigrateKeepers(dcName, clusterName, shardModel, newKeepers)) { throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); } - shardModel.setKeepers(newKeepers); RedisTbl active = newKeepers.get(0); RedisTbl backup = newKeepers.get(1); DefaultEndPoint activeKey = new DefaultEndPoint(active.getRedisIp(), active.getRedisPort()); DefaultEndPoint backupKey = new DefaultEndPoint(backup.getRedisIp(), backup.getRedisPort()); InfoCommand activeInfoCommand = generteInfoCommand(activeKey); InfoCommand backupInfoCommand = generteInfoCommand(backupKey); - long expireTime = 10L * 60 * 1000; - long intervalTime = 1000; + + FullSyncJudgeTask task = new FullSyncJudgeTask(active.getRedisIp(), backup.getRedisIp(), activeInfoCommand, backupInfoCommand, KEEPER_BALANCE_FULL_SYNC_EXPIRE_TIME, KEEPER_BALANCE_FULL_SYNC_INTERVAL_TIME, + dcName, clusterName, shardModel); try { - FullSyncJudgeTask task = new FullSyncJudgeTask(active.getRedisIp(), backup.getRedisIp(), activeInfoCommand, backupInfoCommand, expireTime, intervalTime, - dcName, clusterName, shardModel); - ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(task, 1000, 1000, TimeUnit.MILLISECONDS); + ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(task, 0,1000, TimeUnit.MILLISECONDS); task.setScheduledFuture(scheduledFuture); + scheduledFuture.get(); + return getAutoBalanceResult(task, dcName, clusterName, shardModel, oldKeepers); + } catch (Exception e) { + logger.error("[clear] clear keyed object pool error", e); + return getAutoBalanceResult(task, dcName, clusterName, shardModel, oldKeepers); } finally { try { keyedObjectPool.clear(activeKey); @@ -338,6 +386,16 @@ public void migrateAutoBalanceKeepers(String dcName, String clusterName, ShardMo } } + private boolean getAutoBalanceResult(FullSyncJudgeTask task, String dcName, String clusterName, ShardModel shardModel, List oldKeepers) { + if (task.getResult()) { + return true; + } + if (!doMigrateKeepers(dcName, clusterName, shardModel, oldKeepers)) { + throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); + } + return false; + } + private boolean doMigrateKeepers(String dcName, String clusterName, ShardModel shardModel, List newKeepers) { if (newKeepers == null) { logger.debug("[migrateKeepers] no need to replace keepers"); @@ -398,7 +456,6 @@ public void run() { if (startTime == 0) { startTime = System.currentTimeMillis(); } - isSuccess = true; addHookAndExecute(activeInfoCommand, new Callbackable() { @Override public void success(String message) { @@ -407,7 +464,7 @@ public void success(String message) { @Override public void fail(Throwable throwable) { - isSuccess = false; + backupMasterReplOffset = -1; } }); @@ -425,15 +482,16 @@ public void success(String message) { @Override public void fail(Throwable throwable) { - isSuccess = false; + backupMasterReplOffset = -1; } }); - - if (isSuccess && backupMasterReplOffset > activeMasterReplOffset) { - switchMaster(backUpIp, activeIp, shardModel); + if (backupMasterReplOffset > activeMasterReplOffset) { + isSuccess = true; + switchMaster(activeIp, backUpIp, shardModel); CatEventMonitor.DEFAULT.logEvent(KEEPER_MIGRATION_ACTIVE_SUCCESS, String.format("activeKeeper:%s, backupKeeper:%s, dc:%s, cluster:%s, shard:%s", activeInfoCommand, backupInfoCommand, dcName, clusterName, shardModel.getShardTbl().getShardName())); + scheduledFuture.cancel(true); Thread.currentThread().interrupt(); } if (System.currentTimeMillis() - startTime > expireTime && !isSuccess) { @@ -442,31 +500,49 @@ public void fail(Throwable throwable) { activeInfoCommand, backupInfoCommand, dcName, clusterName, shardModel.getShardTbl().getShardName())); scheduledFuture.cancel(true); } + } - private CommandFuture addHookAndExecute(AbstractRedisCommand command, Callbackable callback) { - silentCommand(command); - CommandFuture future = command.execute(); - future.addListener(new CommandFutureListener() { - @Override - public void operationComplete(CommandFuture commandFuture) throws Exception { - if(!commandFuture.isSuccess()) { - callback.fail(commandFuture.cause()); - } else { - callback.success(commandFuture.get()); - } - } - }); - return future; + public boolean getResult() { + return isSuccess; } - private void silentCommand(LoggableRedisCommand command) { - command.logRequest(false); - command.logResponse(false); + @VisibleForTesting + public void setBackupMasterReplOffset(long offset) { + this.backupMasterReplOffset = offset; + } + } + private void addHookAndExecute(AbstractRedisCommand command, Callbackable callback) { + silentCommand(command); + CommandFuture future = command.execute(); + future.addListener(new CommandFutureListener() { + @Override + public void operationComplete(CommandFuture commandFuture) throws Exception { + if(!commandFuture.isSuccess()) { + callback.fail(commandFuture.cause()); + } else { + callback.success(commandFuture.get()); + } + } + }); + try { + future.get(); + } catch (Exception e){ + throw new RuntimeException(e); } + } + + private void silentCommand(LoggableRedisCommand command) { + command.logRequest(false); + command.logResponse(false); } + @VisibleForTesting + public void setExecutor(ScheduledThreadPoolExecutor executor) { + this.executor = executor; + } + } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java index 2d710f1d36..7fb25f9354 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java @@ -1,19 +1,14 @@ package com.ctrip.xpipe.redis.console.service; import com.ctrip.xpipe.command.DefaultCommandFuture; -import com.ctrip.xpipe.endpoint.DefaultEndPoint; -import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; -import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSessionManager; import com.ctrip.xpipe.redis.console.model.RedisTbl; import com.ctrip.xpipe.redis.console.model.ShardModel; import com.ctrip.xpipe.redis.console.model.ShardTbl; import com.ctrip.xpipe.redis.console.service.impl.DefaultKeeperAdvancedService; -import com.ctrip.xpipe.redis.console.service.model.impl.ShardModelServiceImpl.*; import com.ctrip.xpipe.redis.console.service.model.impl.ShardModelServiceImpl; import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; -import org.apache.logging.log4j.core.config.CronScheduledFuture; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -72,21 +67,41 @@ public void initMockData() { } @Test - public void testMigrateAutoBalanceKeepers() { - shardModelService.migrateAutoBalanceKeepers(dcName, clusterName, shardModel, srcIp, targetIp); + public void testMigrateAutoBalanceKeepers() throws Exception { + ScheduledThreadPoolExecutor executor = Mockito.mock(ScheduledThreadPoolExecutor.class); + shardModelService.setExecutor(executor); + ScheduledFuture future = Mockito.mock(ScheduledFuture.class); + Mockito.when(executor.scheduleWithFixedDelay(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenReturn(future); + Mockito.when(future.get()).thenReturn(null); + try { + shardModelService.migrateAutoBalanceKeepers(dcName, clusterName, shardModel, srcIp, targetIp); + } catch (Exception e) { + Assert.assertEquals(e.getClass(), RuntimeException.class); + } } @Test - public void testFullSyncJudgeTask() { - InfoCommand infoCommand1 = shardModelService.generteInfoCommand(new DefaultEndPoint("1", 6380)); - InfoCommand infoCommand2 = shardModelService.generteInfoCommand(new DefaultEndPoint("2", 6380)); + public void testFullSyncJudgeTask() throws Exception { + InfoCommand infoCommand1 = Mockito.mock(InfoCommand.class); + InfoCommand infoCommand2 = Mockito.mock(InfoCommand.class); + DefaultCommandFuture future1 = Mockito.mock(DefaultCommandFuture.class); + DefaultCommandFuture future2 = Mockito.mock(DefaultCommandFuture.class); + Mockito.when(infoCommand1.execute()).thenReturn(future1); + Mockito.when(infoCommand2.execute()).thenReturn(future2); + Mockito.when(future1.get()).thenReturn(null); + Mockito.when(future2.get()).thenReturn(null); FullSyncJudgeTask task = new FullSyncJudgeTask("1", "2", infoCommand1, infoCommand2, 1000, 1000, dcName, clusterName, shardModel); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(task, 1000, 1000, TimeUnit.MILLISECONDS); task.setScheduledFuture(scheduledFuture); task.run(); + Assert.assertFalse(task.getResult()); + task.setBackupMasterReplOffset(10L); + task.run(); + Assert.assertTrue(task.getResult()); } + @Test public void testGetSwitchMaterNewKeepers() { ShardModel model = new ShardModel();