From 97cb610cd5d6464bff3aa344289f288ae5280470 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Fri, 29 Mar 2024 16:25:38 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E5=BC=82=E5=B8=B8case?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E6=97=A5=E5=BF=97=EF=BC=8C=E9=99=90=E5=88=B6?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=B1=A0=E7=BA=BF=E7=A8=8B=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redis/checker/resource/Resource.java | 2 + .../model/impl/ShardModelServiceImpl.java | 93 +++++++++++-------- .../redis/console/spring/ResourceConfig.java | 10 ++ ...ltKeeperContainerMigrationServiceTest.java | 7 +- .../impl/KeeperContainerServiceImplTest.java | 39 ++++++++ 5 files changed, 107 insertions(+), 44 deletions(-) diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java index f16d653976..51a3e4da40 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java @@ -12,6 +12,8 @@ public class Resource { public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool"; + public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool"; + public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors"; public static final String PING_DELAY_INFO_SCHEDULED = "pingDelayInfoScheduled"; diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index ae46e90bfb..8d29248e9e 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -20,7 +20,6 @@ import com.ctrip.xpipe.redis.console.service.*; import com.ctrip.xpipe.redis.console.service.model.ShardModelService; import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; @@ -40,8 +39,7 @@ import java.util.*; import java.util.concurrent.*; -import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_COMMAND_EXECUTOR; -import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_SESSION_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.*; import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.KEEPER_MIGRATION_ACTIVE_FAIL; import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.KEEPER_MIGRATION_ACTIVE_SUCCESS; @@ -78,6 +76,14 @@ public class ShardModelServiceImpl implements ShardModelService{ private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(20); + @Resource(name = REDIS_COMMAND_EXECUTOR) + private ScheduledExecutorService scheduled; + + @Resource(name = MIGRATE_KEEPER_CLIENT_POOL) + private XpipeNettyClientKeyedObjectPool keyedObjectPool; + + private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI))); + private final long SWITCH_MASTER_CHECK_INTERVAL = 1000; private final int SWITCH_MASTER_CHECK_TIMES = 10; @@ -265,31 +271,45 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) { try { List keepers = shardModel.getKeepers(); - int srcKeeperPort = keepers.stream() - .filter(r -> r.getRedisIp().equals(activeIp)) - .findFirst() - .map(RedisTbl::getRedisPort) - .orElseThrow(() -> new RuntimeException("No source keeper found")); - - String targetKeeperIp = keepers.stream() - .filter(r -> !r.getRedisIp().equals(activeIp)) - .findFirst() - .map(RedisTbl::getRedisIp) - .orElseThrow(() -> new RuntimeException("No target keeper found")); - - if (!targetKeeperIp.equals(backupIp)) { + if (keepers.size() != 2) { + logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModel: {}", activeIp, backupIp, shardModel); return false; } + int activeKeeperPort = -1; + String backUpKeeperIp = null; + for (RedisTbl keeper : keepers) { + if (keeper.getRedisIp().equals(activeIp)) { + activeKeeperPort = keeper.getRedisPort(); + } else { + backUpKeeperIp = keeper.getRedisIp(); + } + } - KeeperTransMeta keeperInstanceMeta = keeperContainerService.getAllKeepers(activeIp).stream() - .filter(k -> k.getKeeperMeta().getPort() == srcKeeperPort) - .findFirst() - .orElseThrow(() -> new RuntimeException("No keeper instance found")); + if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) { + logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModel: {}, activeKeeperPort: {}, backUpKeeperIp: {}" + , activeIp, backupIp, shardModel, activeKeeperPort, backUpKeeperIp); + return false; + } + + KeeperTransMeta keeperInstanceMeta = null; + List allKeepers = keeperContainerService.getAllKeepers(activeIp); + for (KeeperInstanceMeta keeper : allKeepers) { + if (keeper.getKeeperMeta().getPort() == activeKeeperPort) { + keeperInstanceMeta = keeper; + break; + } + } + + if (keeperInstanceMeta == null) { + logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort); + return false; + } keeperContainerService.resetKeepers(keeperInstanceMeta); - return checkKeeperActive(activeIp, srcKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES); + return checkKeeperActive(activeIp, activeKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES); } catch (Exception e) { + logger.error("[switchMaster] switch master failed", e); return false; } } @@ -310,39 +330,25 @@ public void success(String message) { @Override public void fail(Throwable throwable) { - logger.error("[switchMaster] ", throwable); + logger.error("[switchMaster] keeper: {}:{}", ip, port, throwable); } }); if (isMaster[0] == expectActive) break; Thread.sleep(interval); } - return !expectActive ^ isMaster[0]; + return isMaster[0] == expectActive; } catch (Exception e) { - logger.error("[switchMaster] check keeper active error", e); + logger.error("[switchMaster] check keeper active error, keeper: {}:{}", ip, port, e); return false; } finally { try { keyedObjectPool.clear(activeKey); } catch (ObjectPoolException e) { - logger.error("[clear] clear keyed object pool error", e); + logger.error("[clear] clear keyed object pool error, keeper: {}:{}", ip, port, e); } } } - @Resource(name = REDIS_COMMAND_EXECUTOR) - private ScheduledExecutorService scheduled; - - @Resource(name = REDIS_SESSION_NETTY_CLIENT_POOL) - private XpipeNettyClientKeyedObjectPool keyedObjectPool; - - private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI))); - - @VisibleForTesting - public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) { - this.keyedObjectPool = pool; - } - - @VisibleForTesting public InfoCommand generteInfoCommand(Endpoint key) { if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) { commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; @@ -420,6 +426,7 @@ private boolean doMigrateKeepers(String dcName, String clusterName, ShardModel s protected class FullSyncJudgeTask implements Runnable{ private String activeIp; + private String backUpIp; private final InfoCommand activeInfoCommand; private final InfoCommand backupInfoCommand; @@ -433,7 +440,6 @@ protected class FullSyncJudgeTask implements Runnable{ private ShardModel shardModel; private long startTime = 0; private ScheduledFuture scheduledFuture; - public FullSyncJudgeTask(String activeIp, String backUpIp, InfoCommand activeInfoCommand, InfoCommand backupInfoCommand, long expireTime, long intervalTime, String dcName, String clusterName, ShardModel shardModel) { this.activeIp = activeIp; @@ -512,6 +518,7 @@ public void setBackupMasterReplOffset(long offset) { this.backupMasterReplOffset = offset; } + } private void addHookAndExecute(AbstractRedisCommand command, Callbackable callback) { silentCommand(command); @@ -532,7 +539,6 @@ public void operationComplete(CommandFuture commandFuture) throws Exception { throw new RuntimeException(e); } } - private void silentCommand(LoggableRedisCommand command) { command.logRequest(false); command.logResponse(false); @@ -544,5 +550,10 @@ public void setExecutor(ScheduledThreadPoolExecutor executor) { this.executor = executor; } + @VisibleForTesting + public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) { + this.keyedObjectPool = pool; + } + } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java index 605e114940..6c802fa636 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java @@ -26,6 +26,8 @@ public class ResourceConfig extends AbstractRedisConfigContext { private final static int KEYED_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("KEYED_CLIENT_POOL_SIZE", "8")); + private final static int MIGRATE_KEEPER_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("MIGRATE_KEEPER_CLIENT_POOL_SIZE", "1")); + @Bean(name = REDIS_COMMAND_EXECUTOR) public ScheduledExecutorService getRedisCommandExecutor() { int corePoolSize = OsUtils.getCpuCount(); @@ -54,6 +56,14 @@ public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws E return keyedObjectPool; } + @Bean(name = MIGRATE_KEEPER_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getMigrateKeeperClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(MIGRATE_KEEPER_CLIENT_POOL_SIZE)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + @Bean(name = PING_DELAY_INFO_EXECUTORS) public ExecutorService getDelayPingExecturos() { return DefaultExecutorFactory.createAllowCoreTimeoutAbortPolicy("RedisHealthCheckInstance-").createExecutorService(); diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java index 5b7c9f26dd..8bb7ad0821 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationServiceTest.java @@ -9,6 +9,7 @@ import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -75,20 +76,20 @@ public void testMigrationKeeperContainer() { model.setSrcKeeperContainer(src).setTargetKeeperContainer(target).setMigrateKeeperCount(3).setMigrateShards(migrationShards); models.add(model); service.beginMigrateKeeperContainers(models); - Assert.assertEquals(3, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount()); + Assert.assertEquals(0, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount()); models.clear(); model.setSwitchActive(true); models.add(model); service.beginMigrateKeeperContainers(models); - Assert.assertEquals(6, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount()); + Assert.assertEquals(3, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount()); models.clear(); model.setSwitchActive(false); model.setKeeperPairOverload(true); models.add(model); service.beginMigrateKeeperContainers(models); - Assert.assertEquals(9, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount()); + Assert.assertEquals(6, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount()); } @Test diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java index b42ac7010c..682d3c5d67 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/impl/KeeperContainerServiceImplTest.java @@ -8,18 +8,29 @@ import com.ctrip.xpipe.redis.console.model.ClusterTbl; import com.ctrip.xpipe.redis.console.model.KeeperContainerInfoModel; import com.ctrip.xpipe.redis.console.model.KeepercontainerTbl; +import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; +import com.ctrip.xpipe.redis.core.entity.KeeperMeta; +import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import com.ctrip.xpipe.spring.RestTemplateFactory; import com.ctrip.xpipe.utils.StringUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.*; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.web.client.RestTemplate; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; + /** * @author wenchao.meng @@ -288,6 +299,34 @@ public void testUpdateKeeperContainerByInfoModel() { } + @Test + public void getAllKeepersTest() { + RestTemplate restTemplate = Mockito.mock(RestTemplate.class); + keeperContainerService.setRestTemplate(restTemplate); + ResponseEntity> response = Mockito.mock(ResponseEntity.class); + List list = new ArrayList<>(); + Mockito.when(response.getBody()).thenReturn(list); + Mockito.when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), Mockito.isNull(), Mockito.any(ParameterizedTypeReference.class))).thenReturn(response); + List allKeepers = keeperContainerService.getAllKeepers("keeperContainerIp"); + Assert.assertEquals(list, allKeepers); + } + + @Test + public void resetKeepersTest() { + KeeperTransMeta keeperInstanceMeta = new KeeperInstanceMeta(); + KeeperMeta meta = new KeeperMeta(); + meta.setIp(""); + keeperInstanceMeta.setKeeperMeta(meta); + RestTemplate restTemplate = Mockito.mock(RestTemplate.class); + keeperContainerService.setRestTemplate(restTemplate); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + HttpEntity requestEntity = new HttpEntity<>(keeperInstanceMeta, headers); + Mockito.when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), eq(requestEntity), eq(Void.class))).thenReturn(null); + keeperContainerService.resetKeepers(keeperInstanceMeta); + } + @Test public void testUpdateKeeperContainerByInfoModelFail() { KeeperContainerInfoModel keeper = keeperContainerService.findKeeperContainerInfoModelById(30);