Skip to content

Commit

Permalink
自动匀keeeper-执行迁移计划:主备切换添加验证
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Mar 28, 2024
1 parent 43b57ca commit fd90a6e
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel;
import com.ctrip.xpipe.redis.console.model.MigrationKeeperContainerDetailModel;
import com.ctrip.xpipe.redis.console.service.KeeperContainerAnalyzerService;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.slf4j.Logger;
Expand Down Expand Up @@ -55,7 +56,11 @@ public List<MigrationKeeperContainerDetailModel> getMigrationPlans(Map<String, K
keeperContainerAnalyzerService.initStandard(models);
List<KeeperContainerUsedInfoModel> modelsWithoutResource = new ArrayList<>();
models.forEach(a -> modelsWithoutResource.add(KeeperContainerUsedInfoModel.cloneKeeperContainerUsedInfoModel(a)));
analyzerContext = new DefaultKeeperContainerUsedInfoAnalyzerContext(filterChain, metaCache.getXpipeMeta().getDcs().get(currentDc));
DcMeta dcMeta = null;
if (metaCache.getXpipeMeta() != null) {
dcMeta = metaCache.getXpipeMeta().getDcs().get(currentDc);
}
analyzerContext = new DefaultKeeperContainerUsedInfoAnalyzerContext(filterChain, dcMeta);
analyzerContext.initKeeperPairData(modelsWithoutResource, modelsMap);
analyzerContext.initAvailablePool(modelsWithoutResource);
for (KeeperContainerUsedInfoModel model : modelsWithoutResource) {
Expand All @@ -82,7 +87,7 @@ private void generateDataOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod
}
KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(model, dcClusterShard, backUpKeeper, (Boolean) cause[1], false);
if (bestKeeperContainer == null) {
break;
continue;
}
analyzerContext.addMigrationPlan(model, bestKeeperContainer, false, false, (String) cause[0], dcClusterShard, backUpKeeper);
analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]);
Expand All @@ -105,7 +110,7 @@ private void generatePairOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod
KeeperContainerUsedInfoModel backUpKeeperContainer = dcClusterShard.getValue().getKeeperIP().equals(modelA.getKeeperIp()) ? modelB : modelA;
KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(backUpKeeperContainer, dcClusterShard, activeKeeperContainer, (Boolean) cause[1], true);
if (bestKeeperContainer == null) {
break;
continue;
}
if (!filterChain.isMigrateKeeperPairOverload(dcClusterShard, backUpKeeperContainer, bestKeeperContainer, analyzerContext)) {
analyzerContext.addMigrationPlan(backUpKeeperContainer, bestKeeperContainer, false, true, (String) cause[0], dcClusterShard, activeKeeperContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class DefaultKeeperContainerUsedInfoAnalyzerContext implements KeeperCont
public DefaultKeeperContainerUsedInfoAnalyzerContext(KeeperContainerFilterChain filterChain, DcMeta dcMeta) {
this.filterChain = filterChain;
this.dcMeta = dcMeta;
if (dcMeta == null) {
logger.warn("[DefaultKeeperContainerUsedInfoAnalyzerContext dcMeta is null!");
}
}

@Override
Expand Down Expand Up @@ -120,7 +123,10 @@ public void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer,
}

@Override
public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel srcKeeper, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isActiveEntry) {
public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel srcKeeper, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isMigrateShardBackUp) {
if (srcKeeper == null || srcKeeperPair == null) {
return null;
}
String org = srcKeeper.getOrg();
String az = srcKeeper.getAz();
PriorityQueue<KeeperContainerUsedInfoModel> queue = isPeerDataOverload ? minPeerDataKeeperContainers : minInputFlowKeeperContainers;
Expand All @@ -130,8 +136,8 @@ public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedIn
if ((org == null || org.equals(target.getOrg()))
&& (az == null || az.equals(target.getAz()))
&& !Objects.equals(target.getKeeperIp(), srcKeeperPair.getKeeperIp())
&& ((!isActiveEntry && filterChain.canMigrate(dcClusterShard, srcKeeperPair, target, this))
|| (isActiveEntry && !filterChain.isMigrateKeeperPairOverload(dcClusterShard, srcKeeperPair, target, this)))) {
&& ((!isMigrateShardBackUp && filterChain.canMigrate(dcClusterShard, srcKeeperPair, target, this))
|| (isMigrateShardBackUp && !filterChain.isMigrateKeeperPairOverload(dcClusterShard, srcKeeperPair, target, this)))) {
return target;
}
temp.add(target);
Expand Down Expand Up @@ -165,6 +171,9 @@ public void initKeeperPairData(List<KeeperContainerUsedInfoModel> usedInfoMap, M
}

private void getProblemKeeperContainer(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> entry) {
if (dcMeta == null) {
return;
}
dcMeta.getClusters().values().forEach(clusterMeta -> {
if (entry.getKey().getClusterId().equals(clusterMeta.getId())) {
clusterMeta.getShards().values().forEach(shardMeta -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface KeeperContainerUsedInfoAnalyzerContext {

void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer, boolean isPeerDataOverload);

KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isActiveEntry);
KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard, KeeperContainerUsedInfoModel srcKeeperPair, boolean isPeerDataOverload, boolean isMigrateShardBackUp);

String getBackUpKeeperIp(DcClusterShard activeKeeper);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,11 @@ public void beginMigrateKeeperContainers(List<MigrationKeeperContainerDetailMode
event = KEEPER_MIGRATION_BACKUP_FAIL;
}
}else {
try {
shardModelService.migrateAutoBalanceKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
srcKeeperContainerIp, targetKeeperContainerIp);
if (shardModelService.migrateAutoBalanceKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
srcKeeperContainerIp, targetKeeperContainerIp)) {
keeperContainer.migrateKeeperCompleteCountIncrease();
event = KEEPER_MIGRATION_ACTIVE_START_SUCCESS;
} catch (Throwable e) {
} else {
event = KEEPER_MIGRATION_ACTIVE_START_FAIL;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ boolean migrateShardKeepers(String dcName, String clusterName, ShardModel shardM

boolean switchMaster(String srcIp, String targetIp, ShardModel shardModel);

void migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel,
boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel,
String srcKeeperContainerIp, String targetKeeperContainerIp);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ public class ShardModelServiceImpl implements ShardModelService{
private final ExecutorService FIXED_THREAD_POOL = Executors
.newFixedThreadPool(6, XpipeThreadFactory.create(getClass().getSimpleName()));

private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(20);
private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(20);

private final long SWITCH_MASTER_CHECK_INTERVAL = 1000;

private final int SWITCH_MASTER_CHECK_TIMES = 10;

private final long KEEPER_BALANCE_FULL_SYNC_EXPIRE_TIME = 10L * 60 * 1000;

private final long KEEPER_BALANCE_FULL_SYNC_INTERVAL_TIME = 1000;

@Override
public List<ShardModel> getAllShardModel(String dcName, String clusterName) {
Expand Down Expand Up @@ -254,35 +262,71 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel
}

@Override
public boolean switchMaster(String srcIp, String targetIp, ShardModel shardModel) {
public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) {
try {
List<RedisTbl> keepers = shardModel.getKeepers();
int srcKeeperPort = keepers.stream()
.filter(r -> r.getRedisIp().equals(srcIp))
.filter(r -> r.getRedisIp().equals(activeIp))
.findFirst()
.map(RedisTbl::getRedisPort)
.orElseThrow(() -> new RuntimeException("No source keeper found"));

String targetKeeperIp = keepers.stream()
.filter(r -> !r.getRedisIp().equals(srcIp))
.filter(r -> !r.getRedisIp().equals(activeIp))
.findFirst()
.map(RedisTbl::getRedisIp)
.orElseThrow(() -> new RuntimeException("No target keeper found"));

if (!targetKeeperIp.equals(targetIp)) {
if (!targetKeeperIp.equals(backupIp)) {
return false;
}

KeeperTransMeta keeperInstanceMeta = keeperContainerService.getAllKeepers(srcIp).stream()
KeeperTransMeta keeperInstanceMeta = keeperContainerService.getAllKeepers(activeIp).stream()
.filter(k -> k.getKeeperMeta().getPort() == srcKeeperPort)
.findFirst()
.orElseThrow(() -> new RuntimeException("No keeper instance found"));

keeperContainerService.resetKeepers(keeperInstanceMeta);
return checkKeeperActive(activeIp, srcKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES);

} catch (Exception e) {
return false;
}
return true;
}

private boolean checkKeeperActive(String ip, int port, boolean expectActive, long interval, int maxRetryTimes) {
DefaultEndPoint activeKey = new DefaultEndPoint(ip, port);
InfoCommand infoCommand = generteInfoCommand(activeKey);
final boolean[] isMaster = new boolean[1];
try {
int time = 0;
while (time < maxRetryTimes){
time ++;
addHookAndExecute(infoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
isMaster[0] = new InfoResultExtractor(message).getKeeperActive();
}

@Override
public void fail(Throwable throwable) {
logger.error("[switchMaster] ", throwable);
}
});
if (isMaster[0] == expectActive) break;
Thread.sleep(interval);
}
return !expectActive ^ isMaster[0];
} catch (Exception e) {
logger.error("[switchMaster] check keeper active error", e);
return false;
} finally {
try {
keyedObjectPool.clear(activeKey);
} catch (ObjectPoolException e) {
logger.error("[clear] clear keyed object pool error", e);
}
}
}

@Resource(name = REDIS_COMMAND_EXECUTOR)
Expand All @@ -308,26 +352,30 @@ public InfoCommand generteInfoCommand(Endpoint key) {
}

@Override
public void migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) {
public boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) {
List<RedisTbl> oldKeepers = shardModel.getKeepers();
List<RedisTbl> newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel,
srcKeeperContainerIp, targetKeeperContainerIp, true);
if (!doMigrateKeepers(dcName, clusterName, shardModel, newKeepers)) {
throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel));
}
shardModel.setKeepers(newKeepers);
RedisTbl active = newKeepers.get(0);
RedisTbl backup = newKeepers.get(1);
DefaultEndPoint activeKey = new DefaultEndPoint(active.getRedisIp(), active.getRedisPort());
DefaultEndPoint backupKey = new DefaultEndPoint(backup.getRedisIp(), backup.getRedisPort());
InfoCommand activeInfoCommand = generteInfoCommand(activeKey);
InfoCommand backupInfoCommand = generteInfoCommand(backupKey);
long expireTime = 10L * 60 * 1000;
long intervalTime = 1000;

FullSyncJudgeTask task = new FullSyncJudgeTask(active.getRedisIp(), backup.getRedisIp(), activeInfoCommand, backupInfoCommand, KEEPER_BALANCE_FULL_SYNC_EXPIRE_TIME, KEEPER_BALANCE_FULL_SYNC_INTERVAL_TIME,
dcName, clusterName, shardModel);
try {
FullSyncJudgeTask task = new FullSyncJudgeTask(active.getRedisIp(), backup.getRedisIp(), activeInfoCommand, backupInfoCommand, expireTime, intervalTime,
dcName, clusterName, shardModel);
ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(task, 1000, 1000, TimeUnit.MILLISECONDS);
ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(task, 0,1000, TimeUnit.MILLISECONDS);
task.setScheduledFuture(scheduledFuture);
scheduledFuture.get();
return getAutoBalanceResult(task, dcName, clusterName, shardModel, oldKeepers);
} catch (Exception e) {
logger.error("[clear] clear keyed object pool error", e);
return getAutoBalanceResult(task, dcName, clusterName, shardModel, oldKeepers);
} finally {
try {
keyedObjectPool.clear(activeKey);
Expand All @@ -338,6 +386,16 @@ public void migrateAutoBalanceKeepers(String dcName, String clusterName, ShardMo
}
}

private boolean getAutoBalanceResult(FullSyncJudgeTask task, String dcName, String clusterName, ShardModel shardModel, List<RedisTbl> oldKeepers) {
if (task.getResult()) {
return true;
}
if (!doMigrateKeepers(dcName, clusterName, shardModel, oldKeepers)) {
throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel));
}
return false;
}

