Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,9 @@ public String getPhysicalCluster(String clusterName) {
if (acg == null || System.currentTimeMillis() - acg.getUnavailableSince()
> policy.getFailoverFailureThreshold() * Config.heartbeat_interval_second * 1000) {
switchActiveStandby(cg, acgName, scgName);
String acgId = acg == null ? clusterNameToId.get(acgName) : acg.getId();
MetricRepo.increaseVirtualComputeGroupSwitch(cg.getId(), cg.getName(), acgId,
acgName, scg.getId(), scgName);
policy.setActiveComputeGroup(scgName);
policy.setStandbyComputeGroup(acgName);
cg.setNeedRebuildFileCache(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class CloudMetrics {
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_GLOBAL_BALANCE_NUM;
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM;
protected static AutoMappedMetric<LongCounterMetric> CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM;
protected static AutoMappedMetric<LongCounterMetric> VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER;

// Per-method meta-service RPC metrics
public static AutoMappedMetric<LongCounterMetric> META_SERVICE_RPC_TOTAL;
Expand Down Expand Up @@ -138,6 +139,10 @@ protected static void init() {
"cloud_warm_up_balance_num", MetricUnit.NOUNIT,
"current cluster cloud warm up cache sync edit log number"));

VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER = new AutoMappedMetric<>(name -> new LongCounterMetric(
"virtual_compute_group_switch_total", MetricUnit.NOUNIT,
"virtual compute group active standby switch count"));

// Per-method meta-service RPC metrics
META_SERVICE_RPC_TOTAL = MetricRepo.addLabeledMetrics("method", () ->
new LongCounterMetric("meta_service_rpc_total", MetricUnit.NOUNIT,
Expand Down
39 changes: 39 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
Original file line number Diff line number Diff line change
Expand Up @@ -1881,6 +1881,33 @@ public static void updateClusterCloudBalanceNum(String clusterName, String clust
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
}

public static void increaseVirtualComputeGroupSwitch(String virtualComputeGroupId, String virtualComputeGroupName,
String srcComputeGroupId, String srcComputeGroupName,
String dstComputeGroupId, String dstComputeGroupName) {
if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(virtualComputeGroupId)
|| Strings.isNullOrEmpty(virtualComputeGroupName) || Strings.isNullOrEmpty(srcComputeGroupId)
|| Strings.isNullOrEmpty(srcComputeGroupName) || Strings.isNullOrEmpty(dstComputeGroupId)
|| Strings.isNullOrEmpty(dstComputeGroupName)) {
return;
}
String key = virtualComputeGroupId + CloudMetrics.CLOUD_CLUSTER_DELIMITER + srcComputeGroupId
+ CloudMetrics.CLOUD_CLUSTER_DELIMITER + dstComputeGroupId;
LongCounterMetric counter = CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER.getOrAdd(key);
List<MetricLabel> labels = new ArrayList<>();
counter.increase(1L);
labels.add(new MetricLabel("virtual_compute_group_id", virtualComputeGroupId));
labels.add(new MetricLabel("virtual_compute_group_name", virtualComputeGroupName));
labels.add(new MetricLabel("src_compute_group_id", srcComputeGroupId));
labels.add(new MetricLabel("src_compute_group_name", srcComputeGroupName));
labels.add(new MetricLabel("dst_compute_group_id", dstComputeGroupId));
labels.add(new MetricLabel("dst_compute_group_name", dstComputeGroupName));
if (!counter.getLabels().isEmpty()) {
MetricRepo.DORIS_METRIC_REGISTER.removeMetricsByNameAndLabels(counter.getName(), counter.getLabels());
}
counter.setLabels(labels);
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(counter);
}

public static void unregisterCloudMetrics(String clusterId, String clusterName, List<Backend> backends) {
if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterId)) {
return;
Expand Down Expand Up @@ -1961,6 +1988,18 @@ public static void unregisterCloudMetrics(String clusterId, String clusterName,
MetricRepo.DORIS_METRIC_REGISTER
.removeMetricsByNameAndLabels(clusterCloudWarmUpBalanceNum.getName(), labels);

String delimiter = CloudMetrics.CLOUD_CLUSTER_DELIMITER;
for (String key : new ArrayList<>(
CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER.getMetrics().keySet())) {
if (key.startsWith(clusterId + delimiter) || key.contains(delimiter + clusterId + delimiter)
|| key.endsWith(delimiter + clusterId)) {
LongCounterMetric switchCounter = CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER.getOrAdd(key);
CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER.remove(key);
MetricRepo.DORIS_METRIC_REGISTER
.removeMetricsByNameAndLabels(switchCounter.getName(), switchCounter.getLabels());
}
}

METRIC_REGISTER.getHistograms().keySet().stream()
.filter(k -> k.contains(clusterId))
.forEach(METRIC_REGISTER::remove);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.catalog.ComputeGroup;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.MetaServiceProxy;
import org.apache.doris.common.Config;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -40,6 +46,7 @@ public class CloudSystemInfoServiceTest {
public void setUp() {
// Enable cloud mode for testing
Config.cloud_unique_id = "test_cloud_unique_id";
Config.meta_service_endpoint = "127.0.0.1:5000";
}

@Test
Expand Down Expand Up @@ -267,6 +274,65 @@ public void testGetPhysicalClusterStandby3AliveBe() {
Assert.assertEquals(pcgName2, res);
}

@Test
public void testGetPhysicalClusterSwitchActiveStandbyMetric() throws Exception {
infoService = new CloudSystemInfoService();

String vcgName = "v_cluster_1";
String vcgId = "id1";
String pcgName1 = "p_cluster_1";
String pcgName2 = "p_cluster_2";

ComputeGroup vcg = new ComputeGroup(vcgId, vcgName, ComputeGroup.ComputeTypeEnum.VIRTUAL);
ComputeGroup.Policy policy = new ComputeGroup.Policy();
policy.setActiveComputeGroup(pcgName1);
policy.setStandbyComputeGroup(pcgName2);
policy.setUnhealthyNodeThresholdPercent(100);
vcg.setPolicy(policy);

ComputeGroup pcg2 = new ComputeGroup("id3", pcgName2, ComputeGroup.ComputeTypeEnum.COMPUTE);
infoService.addComputeGroup(vcgId, vcg);
infoService.clusterNameToId.put(pcgName1, "id2");
infoService.addComputeGroup("id3", pcg2);

List<Backend> toAdd2 = new ArrayList<>();
for (int i = 0; i < 3; ++i) {
Backend b = new Backend(i + 4, "", i);
Map<String, String> newTagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
newTagMap.put(Tag.CLOUD_CLUSTER_NAME, pcgName2);
newTagMap.put(Tag.CLOUD_CLUSTER_ID, "id3");
b.setTagMap(newTagMap);
b.setAlive(true);
toAdd2.add(b);
}
infoService.updateCloudClusterMapNoLock(toAdd2, new ArrayList<>());
Assert.assertNull(infoService.getComputeGroupByName(pcgName1));
Assert.assertTrue(infoService.isComputeGroupAvailable(pcgName2, policy.getUnhealthyNodeThresholdPercent()));

CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
Mockito.when(cloudEnv.getCloudInstanceId()).thenReturn("instance_id");
MetaServiceProxy metaServiceProxy = Mockito.mock(MetaServiceProxy.class);
Cloud.AlterClusterResponse response = Cloud.AlterClusterResponse.newBuilder()
.setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
.setCode(Cloud.MetaServiceCode.OK)
.setMsg("OK"))
.build();
Mockito.when(metaServiceProxy.alterCluster(Mockito.any())).thenReturn(response);

try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class);
MockedStatic<MetaServiceProxy> mockedMetaServiceProxy = Mockito.mockStatic(MetaServiceProxy.class);
MockedStatic<MetricRepo> mockedMetricRepo = Mockito.mockStatic(MetricRepo.class)) {
mockedEnv.when(Env::getCurrentEnv).thenReturn(cloudEnv);
mockedMetaServiceProxy.when(MetaServiceProxy::getInstance).thenReturn(metaServiceProxy);

String res = infoService.getPhysicalCluster(vcgName);

Assert.assertEquals(pcgName2, res);
mockedMetricRepo.verify(() ->
MetricRepo.increaseVirtualComputeGroupSwitch(vcgId, vcgName, "id2", pcgName1, "id3", pcgName2));
}
}

// active has 1 alive be and 2 dead be, standby has 3 alive be
@Test
public void testGetPhysicalClusterActive1AliveBe2DeadBe() {
Expand Down
36 changes: 36 additions & 0 deletions fe/fe-core/src/test/java/org/apache/doris/metric/MetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.metric;

import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.JsonUtil;
import org.apache.doris.metric.Metric.MetricUnit;
Expand Down Expand Up @@ -89,6 +90,41 @@ public void testUserQueryMetrics() {

}

@Test
public void testVirtualComputeGroupSwitchMetricRename() {
String originCloudUniqueId = Config.cloud_unique_id;
AutoMappedMetric<LongCounterMetric> originMetric = CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER;
try {
Config.cloud_unique_id = "test_cloud_unique_id";
CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER = new AutoMappedMetric<>(name -> new LongCounterMetric(
"virtual_compute_group_switch_total", MetricUnit.NOUNIT,
"virtual compute group active standby switch count"));

MetricRepo.increaseVirtualComputeGroupSwitch("virtual_id", "virtual_name",
"src_id", "src_old_name", "dst_id", "dst_name");
MetricRepo.increaseVirtualComputeGroupSwitch("virtual_id", "virtual_name",
"src_id", "src_new_name", "dst_id", "dst_name");

MetricVisitor visitor = new PrometheusMetricVisitor();
MetricRepo.DORIS_METRIC_REGISTER.accept(visitor);
String metricResult = visitor.finish();
Assert.assertTrue(metricResult.contains("# TYPE doris_fe_virtual_compute_group_switch_total counter"));
Assert.assertTrue(metricResult.contains("src_compute_group_name=\"src_new_name\""));
Assert.assertTrue(metricResult.contains("doris_fe_virtual_compute_group_switch_total"
+ "{virtual_compute_group_id=\"virtual_id\", virtual_compute_group_name=\"virtual_name\", "
+ "src_compute_group_id=\"src_id\", src_compute_group_name=\"src_new_name\", "
+ "dst_compute_group_id=\"dst_id\", dst_compute_group_name=\"dst_name\"} 2"));
Assert.assertFalse(metricResult.contains("src_compute_group_name=\"src_old_name\""));
} finally {
MetricRepo.DORIS_METRIC_REGISTER.removeMetrics("virtual_compute_group_switch_total");
if (CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER != null) {
CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER.getMetrics().clear();
}
CloudMetrics.VIRTUAL_COMPUTE_GROUP_SWITCH_COUNTER = originMetric;
Config.cloud_unique_id = originCloudUniqueId;
}
}

@Test
public void testGc() {
PrometheusMetricVisitor visitor = new PrometheusMetricVisitor();
Expand Down
Loading