Skip to content

Commit

Permalink
完善异常case流程日志,限制连接池线程数
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Mar 29, 2024
1 parent fd90a6e commit 97cb610
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class Resource {

public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool";

public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool";

public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors";

public static final String PING_DELAY_INFO_SCHEDULED = "pingDelayInfoScheduled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.ctrip.xpipe.redis.console.service.*;
import com.ctrip.xpipe.redis.console.service.model.ShardModelService;
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta;
import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand;
Expand All @@ -40,8 +39,7 @@
import java.util.*;
import java.util.concurrent.*;

import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_COMMAND_EXECUTOR;
import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_SESSION_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.*;
import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.KEEPER_MIGRATION_ACTIVE_FAIL;
import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.KEEPER_MIGRATION_ACTIVE_SUCCESS;

Expand Down Expand Up @@ -78,6 +76,14 @@ public class ShardModelServiceImpl implements ShardModelService{

private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(20);

@Resource(name = REDIS_COMMAND_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource(name = MIGRATE_KEEPER_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI)));

private final long SWITCH_MASTER_CHECK_INTERVAL = 1000;

private final int SWITCH_MASTER_CHECK_TIMES = 10;
Expand Down Expand Up @@ -265,31 +271,45 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel
public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) {
try {
List<RedisTbl> keepers = shardModel.getKeepers();
int srcKeeperPort = keepers.stream()
.filter(r -> r.getRedisIp().equals(activeIp))
.findFirst()
.map(RedisTbl::getRedisPort)
.orElseThrow(() -> new RuntimeException("No source keeper found"));

String targetKeeperIp = keepers.stream()
.filter(r -> !r.getRedisIp().equals(activeIp))
.findFirst()
.map(RedisTbl::getRedisIp)
.orElseThrow(() -> new RuntimeException("No target keeper found"));

if (!targetKeeperIp.equals(backupIp)) {
if (keepers.size() != 2) {
logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModel: {}", activeIp, backupIp, shardModel);
return false;
}
int activeKeeperPort = -1;
String backUpKeeperIp = null;

Check warning on line 279 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L278-L279

Added lines #L278 - L279 were not covered by tests
for (RedisTbl keeper : keepers) {
if (keeper.getRedisIp().equals(activeIp)) {
activeKeeperPort = keeper.getRedisPort();

Check warning on line 282 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L282

Added line #L282 was not covered by tests
} else {
backUpKeeperIp = keeper.getRedisIp();

Check warning on line 284 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L284

Added line #L284 was not covered by tests
}
}

Check warning on line 286 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L286

Added line #L286 was not covered by tests

KeeperTransMeta keeperInstanceMeta = keeperContainerService.getAllKeepers(activeIp).stream()
.filter(k -> k.getKeeperMeta().getPort() == srcKeeperPort)
.findFirst()
.orElseThrow(() -> new RuntimeException("No keeper instance found"));
if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) {
logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModel: {}, activeKeeperPort: {}, backUpKeeperIp: {}"
, activeIp, backupIp, shardModel, activeKeeperPort, backUpKeeperIp);
return false;

Check warning on line 291 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L289-L291

Added lines #L289 - L291 were not covered by tests
}

KeeperTransMeta keeperInstanceMeta = null;
List<KeeperInstanceMeta> allKeepers = keeperContainerService.getAllKeepers(activeIp);

Check warning on line 295 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L294-L295

Added lines #L294 - L295 were not covered by tests
for (KeeperInstanceMeta keeper : allKeepers) {
if (keeper.getKeeperMeta().getPort() == activeKeeperPort) {
keeperInstanceMeta = keeper;
break;

Check warning on line 299 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L298-L299

Added lines #L298 - L299 were not covered by tests
}
}

Check warning on line 301 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L301

Added line #L301 was not covered by tests

if (keeperInstanceMeta == null) {
logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort);
return false;

Check warning on line 305 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L304-L305

Added lines #L304 - L305 were not covered by tests
}

