Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2794,6 +2794,15 @@ public class Config extends ConfigBase {
})
public static int backup_restore_batch_task_num_per_rpc = 10000;

/**
 * Number of restore tasks allowed to run concurrently on a single BE (default 5000).
 *
 * <p>Declared mutable and master-only. RestoreJob passes this value as the second
 * constructor argument of AgentBoundedBatchTask, next to
 * backup_restore_batch_task_num_per_rpc.
 */
@ConfField(mutable = true, masterOnly = true, description = {
"一个 BE 同时执行的恢复任务的并发数",
"The number of concurrent restore tasks per be"})
public static int restore_task_concurrency_per_be = 5000;

/**
 * Heartbeat timeout applied while executing agent tasks: a BE whose heartbeat has been
 * missing for longer than this is considered unavailable (default 300).
 *
 * <p>NOTE(review): the unit is presumably seconds, judging by the field name; the code
 * that reads this value is not visible here, so confirm against the consumer.
 */
@ConfField(mutable = true, description = {"执行 agent task 时,BE心跳超过多长时间,认为BE不可用",
"The time after which BE is considered unavailable if the heartbeat is not received"})
public static int agent_task_be_unavailable_heartbeat_timeout_second = 300;

/**
 * Whether log files may be fetched through the HTTP interface. Disabled by default.
 */
@ConfField(description = {"是否开启通过http接口获取log文件的功能",
"Whether to enable the function of getting log files through http interface"})
public static boolean enable_get_log_file_api = false;
Expand Down
42 changes: 16 additions & 26 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentBoundedBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
Expand Down Expand Up @@ -680,7 +681,7 @@ private void checkAndPrepareMeta() {
Map<Long, TabletRef> tabletBases = new HashMap<>();

// Check and prepare meta objects.
Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();
Map<Long, AgentBoundedBatchTask> batchTaskPerTable = new HashMap<>();

// The tables that are restored but not committed, because the table name may be changed.
List<Table> stagingRestoreTables = Lists.newArrayList();
Expand Down Expand Up @@ -949,9 +950,10 @@ private void checkAndPrepareMeta() {
BackupPartitionInfo backupPartitionInfo
= jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());

AgentBatchTask batchTask = batchTaskPerTable.get(localTbl.getId());
AgentBoundedBatchTask batchTask = batchTaskPerTable.get(localTbl.getId());
if (batchTask == null) {
batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTask = new AgentBoundedBatchTask(
Config.backup_restore_batch_task_num_per_rpc, Config.restore_task_concurrency_per_be);
batchTaskPerTable.put(localTbl.getId(), batchTask);
}
createReplicas(db, batchTask, localTbl, restorePart);
Expand All @@ -965,9 +967,10 @@ private void checkAndPrepareMeta() {
if (restoreTbl.getType() == TableType.OLAP) {
OlapTable restoreOlapTable = (OlapTable) restoreTbl;
for (Partition restorePart : restoreOlapTable.getPartitions()) {
AgentBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId());
AgentBoundedBatchTask batchTask = batchTaskPerTable.get(restoreTbl.getId());
if (batchTask == null) {
batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTask = new AgentBoundedBatchTask(Config.backup_restore_batch_task_num_per_rpc,
Config.restore_task_concurrency_per_be);
batchTaskPerTable.put(restoreTbl.getId(), batchTask);
}
createReplicas(db, batchTask, restoreOlapTable, restorePart, tabletBases);
Expand Down Expand Up @@ -1019,7 +1022,6 @@ private void checkAndPrepareMeta() {
for (AgentTask task : batchTask.getAllTasks()) {
createReplicaTasksLatch.addMark(task.getBackendId(), task.getTabletId());
((CreateReplicaTask) task).setLatch(createReplicaTasksLatch);
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
}
Expand Down Expand Up @@ -1235,7 +1237,8 @@ private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
taskProgress.clear();
taskErrMsg.clear();
Multimap<Long, Long> bePathsMap = HashMultimap.create();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
Config.backup_restore_batch_task_num_per_rpc, Config.restore_task_concurrency_per_be);
db.readLock();
try {
for (Map.Entry<IdChain, IdChain> entry : fileMapping.getMapping().entrySet()) {
Expand Down Expand Up @@ -1277,10 +1280,6 @@ private void prepareAndSendSnapshotTaskForOlapTable(Database db) {
return;
}

// send tasks
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
LOG.info("finished to send snapshot tasks, num: {}. {}", batchTask.getTaskNum(), this);
}
Expand Down Expand Up @@ -1724,7 +1723,8 @@ private void downloadRemoteSnapshots() {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
Config.backup_restore_batch_task_num_per_rpc, Config.restore_task_concurrency_per_be);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);

Expand Down Expand Up @@ -1853,10 +1853,6 @@ private void downloadRemoteSnapshots() {
}
}

// send task
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);

state = RestoreJobState.DOWNLOADING;
Expand All @@ -1876,7 +1872,8 @@ private void downloadLocalSnapshots() {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
Config.backup_restore_batch_task_num_per_rpc, Config.restore_task_concurrency_per_be);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);

