Skip to content

Commit

Permalink
Merge pull request #2177 from wangyu096/issue_1697_3.7.x
Browse files Browse the repository at this point in the history
perf: 作业包含大量主机,执行作业请求响应时间过长 #1697
  • Loading branch information
jsonwan committed Jun 29, 2023
2 parents 40c26f7 + bae63ff commit 7b4159a
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 99 deletions.
Expand Up @@ -25,44 +25,67 @@
package com.tencent.bk.job.common.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* 分批工具
* 常用的集合工具
*/
public class BatchUtil {
public class CollectionUtil {
/**
* 分批
* 对 List 进行分片,每一分片的大小相同(最后一个分片可能小于)
*
* @param elements 用于分批的集合
* @param batchSize 每一批的最大数量
* @return 分批结果
* @param list 需要分片的集合
* @param size 分片大小
* @return 集合分片
*/
public static <E> List<List<E>> buildBatchList(List<E> elements, int batchSize) {
public static <E> List<List<E>> partitionList(List<E> list, int size) {
List<List<E>> batchList = new ArrayList<>();
int total = elements.size();
if (total <= batchSize) {
batchList.add(elements);
int total = list.size();
if (total <= size) {
batchList.add(list);
return batchList;
}

int batch = total / batchSize;
int left = total % batchSize;
int batch = total / size;
int left = total % size;
if (left > 0) {
batch += 1;
}
int startIndex = 0;
for (int i = 1; i <= batch; i++) {
List<E> subList;
if (i == batch && left > 0) {
subList = elements.subList(startIndex, startIndex + left);
subList = list.subList(startIndex, startIndex + left);
startIndex += left;
} else {
subList = elements.subList(startIndex, startIndex + batchSize);
startIndex += batchSize;
subList = list.subList(startIndex, startIndex + size);
startIndex += size;
}
batchList.add(subList);
}
return batchList;
}

/**
* 对集合进行分片,每一分片的大小相同(最后一个分片可能小于)
*
* @param collection 需要分片的集合
* @param size 分片大小
* @return 集合分片
*/
public static <E> List<List<E>> partitionCollection(Collection<E> collection, int size) {
int limit = (collection.size() + size - 1) / size;
return Stream.iterate(0, n -> n + 1)
.limit(limit)
.parallel()
.map(a -> collection.stream()
.skip(a * size)
.limit(size)
.parallel()
.collect(Collectors.toList()))
.collect(Collectors.toList());
}
}

This file was deleted.

@@ -0,0 +1,141 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.common.util;

import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

class CollectionUtilTest {
@Test
void testPartitionList() {
List<String> list = new ArrayList<>();
list.add("a");
List<List<String>> partitionLists = CollectionUtil.partitionList(list, 1);
assertThat(partitionLists).hasSize(1);
assertThat(partitionLists.get(0)).hasSize(1);

list.add("b");
partitionLists = CollectionUtil.partitionList(list, 3);
assertThat(partitionLists).hasSize(1);
assertThat(partitionLists.get(0)).hasSize(2);

list.add("c");
partitionLists = CollectionUtil.partitionList(list, 2);
assertThat(partitionLists).hasSize(2);
assertThat(partitionLists.get(0)).hasSize(2);
assertThat(partitionLists.get(1)).hasSize(1);

list.add("d");
partitionLists = CollectionUtil.partitionList(list, 2);
assertThat(partitionLists).hasSize(2);
assertThat(partitionLists.get(0)).hasSize(2);
assertThat(partitionLists.get(1)).hasSize(2);
}

@Nested
class TestPartitionCollection {
@Test
void testPartitionList() {
List<String> list = new ArrayList<>();
list.add("a");
List<List<String>> partitionLists = CollectionUtil.partitionCollection(list, 1);
assertThat(partitionLists).hasSize(1);
assertThat(partitionLists.get(0)).hasSize(1);

list.add("b");
partitionLists = CollectionUtil.partitionCollection(list, 3);
assertThat(partitionLists).hasSize(1);
assertThat(partitionLists.get(0)).hasSize(2);

list.add("c");
partitionLists = CollectionUtil.partitionCollection(list, 2);
assertThat(partitionLists).hasSize(2);
assertThat(partitionLists.get(0)).hasSize(2);
assertThat(partitionLists.get(1)).hasSize(1);

list.add("d");
partitionLists = CollectionUtil.partitionCollection(list, 2);
assertThat(partitionLists).hasSize(2);
assertThat(partitionLists.get(0)).hasSize(2);
assertThat(partitionLists.get(1)).hasSize(2);
}

@Test
void testPartitionHashSet() {
Set<String> set1 = new HashSet<>();
set1.add("a");
List<List<String>> partitionLists = CollectionUtil.partitionCollection(set1, 1);
assertThat(partitionLists).hasSize(1);
assertThat(partitionLists.get(0)).hasSize(1);

Set<String> set2 = new HashSet<>();
set2.add("a");
set2.add("b");
partitionLists = CollectionUtil.partitionCollection(set2, 3);
assertThat(partitionLists).hasSize(1);

Set<String> set3 = new HashSet<>();
set3.add("a");
set3.add("b");
set3.add("c");
partitionLists = CollectionUtil.partitionCollection(set3, 2);
assertThat(partitionLists).hasSize(2);
assertThat(partitionLists.get(0)).hasSize(2);
assertThat(partitionLists.get(1)).hasSize(1);

Set<String> set4 = new HashSet<>();
set4.add("a");
set4.add("b");
set4.add("c");
set4.add("d");
partitionLists = CollectionUtil.partitionCollection(set4, 2);
assertThat(partitionLists).hasSize(2);
assertThat(partitionLists.get(0)).hasSize(2);
assertThat(partitionLists.get(1)).hasSize(2);

Set<String> set5 = new HashSet<>();
set5.add("a");
set5.add("b");
set5.add("c");
set5.add("d");
set5.add("e");
set5.add("f");
set5.add("g");
partitionLists = CollectionUtil.partitionCollection(set5, 3);
assertThat(partitionLists).hasSize(3);
assertThat(partitionLists.get(0)).hasSize(3);
assertThat(partitionLists.get(1)).hasSize(3);
assertThat(partitionLists.get(2)).hasSize(1);
}

}
}
Expand Up @@ -27,6 +27,7 @@
import com.tencent.bk.job.common.model.BaseSearchCondition;
import com.tencent.bk.job.common.model.PageData;
import com.tencent.bk.job.common.model.dto.HostDTO;
import com.tencent.bk.job.common.util.CollectionUtil;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.common.constants.TaskStartupModeEnum;
import com.tencent.bk.job.execute.common.constants.TaskTypeEnum;
Expand Down Expand Up @@ -581,20 +582,26 @@ public List<Long> listTaskInstanceId(Long appId, Long fromTime, Long toTime, int
@Override
public void saveTaskInstanceHosts(long taskInstanceId,
Collection<HostDTO> hosts) {
BatchBindStep batchInsert = ctx.batch(
ctx.insertInto(TASK_INSTANCE_HOST, TASK_INSTANCE_HOST.TASK_INSTANCE_ID,
TASK_INSTANCE_HOST.HOST_ID, TASK_INSTANCE_HOST.IP, TASK_INSTANCE_HOST.IPV6)
.values((Long) null, null, null, null)
);

for (HostDTO host : hosts) {
batchInsert = batchInsert.bind(
taskInstanceId,
host.getHostId(),
host.getIp(),
host.getIpv6()
);
if (CollectionUtils.isEmpty(hosts)) {
return;
}
batchInsert.execute();
List<List<HostDTO>> hostBatches = CollectionUtil.partitionCollection(hosts, 2000);
hostBatches.forEach(batchHosts -> {
BatchBindStep batchInsert = ctx.batch(
ctx.insertInto(TASK_INSTANCE_HOST, TASK_INSTANCE_HOST.TASK_INSTANCE_ID,
TASK_INSTANCE_HOST.HOST_ID, TASK_INSTANCE_HOST.IP, TASK_INSTANCE_HOST.IPV6)
.values((Long) null, null, null, null)
);

for (HostDTO host : batchHosts) {
batchInsert = batchInsert.bind(
taskInstanceId,
host.getHostId(),
host.getIp(),
host.getIpv6()
);
}
batchInsert.execute();
});
}
}
Expand Up @@ -31,7 +31,7 @@
import com.tencent.bk.job.common.gse.v2.model.ScriptAgentTaskResult;
import com.tencent.bk.job.common.gse.v2.model.ScriptTaskResult;
import com.tencent.bk.job.common.model.dto.HostDTO;
import com.tencent.bk.job.common.util.BatchUtil;
import com.tencent.bk.job.common.util.CollectionUtil;
import com.tencent.bk.job.common.util.date.DateUtils;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.execute.constants.VariableValueTypeEnum;
Expand Down Expand Up @@ -192,7 +192,7 @@ private void initLogPullProcess(Collection<AgentTaskDTO> agentTasks) {
GseLogBatchPullResult<ScriptTaskResult> pullGseTaskResultInBatches() {
if (pullAgentIdBatches.isEmpty()) {
List<String> queryAgentIdList = new ArrayList<>(notFinishedTargetAgentIds);
pullAgentIdBatches = BatchUtil.buildBatchList(queryAgentIdList, currentBatchSize);
pullAgentIdBatches = CollectionUtil.partitionList(queryAgentIdList, currentBatchSize);
}
return tryPullGseResultWithRetry();
}
Expand Down Expand Up @@ -260,7 +260,7 @@ private boolean tryReduceBatchSizeAndRebuildBatchList() {
}
pullAgentIdBatches.subList(pullResultBatchesIndex.get() - 1, pullAgentIdBatches.size())
.forEach(leftAgentIds::addAll);
newBatchList.addAll(BatchUtil.buildBatchList(leftAgentIds, currentBatchSize));
newBatchList.addAll(CollectionUtil.partitionList(leftAgentIds, currentBatchSize));
pullAgentIdBatches = newBatchList;
return true;
}
Expand Down
Expand Up @@ -28,7 +28,7 @@
import com.tencent.bk.job.common.constant.JobConstants;
import com.tencent.bk.job.common.model.dto.HostDTO;
import com.tencent.bk.job.common.redis.util.LockUtils;
import com.tencent.bk.job.common.util.BatchUtil;
import com.tencent.bk.job.common.util.CollectionUtil;
import com.tencent.bk.job.common.util.date.DateUtils;
import com.tencent.bk.job.common.util.file.ZipUtil;
import com.tencent.bk.job.common.util.json.JsonUtils;
Expand Down Expand Up @@ -380,7 +380,7 @@ void addHost(HostDTO host) {
}

void batchHosts() {
hostBatches = BatchUtil.buildBatchList(hosts, MAX_BATCH_IPS);
hostBatches = CollectionUtil.partitionList(hosts, MAX_BATCH_IPS);
}
}
}

0 comments on commit 7b4159a

Please sign in to comment.