keeperContainerService.resetKeepers(keeperInstanceMeta);
return checkKeeperActive(activeIp, srcKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES);
return checkKeeperActive(activeIp, activeKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES);

Check warning on line 309 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L308-L309

Added lines #L308 - L309 were not covered by tests

} catch (Exception e) {
logger.error("[switchMaster] switch master failed", e);
return false;

Check warning on line 313 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L311-L313

Added lines #L311 - L313 were not covered by tests
}
}
Expand All @@ -310,39 +330,25 @@ public void success(String message) {

@Override
public void fail(Throwable throwable) {
logger.error("[switchMaster] ", throwable);
logger.error("[switchMaster] keeper: {}:{}", ip, port, throwable);
}

Check warning on line 334 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L333-L334

Added lines #L333 - L334 were not covered by tests
});
if (isMaster[0] == expectActive) break;
Thread.sleep(interval);

Check warning on line 337 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L337

Added line #L337 was not covered by tests
}
return !expectActive ^ isMaster[0];
return isMaster[0] == expectActive;
} catch (Exception e) {
logger.error("[switchMaster] check keeper active error", e);
logger.error("[switchMaster] check keeper active error, keeper: {}:{}", ip, port, e);
return false;

Check warning on line 342 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L340-L342

Added lines #L340 - L342 were not covered by tests
} finally {
try {
keyedObjectPool.clear(activeKey);
} catch (ObjectPoolException e) {
logger.error("[clear] clear keyed object pool error", e);
logger.error("[clear] clear keyed object pool error, keeper: {}:{}", ip, port, e);
}

Check warning on line 348 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L345-L348

Added lines #L345 - L348 were not covered by tests
}
}

@Resource(name = REDIS_COMMAND_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource(name = REDIS_SESSION_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI)));

@VisibleForTesting
public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) {
this.keyedObjectPool = pool;
}