Expand Down Expand Up @@ -2019,10 +2016,6 @@ private void downloadLocalSnapshots() {
}
}

// send task
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);

state = RestoreJobState.DOWNLOADING;
Expand Down Expand Up @@ -2051,7 +2044,8 @@ private void commit() {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
AgentBatchTask batchTask = new AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
AgentBoundedBatchTask batchTask = new AgentBoundedBatchTask(
Config.backup_restore_batch_task_num_per_rpc, Config.restore_task_concurrency_per_be);
// tablet id->(be id -> download info)
for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
SnapshotInfo info = cell.getValue();
Expand All @@ -2063,10 +2057,6 @@ private void commit() {
unfinishedSignatureToId.put(signature, info.getTabletId());
}

// send task
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);

state = RestoreJobState.COMMITTING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ private void finishCreateReplica(AgentTask task, TFinishTaskRequest request) {
createReplicaTask.countDownToZero(task.getBackendId() + ": "
+ request.getTaskStatus().getErrorMsgs().toString());
} else {
createReplicaTask.setFinished(true);
long tabletId = createReplicaTask.getTabletId();

if (request.isSetFinishTabletInfos()) {
Expand Down Expand Up @@ -590,6 +591,7 @@ private void finishConsistencyCheck(AgentTask task, TFinishTaskRequest request)

private void finishMakeSnapshot(AgentTask task, TFinishTaskRequest request) {
SnapshotTask snapshotTask = (SnapshotTask) task;
task.setFinished(true);
if (snapshotTask.isCopyTabletTask()) {
snapshotTask.setResultSnapshotPath(request.getSnapshotPath());
snapshotTask.countDown(task.getBackendId(), task.getTabletId());
Expand All @@ -609,13 +611,15 @@ private void finishUpload(AgentTask task, TFinishTaskRequest request) {

/**
 * Handles the finish report of a download task.
 *
 * <p>Marks the task as finished, hands it together with the report to the backup
 * handler, and removes it from the agent task queue only when the handler returns
 * {@code true}.
 *
 * @param task the reported task; it is cast to {@code DownloadTask}
 * @param request the finish report received for the task
 */
private void finishDownloadTask(AgentTask task, TFinishTaskRequest request) {
    DownloadTask downloadTask = (DownloadTask) task;
    // Flag the task as finished before the backup handler processes the report.
    task.setFinished(true);
    boolean handled = Env.getCurrentEnv().getBackupHandler()
            .handleDownloadSnapshotTask(downloadTask, request);
    if (!handled) {
        // The handler returned false: keep the task in the queue.
        return;
    }
    AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.DOWNLOAD, task.getSignature());
}

private void finishMoveDirTask(AgentTask task, TFinishTaskRequest request) {
DirMoveTask dirMoveTask = (DirMoveTask) task;
task.setFinished(true);
if (Env.getCurrentEnv().getBackupHandler().handleDirMoveTask(dirMoveTask, request)) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.MOVE, task.getSignature());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@
public class AgentBatchTask implements Runnable {
private static final Logger LOG = LogManager.getLogger(AgentBatchTask.class);

private int batchSize = Integer.MAX_VALUE;
protected int batchSize = Integer.MAX_VALUE;

// backendId -> AgentTask List
private Map<Long, List<AgentTask>> backendIdToTasks;
protected Map<Long, List<AgentTask>> backendIdToTasks;

public AgentBatchTask() {
this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
Expand Down Expand Up @@ -246,7 +246,7 @@ private static void submitTasks(long backendId,
}
}

private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
protected TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);
tAgentTaskRequest.setSignature(task.getSignature());
Expand Down
Loading
Loading