private boolean doMigrateKeepers(String dcName, String clusterName, ShardModel shardModel, List<RedisTbl> newKeepers) {
if (newKeepers == null) {
logger.debug("[migrateKeepers] no need to replace keepers");
Expand Down Expand Up @@ -398,7 +456,6 @@ public void run() {
if (startTime == 0) {
startTime = System.currentTimeMillis();
}
isSuccess = true;
addHookAndExecute(activeInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
Expand All @@ -407,7 +464,7 @@ public void success(String message) {

@Override
public void fail(Throwable throwable) {
isSuccess = false;
backupMasterReplOffset = -1;
}
});

Expand All @@ -425,15 +482,16 @@ public void success(String message) {

@Override
public void fail(Throwable throwable) {
isSuccess = false;
backupMasterReplOffset = -1;
}
});

if (isSuccess && backupMasterReplOffset > activeMasterReplOffset) {
switchMaster(backUpIp, activeIp, shardModel);
if (backupMasterReplOffset > activeMasterReplOffset) {
isSuccess = true;
switchMaster(activeIp, backUpIp, shardModel);
CatEventMonitor.DEFAULT.logEvent(KEEPER_MIGRATION_ACTIVE_SUCCESS,
String.format("activeKeeper:%s, backupKeeper:%s, dc:%s, cluster:%s, shard:%s",
activeInfoCommand, backupInfoCommand, dcName, clusterName, shardModel.getShardTbl().getShardName()));
scheduledFuture.cancel(true);
Thread.currentThread().interrupt();
}
if (System.currentTimeMillis() - startTime > expireTime && !isSuccess) {
Expand All @@ -442,31 +500,49 @@ public void fail(Throwable throwable) {
activeInfoCommand, backupInfoCommand, dcName, clusterName, shardModel.getShardTbl().getShardName()));
scheduledFuture.cancel(true);
}

}

private <V> CommandFuture<V> addHookAndExecute(AbstractRedisCommand<V> command, Callbackable<V> callback) {
silentCommand(command);
CommandFuture<V> future = command.execute();
future.addListener(new CommandFutureListener<V>() {
@Override
public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
if(!commandFuture.isSuccess()) {
callback.fail(commandFuture.cause());
} else {
callback.success(commandFuture.get());
}
}
});
return future;
public boolean getResult() {
return isSuccess;
}

private void silentCommand(LoggableRedisCommand command) {
command.logRequest(false);
command.logResponse(false);
@VisibleForTesting
public void setBackupMasterReplOffset(long offset) {
this.backupMasterReplOffset = offset;
}

}
private <V> void addHookAndExecute(AbstractRedisCommand<V> command, Callbackable<V> callback) {
silentCommand(command);
CommandFuture<V> future = command.execute();
future.addListener(new CommandFutureListener<V>() {
@Override
public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
if(!commandFuture.isSuccess()) {
callback.fail(commandFuture.cause());
} else {
callback.success(commandFuture.get());
}
}
});
try {
future.get();
} catch (Exception e){
throw new RuntimeException(e);
}
}

private void silentCommand(LoggableRedisCommand command) {
command.logRequest(false);
command.logResponse(false);

}

@VisibleForTesting
public void setExecutor(ScheduledThreadPoolExecutor executor) {
this.executor = executor;
}


}
Loading

0 comments on commit fd90a6e

Please sign in to comment.