@VisibleForTesting
public InfoCommand generteInfoCommand(Endpoint key) {
if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) {
commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI;
Expand Down Expand Up @@ -420,6 +426,7 @@ private boolean doMigrateKeepers(String dcName, String clusterName, ShardModel s
protected class FullSyncJudgeTask implements Runnable{

private String activeIp;

private String backUpIp;
private final InfoCommand activeInfoCommand;
private final InfoCommand backupInfoCommand;
Expand All @@ -433,7 +440,6 @@ protected class FullSyncJudgeTask implements Runnable{
private ShardModel shardModel;
private long startTime = 0;
private ScheduledFuture<?> scheduledFuture;

public FullSyncJudgeTask(String activeIp, String backUpIp, InfoCommand activeInfoCommand, InfoCommand backupInfoCommand, long expireTime, long intervalTime,
String dcName, String clusterName, ShardModel shardModel) {
this.activeIp = activeIp;
Expand Down Expand Up @@ -512,6 +518,7 @@ public void setBackupMasterReplOffset(long offset) {
this.backupMasterReplOffset = offset;
}


}
private <V> void addHookAndExecute(AbstractRedisCommand<V> command, Callbackable<V> callback) {
silentCommand(command);
Expand All @@ -532,7 +539,6 @@ public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
throw new RuntimeException(e);

Check warning on line 539 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java#L538-L539

Added lines #L538 - L539 were not covered by tests
}
}

private void silentCommand(LoggableRedisCommand command) {
command.logRequest(false);
command.logResponse(false);
Expand All @@ -544,5 +550,10 @@ public void setExecutor(ScheduledThreadPoolExecutor executor) {
this.executor = executor;
}

@VisibleForTesting
public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) {
this.keyedObjectPool = pool;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class ResourceConfig extends AbstractRedisConfigContext {

private final static int KEYED_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("KEYED_CLIENT_POOL_SIZE", "8"));

private final static int MIGRATE_KEEPER_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("MIGRATE_KEEPER_CLIENT_POOL_SIZE", "1"));

@Bean(name = REDIS_COMMAND_EXECUTOR)
public ScheduledExecutorService getRedisCommandExecutor() {
int corePoolSize = OsUtils.getCpuCount();
Expand Down Expand Up @@ -54,6 +56,14 @@ public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws E
return keyedObjectPool;
}

@Bean(name = MIGRATE_KEEPER_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getMigrateKeeperClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(MIGRATE_KEEPER_CLIENT_POOL_SIZE));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = PING_DELAY_INFO_EXECUTORS)
public ExecutorService getDelayPingExecturos() {
return DefaultExecutorFactory.createAllowCoreTimeoutAbortPolicy("RedisHealthCheckInstance-").createExecutorService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
Expand Down Expand Up @@ -75,20 +76,20 @@ public void testMigrationKeeperContainer() {
model.setSrcKeeperContainer(src).setTargetKeeperContainer(target).setMigrateKeeperCount(3).setMigrateShards(migrationShards);
models.add(model);
service.beginMigrateKeeperContainers(models);
Assert.assertEquals(3, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount());
Assert.assertEquals(0, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount());

models.clear();
model.setSwitchActive(true);
models.add(model);
service.beginMigrateKeeperContainers(models);
Assert.assertEquals(6, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount());
Assert.assertEquals(3, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount());

models.clear();
model.setSwitchActive(false);
model.setKeeperPairOverload(true);
models.add(model);
service.beginMigrateKeeperContainers(models);
Assert.assertEquals(9, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount());
Assert.assertEquals(6, service.getMigrationProcess().get(0).getMigrateKeeperCompleteCount());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,29 @@
import com.ctrip.xpipe.redis.console.model.ClusterTbl;
import com.ctrip.xpipe.redis.console.model.KeeperContainerInfoModel;
import com.ctrip.xpipe.redis.console.model.KeepercontainerTbl;
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta;
import com.ctrip.xpipe.spring.RestTemplateFactory;
import com.ctrip.xpipe.utils.StringUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.*;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.web.client.RestTemplate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;


/**
* @author wenchao.meng
Expand Down Expand Up @@ -288,6 +299,34 @@ public void testUpdateKeeperContainerByInfoModel() {

}

@Test
public void getAllKeepersTest() {
RestTemplate restTemplate = Mockito.mock(RestTemplate.class);
keeperContainerService.setRestTemplate(restTemplate);
ResponseEntity<List<KeeperInstanceMeta>> response = Mockito.mock(ResponseEntity.class);
List<KeeperInstanceMeta> list = new ArrayList<>();
Mockito.when(response.getBody()).thenReturn(list);
Mockito.when(restTemplate.exchange(anyString(), eq(HttpMethod.GET), Mockito.isNull(), Mockito.any(ParameterizedTypeReference.class))).thenReturn(response);
List<KeeperInstanceMeta> allKeepers = keeperContainerService.getAllKeepers("keeperContainerIp");
Assert.assertEquals(list, allKeepers);
}

@Test
public void resetKeepersTest() {
KeeperTransMeta keeperInstanceMeta = new KeeperInstanceMeta();
KeeperMeta meta = new KeeperMeta();
meta.setIp("");
keeperInstanceMeta.setKeeperMeta(meta);
RestTemplate restTemplate = Mockito.mock(RestTemplate.class);
keeperContainerService.setRestTemplate(restTemplate);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);

HttpEntity<KeeperTransMeta> requestEntity = new HttpEntity<>(keeperInstanceMeta, headers);
Mockito.when(restTemplate.exchange(anyString(), eq(HttpMethod.POST), eq(requestEntity), eq(Void.class))).thenReturn(null);
keeperContainerService.resetKeepers(keeperInstanceMeta);
}

@Test
public void testUpdateKeeperContainerByInfoModelFail() {
KeeperContainerInfoModel keeper = keeperContainerService.findKeeperContainerInfoModelById(30);
Expand Down

0 comments on commit 97cb610

Please sign in to comment.