Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dolphinscheduler-extract-common module #15266

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils;
import org.apache.dolphinscheduler.common.utils.LogUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
Expand All @@ -37,25 +35,15 @@
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterLogService;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -108,8 +96,7 @@ public Result<ResponseTaskLog> queryLog(User loginUser, int taskInstId, int skip
log.error("Host of task instance is null, taskInstanceId:{}.", taskInstId);
return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
}
Project project = projectMapper.queryProjectByTaskInstanceId(taskInstId);
projectService.checkProjectAndAuthThrowException(loginUser, project, VIEW_LOG);
projectService.checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG);
Result<ResponseTaskLog> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance, skipLineNum, limit);
int lineNum = log.split("\\r\\n").length;
Expand Down Expand Up @@ -199,7 +186,6 @@ public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) {
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
final String logPath = taskInstance.getLogPath();
final String host = taskInstance.getHost();
log.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host: {}, logPath:{}",
taskInstance.getId(), taskInstance.getName(), taskInstance.getHost(), logPath);
StringBuilder sb = new StringBuilder();
Expand All @@ -211,48 +197,24 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
sb.append(head);
}

String logContent = null;
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
try {
LogicTaskInstanceLogPageQueryRequest logicTaskInstanceLogPageQueryRequest =
new LogicTaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit);
LogicTaskInstanceLogPageQueryResponse logicTaskInstanceLogPageQueryResponse =
masterLogService.pageQueryLogicTaskInstanceLog(logicTaskInstanceLogPageQueryRequest);
logContent = logicTaskInstanceLogPageQueryResponse.getLogContent();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
} else {
IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogPageQueryRequest taskInstanceLogPageQueryRequest =
new TaskInstanceLogPageQueryRequest(taskInstance.getId(), logPath, skipLineNum, limit);
TaskInstanceLogPageQueryResponse taskInstanceLogPageQueryResponse =
iWorkerLogService.pageQueryTaskInstanceLog(taskInstanceLogPageQueryRequest);
logContent = taskInstanceLogPageQueryResponse.getLogContent();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
try {
TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest.builder()
.taskInstanceId(taskInstance.getId())
.taskInstanceLogAbsolutePath(logPath)
.skipLineNum(skipLineNum)
.limit(limit)
.build();
TaskInstanceLogPageQueryResponse response = iLogService.pageQueryTaskInstanceLog(request);
String logContent = response.getLogContent();
if (logContent != null) {
sb.append(logContent);
}
return sb.toString();
} catch (Throwable ex) {
throw new ServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, ex);
}
if (logContent == null && RemoteLogUtils.isRemoteLoggingEnable()) {
// When getting the log for the first time (skipLineNum=0) returns empty, get the log from remote target
try {
log.info("Get log {} from remote target", logPath);
RemoteLogUtils.getRemoteLog(logPath);
List<String> lines = LogUtils.readPartFileContentFromLocal(logPath, skipLineNum, limit);
logContent = LogUtils.rollViewLogLines(lines);
FileUtils.delete(new File(logPath));
} catch (IOException e) {
log.error("Error while getting log from remote target", e);
}
}
if (logContent != null) {
sb.append(logContent);
}
return sb.toString();
}

/**
Expand All @@ -271,45 +233,19 @@ private byte[] getLogBytes(TaskInstance taskInstance) {
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);

byte[] logBytes = new byte[0];
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
try {
LogicTaskInstanceLogFileDownloadRequest logicTaskInstanceLogFileDownloadRequest =
new LogicTaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
LogicTaskInstanceLogFileDownloadResponse logicTaskInstanceLogFileDownloadResponse =
masterLogService.getLogicTaskInstanceWholeLogFileBytes(logicTaskInstanceLogFileDownloadRequest);
logBytes = logicTaskInstanceLogFileDownloadResponse.getLogBytes();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
} else {
IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(host, IWorkerLogService.class);
try {
TaskInstanceLogFileDownloadRequest taskInstanceLogFileDownloadRequest =
new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
TaskInstanceLogFileDownloadResponse taskInstanceWholeLogFileBytes =
iWorkerLogService.getTaskInstanceWholeLogFileBytes(taskInstanceLogFileDownloadRequest);
logBytes = taskInstanceWholeLogFileBytes.getLogBytes();
} catch (Exception ex) {
log.error("Query LogicTaskInstance log error", ex);
}
}

if ((logBytes == null || logBytes.length == 0) && RemoteLogUtils.isRemoteLoggingEnable()) {
// get task log from remote target
try {
log.info("Get log {} from remote target", logPath);
RemoteLogUtils.getRemoteLog(logPath);
File logFile = new File(logPath);
logBytes = FileUtils.readFileToByteArray(logFile);
FileUtils.delete(logFile);
} catch (IOException e) {
log.error("Error while getting log from remote target", e);
}
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
try {
TaskInstanceLogFileDownloadRequest request =
new TaskInstanceLogFileDownloadRequest(taskInstance.getId(), logPath);
TaskInstanceLogFileDownloadResponse response = iLogService.getTaskInstanceWholeLogFileBytes(request);
logBytes = response.getLogBytes();
return Bytes.concat(head, logBytes);
} catch (Exception ex) {
log.error("Download TaskInstance: {} Log Error", taskInstance.getName(), ex);
throw new ServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR);
}

return Bytes.concat(head, logBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory;
import org.apache.dolphinscheduler.extract.master.IMasterLogService;
import org.apache.dolphinscheduler.extract.common.ILogService;
import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import org.apache.dolphinscheduler.extract.worker.IWorkerLogService;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillResponse;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -381,18 +379,9 @@ public void deleteByWorkflowInstanceId(Integer workflowInstanceId) {
return;
}
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
// delete log
if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
if (TaskUtils.isLogicTask(taskInstance.getTaskType())) {
IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IMasterLogService.class);
masterLogService.removeLogicTaskInstanceLog(taskInstance.getLogPath());
} else {
IWorkerLogService workerLogService = SingletonJdkDynamicRpcClientProxyFactory
.getProxyClient(taskInstance.getHost(), IWorkerLogService.class);
workerLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}
}
ILogService iLogService =
SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(taskInstance.getHost(), ILogService.class);
iLogService.removeTaskInstanceLog(taskInstance.getLogPath());
}

dqExecuteResultDao.deleteByWorkflowInstanceId(workflowInstanceId);
Expand Down
Loading
Loading