From f66eaa6c62568c709f2f7338eef6e65d50b1763a Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 5 Mar 2024 12:15:19 +0800 Subject: [PATCH] [INLONG-9768] Optimize flink job building and manage procedure --- .../manager/plugin/flink/FlinkOperation.java | 80 ++++--------- .../manager/plugin/flink/FlinkService.java | 110 +++++++++++------ .../plugin/flink/IntegrationTaskRunner.java | 14 +-- .../plugin/listener/DeleteSortListener.java | 4 +- .../plugin/listener/RestartSortListener.java | 5 +- .../plugin/listener/StartupSortListener.java | 6 +- .../listener/StartupStreamListener.java | 6 +- .../plugin/listener/SuspendSortListener.java | 5 +- .../plugin/poller/SortStatusPoller.java | 4 +- .../plugin/util/FlinkConfiguration.java | 113 ------------------ .../plugin/util/FlinkServiceUtils.java | 59 --------- .../manager/plugin/util/FlinkUtils.java | 113 ++++++++++-------- 12 files changed, 175 insertions(+), 344 deletions(-) delete mode 100644 inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java delete mode 100644 inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java index 9f5d9f7708f..bf2ece030bc 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java @@ -68,10 +68,14 @@ public class FlinkOperation { private static final String CONNECTOR_JAR_PATTERN = "^sort-connector-(?i)(%s).*jar$"; private static final String ALL_CONNECTOR_JAR_PATTERN = "^sort-connector-.*jar$"; private static Properties properties; - private final FlinkService flinkService; - public FlinkOperation(FlinkService flinkService) { - this.flinkService = flinkService; + private static class FlinkOperationHolder { + + private static final FlinkOperation INSTANCE = new FlinkOperation(); + } + + public static FlinkOperation getInstance() { + return FlinkOperationHolder.INSTANCE; } /** @@ -103,16 +107,15 @@ private String getConnectorJarPattern(String dataSourceType) { * Restart the Flink job. */ public void restart(FlinkInfo flinkInfo) throws Exception { - String jobId = flinkInfo.getJobId(); - boolean terminated = isNullOrTerminated(jobId); + boolean terminated = isNullOrTerminated(flinkInfo); if (terminated) { - String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId); + String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, flinkInfo.getJobId()); log.error(message); throw new Exception(message); } Future future = TaskRunService.submit( - new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.RESTART.getCode())); + new IntegrationTaskRunner(flinkInfo, TaskCommitType.RESTART.getCode())); future.get(); } @@ -120,24 +123,25 @@ public void restart(FlinkInfo flinkInfo) throws Exception { * Start the Flink job, if the job id was not empty, restore it. */ public void start(FlinkInfo flinkInfo) throws Exception { - String jobId = flinkInfo.getJobId(); try { // Start a new task without savepoint - if (StringUtils.isEmpty(jobId)) { - IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkService, flinkInfo, + if (StringUtils.isEmpty(flinkInfo.getJobId())) { + IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkInfo, TaskCommitType.START_NOW.getCode()); Future future = TaskRunService.submit(taskRunner); future.get(); } else { // Restore an old task with savepoint - boolean noSavepoint = isNullOrTerminated(jobId) || StringUtils.isEmpty(flinkInfo.getSavepointPath()); + boolean noSavepoint = + isNullOrTerminated(flinkInfo) || StringUtils.isEmpty(flinkInfo.getSavepointPath()); if (noSavepoint) { - String message = String.format("restore job failed, as " + JOB_TERMINATED_MSG, jobId); + String message = + String.format("restore job failed, as " + JOB_TERMINATED_MSG, flinkInfo.getJobId()); log.error(message); throw new Exception(message); } - IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkService, flinkInfo, + IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkInfo, TaskCommitType.RESUME.getCode()); Future future = TaskRunService.submit(taskRunner); future.get(); @@ -148,37 +152,6 @@ public void start(FlinkInfo flinkInfo) throws Exception { } } - /** - * Check whether there are duplicate NodeIds in different relations. - *

- * The JSON data in the dataflow is in the reverse order of the nodes in the actual dataflow. - * For example, data flow A -> B -> C, the generated topological relationship is [[B,C],[A,B]], - * then the input node B in the first relation [B,C] is the second output node B in relation [A,B]. - *

- * The example of dataflow: - *

-     * {
-     *     "groupId": "test_group",
-     *     "streams": [
-     *         {
-     *             "streamId": "test_stream",
-     *             "relations": [
-     *                 {
-     *                     "type": "baseRelation",
-     *                     "inputs": [ "node_3" ],
-     *                     "outputs": [ "node_4" ]
-     *                 },
-     *                 {
-     *                     "type": "innerJoin",
-     *                     "inputs": [ "node_1", "node_2" ],
-     *                     "outputs": [ "node_3"  ]
-     *                 }
-     *             ]
-     *         }
-     *     ]
-     * }
-     * 
- */ private void checkNodeIds(String dataflow) throws Exception { JsonNode relations = JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS) .get(0).get(InlongConstants.RELATIONS); @@ -293,16 +266,15 @@ public void genPath(FlinkInfo flinkInfo, String dataflow) throws Exception { * Stop the Flink job. */ public void stop(FlinkInfo flinkInfo) throws Exception { - String jobId = flinkInfo.getJobId(); - boolean terminated = isNullOrTerminated(jobId); + boolean terminated = isNullOrTerminated(flinkInfo); if (terminated) { - String message = String.format("stop job failed, as " + JOB_TERMINATED_MSG, jobId); + String message = String.format("stop job failed, as " + JOB_TERMINATED_MSG, flinkInfo.getJobId()); log.error(message); throw new Exception(message); } Future future = TaskRunService.submit( - new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.STOP.getCode())); + new IntegrationTaskRunner(flinkInfo, TaskCommitType.STOP.getCode())); future.get(); } @@ -311,7 +283,7 @@ public void stop(FlinkInfo flinkInfo) throws Exception { */ public void delete(FlinkInfo flinkInfo) throws Exception { String jobId = flinkInfo.getJobId(); - JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId); + JobDetailsInfo jobDetailsInfo = FlinkService.getInstance().getJobDetail(flinkInfo); if (jobDetailsInfo == null) { throw new Exception(String.format("delete job failed as the job [%s] not found", jobId)); } @@ -324,7 +296,7 @@ public void delete(FlinkInfo flinkInfo) throws Exception { } Future future = TaskRunService.submit( - new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.DELETE.getCode())); + new IntegrationTaskRunner(flinkInfo, TaskCommitType.DELETE.getCode())); future.get(); } @@ -343,7 +315,7 @@ public void pollJobStatus(FlinkInfo flinkInfo, JobStatus expectStatus) throws Ex int retryTimes = 0; while (retryTimes <= TRY_MAX_TIMES) { try { - JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId); + JobDetailsInfo jobDetailsInfo = FlinkService.getInstance().getJobDetail(flinkInfo); if (jobDetailsInfo == null) { log.error("job detail not found by {}", jobId); throw new Exception(String.format("job detail not found by %s", jobId)); @@ -371,11 +343,11 @@ public void pollJobStatus(FlinkInfo flinkInfo, JobStatus expectStatus) throws Ex /** * Check whether the job was terminated by the given job id. */ - private boolean isNullOrTerminated(String jobId) throws Exception { - JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId); + private boolean isNullOrTerminated(FlinkInfo flinkInfo) throws Exception { + JobDetailsInfo jobDetailsInfo = FlinkService.getInstance().getJobDetail(flinkInfo); boolean terminated = jobDetailsInfo == null || jobDetailsInfo.getJobStatus() == null; if (terminated) { - log.warn("job detail or job status was null for [{}]", jobId); + log.warn("job detail or job status was null for [{}]", flinkInfo.getJobId()); return true; } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java index 37322798897..69a32b73fa5 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java @@ -22,8 +22,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest; import org.apache.inlong.manager.plugin.flink.enums.Constants; -import org.apache.inlong.manager.plugin.util.FlinkConfiguration; -import org.apache.inlong.manager.plugin.util.FlinkServiceUtils; +import org.apache.inlong.manager.plugin.util.FlinkUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -64,54 +63,47 @@ public class FlinkService { private final FlinkConfig flinkConfig; private final Integer parallelism; private final String savepointDirectory; - private final Configuration configuration; - private final FlinkClientService clientService; + // map endpoint to Configuration + private final Map configurations = new HashMap<>(); + // map Configuration to FlinkClientService + private final Map flinkClientServices = new HashMap<>(); /** * Constructor of FlinkService. */ - public FlinkService(String endpoint) throws Exception { - FlinkConfiguration flinkConfiguration = new FlinkConfiguration(); - flinkConfig = flinkConfiguration.getFlinkConfig(); + public FlinkService() throws Exception { + flinkConfig = FlinkUtils.getFlinkConfigFromFile(); parallelism = flinkConfig.getParallelism(); savepointDirectory = flinkConfig.getSavepointDirectory(); + } + + private static class FlinkServiceHolder { - configuration = new Configuration(); - Integer jobManagerPort = flinkConfig.getJobManagerPort(); - configuration.setInteger(JobManagerOptions.PORT, jobManagerPort); - - Integer port; - String address; - if (StringUtils.isEmpty(endpoint)) { - address = flinkConfig.getAddress(); - port = flinkConfig.getPort(); - } else { - Map ipPort = translateFromEndpoint(endpoint); - if (ipPort.isEmpty()) { - throw new BusinessException("get address:port failed from endpoint " + endpoint); + private static final FlinkService INSTANCE; + static { + try { + INSTANCE = new FlinkService(); + } catch (Exception e) { + throw new RuntimeException(e); } - address = ipPort.get("address"); - port = Integer.valueOf(ipPort.get("port")); } - configuration.setString(JobManagerOptions.ADDRESS, address); - configuration.setInteger(RestOptions.PORT, port); + } - clientService = (FlinkClientService) FlinkServiceUtils.getFlinkClientService(configuration, flinkConfig); + public static FlinkService getInstance() { + return FlinkServiceHolder.INSTANCE; } /** * Translate the Endpoint to address & port */ - private Map translateFromEndpoint(String endpoint) throws Exception { + private Map translateFromEndpoint(String endpoint) { Map map = new HashMap<>(2); Matcher matcher = IP_PORT_PATTERN.matcher(endpoint); if (matcher.find()) { map.put("address", matcher.group(1)); map.put("port", matcher.group(2)); - return map; - } else { - throw new Exception("endpoint [" + endpoint + "] was not match address:port"); } + return map; } /** @@ -124,15 +116,52 @@ public FlinkConfig getFlinkConfig() { /** * Get the job status by the given job id. */ - public JobStatus getJobStatus(String jobId) throws Exception { - return clientService.getJobStatus(jobId); + public JobStatus getJobStatus(String endpoint, String jobId) throws Exception { + Configuration configuration = getFlinkConfiguration(endpoint); + return getFlinkClientService(configuration).getJobStatus(jobId); + } + + public JobStatus getJobStatus(FlinkInfo flinkInfo) throws Exception { + Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint()); + return getFlinkClientService(configuration).getJobStatus(flinkInfo.getJobId()); + } + + private FlinkClientService getFlinkClientService(Configuration configuration) { + return flinkClientServices.computeIfAbsent(configuration, + k -> (FlinkClientService) FlinkUtils.getFlinkClientService(configuration, flinkConfig)); + } + + private Configuration getFlinkConfiguration(String endpoint) { + return configurations.computeIfAbsent(endpoint, + k -> { + Integer port; + String address; + if (StringUtils.isEmpty(endpoint)) { + address = flinkConfig.getAddress(); + port = flinkConfig.getPort(); + } else { + Map ipPort = translateFromEndpoint(endpoint); + if (ipPort.isEmpty()) { + throw new BusinessException("get address:port failed from endpoint " + endpoint); + } + address = ipPort.get("address"); + port = Integer.valueOf(ipPort.get("port")); + } + // build flink configuration + Configuration configuration = new Configuration(); + configuration.setInteger(JobManagerOptions.PORT, flinkConfig.getJobManagerPort()); + configuration.setString(JobManagerOptions.ADDRESS, address); + configuration.setInteger(RestOptions.PORT, port); + return configuration; + }); } /** * Get job detail by the given job id. */ - public JobDetailsInfo getJobDetail(String jobId) throws Exception { - return clientService.getJobDetail(jobId); + public JobDetailsInfo getJobDetail(FlinkInfo flinkInfo) throws Exception { + Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint()); + return getFlinkClientService(configuration).getJobDetail(flinkInfo.getJobId()); } /** @@ -182,6 +211,8 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting } }).filter(Objects::nonNull).collect(Collectors.toList()); + Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint()); + PackagedProgram program = PackagedProgram.newBuilder() .setConfiguration(configuration) .setEntryPointClassName(Constants.ENTRYPOINT_CLASS) @@ -192,7 +223,7 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false); jobGraph.addJars(connectorJars); - RestClusterClient client = clientService.getFlinkClient(); + RestClusterClient client = getFlinkClientService(configuration).getFlinkClient(); CompletableFuture result = client.submitJob(jobGraph); return result.get().toString(); } @@ -200,15 +231,18 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting /** * Stop the Flink job with the savepoint. */ - public String stopJob(String jobId, StopWithSavepointRequest request) throws Exception { - return clientService.stopJob(jobId, request.isDrain(), request.getTargetDirectory()); + public String stopJob(FlinkInfo flinkInfo, StopWithSavepointRequest request) throws Exception { + Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint()); + return getFlinkClientService(configuration).stopJob(flinkInfo.getJobId(), request.isDrain(), + request.getTargetDirectory()); } /** * Cancel the Flink job. */ - public void cancelJob(String jobId) throws Exception { - clientService.cancelJob(jobId); + public void cancelJob(FlinkInfo flinkInfo) throws Exception { + Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint()); + getFlinkClientService(configuration).cancelJob(flinkInfo.getJobId()); } /** diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java index 22003b29259..212c25397cb 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java @@ -40,10 +40,10 @@ public class IntegrationTaskRunner implements Runnable { private final FlinkInfo flinkInfo; private final Integer commitType; - public IntegrationTaskRunner(FlinkService flinkService, FlinkInfo flinkInfo, Integer commitType) { - this.flinkService = flinkService; + public IntegrationTaskRunner(FlinkInfo flinkInfo, Integer commitType) { this.flinkInfo = flinkInfo; this.commitType = commitType; + flinkService = FlinkService.getInstance(); } @Override @@ -84,12 +84,12 @@ public void run() { FlinkConfig flinkConfig = flinkService.getFlinkConfig(); stopWithSavepointRequest.setDrain(flinkConfig.isDrain()); stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory()); - String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest); + String location = flinkService.stopJob(flinkInfo, stopWithSavepointRequest); flinkInfo.setSavepointPath(location); log.info("the jobId: {} savepoint: {} ", flinkInfo.getJobId(), location); int times = 0; while (times < TRY_MAX_TIMES) { - JobStatus jobStatus = flinkService.getJobStatus(flinkInfo.getJobId()); + JobStatus jobStatus = flinkService.getJobStatus(flinkInfo); // restore job if (jobStatus == FINISHED) { try { @@ -123,7 +123,7 @@ public void run() { FlinkConfig flinkConfig = flinkService.getFlinkConfig(); stopWithSavepointRequest.setDrain(flinkConfig.isDrain()); stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory()); - String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest); + String location = flinkService.stopJob(flinkInfo, stopWithSavepointRequest); flinkInfo.setSavepointPath(location); log.info("the jobId {} savepoint: {} ", flinkInfo.getJobId(), location); } catch (Exception e) { @@ -136,9 +136,9 @@ public void run() { break; case DELETE: try { - flinkService.cancelJob(flinkInfo.getJobId()); + flinkService.cancelJob(flinkInfo); log.info("delete job {} success in backend", flinkInfo.getJobId()); - JobStatus jobStatus = flinkService.getJobStatus(flinkInfo.getJobId()); + JobStatus jobStatus = flinkService.getJobStatus(flinkInfo); if (jobStatus.isTerminalState()) { log.info("delete job {} success in backend", flinkInfo.getJobId()); } else { diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java index a7918c640a6..009374e2a62 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java @@ -22,7 +22,6 @@ import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkOperation; -import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; @@ -117,8 +116,7 @@ public ListenerResult listen(WorkflowContext context) throws Exception { String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl); - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); + FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.delete(flinkInfo); log.info("job delete success for jobId={}", jobId); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java index 9a95da354aa..0c6828c9846 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java @@ -22,7 +22,6 @@ import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkOperation; -import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.enums.Constants; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -127,9 +126,7 @@ public ListenerResult listen(WorkflowContext context) throws Exception { flinkInfo.setJobName(jobName); String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); + FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); // todo Currently, savepoint is not being used to restart, but will be improved in the future diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index 8fa72f1c4b7..0b0e55e3692 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -23,7 +23,6 @@ import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkOperation; -import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.enums.Constants; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -139,10 +138,7 @@ public ListenerResult listen(WorkflowContext context) throws Exception { String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl); flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo)); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); - + FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); flinkOperation.start(flinkInfo); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java index 99c02451686..c66f76d4677 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java @@ -23,7 +23,6 @@ import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkOperation; -import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.enums.Constants; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -132,10 +131,7 @@ public ListenerResult listen(WorkflowContext context) throws Exception { String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl); flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo)); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); - + FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); flinkOperation.start(flinkInfo); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java index 06a76e1bf70..72794e5006d 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java @@ -22,7 +22,6 @@ import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.plugin.flink.FlinkOperation; -import org.apache.inlong.manager.plugin.flink.FlinkService; import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; @@ -116,9 +115,7 @@ public ListenerResult listen(WorkflowContext context) throws Exception { flinkInfo.setJobId(jobId); String sortUrl = kvConf.get(InlongConstants.SORT_URL); flinkInfo.setEndpoint(sortUrl); - - FlinkService flinkService = new FlinkService(flinkInfo.getEndpoint()); - FlinkOperation flinkOperation = new FlinkOperation(flinkService); + FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { // todo Currently, savepoint is not being used to stop, but will be improved in the future flinkOperation.delete(flinkInfo); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java index 84509b8a0e4..88c984c7606 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java @@ -101,9 +101,9 @@ public List pollSortStatus(List streamInfos, S } String sortUrl = kvConf.get(InlongConstants.SORT_URL); - FlinkService flinkService = new FlinkService(sortUrl); statusInfo.setSortStatus( - JOB_SORT_STATUS_MAP.getOrDefault(flinkService.getJobStatus(jobId), SortStatus.UNKNOWN)); + JOB_SORT_STATUS_MAP.getOrDefault(FlinkService.getInstance().getJobStatus(sortUrl, jobId), + SortStatus.UNKNOWN)); statusInfos.add(statusInfo); } catch (Exception e) { log.error("polling sort status failed for groupId=" + streamInfo.getInlongGroupId() + " streamId=" diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java deleted file mode 100644 index d5f3d8ed6e1..00000000000 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.plugin.util; - -import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.Properties; - -import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY; -import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS; -import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN; -import static org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION; -import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT; -import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM; -import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT; -import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY; - -/** - * Configuration file for Flink, only one instance in the process. - * Basically it used properties file to store. - */ -public class FlinkConfiguration { - - private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfiguration.class); - - private static final String DEFAULT_CONFIG_FILE = "flink-sort-plugin.properties"; - private static final String INLONG_MANAGER = "inlong-manager"; - - private final FlinkConfig flinkConfig; - - /** - * load config from flink file. - */ - public FlinkConfiguration() throws Exception { - String path = formatPath(); - flinkConfig = getFlinkConfigFromFile(path); - } - - /** - * fetch DEFAULT_CONFIG_FILE full path - */ - private String formatPath() throws Exception { - String path = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath(); - LOGGER.info("format first path {}", path); - - int index = path.indexOf(INLONG_MANAGER); - if (index == -1) { - throw new Exception(INLONG_MANAGER + " path not found in " + path); - } - - path = path.substring(0, index); - String confPath = path + INLONG_MANAGER + File.separator + "plugins" + File.separator + DEFAULT_CONFIG_FILE; - File file = new File(confPath); - if (!file.exists()) { - String message = String.format("not found %s in path %s", DEFAULT_CONFIG_FILE, confPath); - LOGGER.error(message); - throw new Exception(message); - } - - LOGGER.info("after format, {} located in {}", DEFAULT_CONFIG_FILE, confPath); - return confPath; - } - - /** - * get flink config - */ - public FlinkConfig getFlinkConfig() { - return flinkConfig; - } - - /** - * parse properties - */ - private FlinkConfig getFlinkConfigFromFile(String fileName) throws IOException { - Properties properties = new Properties(); - try (BufferedReader bufferedReader = new BufferedReader(new FileReader(fileName))) { - properties.load(bufferedReader); - } - FlinkConfig flinkConfig = new FlinkConfig(); - flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT))); - flinkConfig.setAddress(properties.getProperty(ADDRESS)); - flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM))); - flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY)); - flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT))); - flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN))); - flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY)); - flinkConfig.setVersion(properties.getProperty(FLINK_VERSION)); - return flinkConfig; - } - -} diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java deleted file mode 100644 index 82cd665538c..00000000000 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.plugin.util; - -import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig; -import org.apache.inlong.manager.plugin.flink.enums.Constants; - -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.configuration.Configuration; - -import java.io.File; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Path; -import java.nio.file.Paths; - -@Slf4j -public class FlinkServiceUtils { - - private static final String DEFAULT_PLUGINS = "plugins"; - - private static final String FILE_PREFIX = "file://"; - - public static Object getFlinkClientService(Configuration configuration, FlinkConfig flinkConfig) { - log.info("Flink version {}", flinkConfig.getVersion()); - - Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath(); - String flinkJarName = String.format(Constants.FLINK_JAR_NAME, flinkConfig.getVersion()); - String flinkClientPath = FILE_PREFIX + pluginPath + File.separator + flinkJarName; - log.info("Start to load Flink jar: {}", flinkClientPath); - - try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new URL(flinkClientPath)}, Thread.currentThread() - .getContextClassLoader())) { - Class flinkClientService = classLoader.loadClass(Constants.FLINK_CLIENT_CLASS); - Object flinkService = flinkClientService.getDeclaredConstructor(Configuration.class) - .newInstance(configuration); - log.info("Successfully loaded Flink service"); - return flinkService; - } catch (Exception e) { - log.error("Failed to loaded Flink service, please check flink client jar path: {}", flinkClientPath); - throw new RuntimeException(e); - } - } -} diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java index c3daefecfbf..61e7aac4845 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java @@ -17,25 +17,40 @@ package org.apache.inlong.manager.plugin.util; +import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig; +import org.apache.inlong.manager.plugin.flink.enums.Constants; + import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; +import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Objects; +import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY; + /** * Util of flink. */ @@ -43,27 +58,9 @@ public class FlinkUtils { public static final String BASE_DIRECTORY = "config"; - - public static final List FLINK_VERSION_COLLECTION = Collections.singletonList("Flink-1.13"); - - /** - * getLatestFlinkVersion - */ - public static String getLatestFlinkVersion(String[] supportedFlink) { - if (Objects.isNull(supportedFlink)) { - return null; - } - Arrays.sort(supportedFlink, Collections.reverseOrder()); - String latestFinkVersion = null; - for (String flinkVersion : supportedFlink) { - latestFinkVersion = FLINK_VERSION_COLLECTION.stream() - .filter(v -> v.equals(flinkVersion)).findFirst().orElse(null); - if (Objects.nonNull(latestFinkVersion)) { - return latestFinkVersion; - } - } - return latestFinkVersion; - } + private static final String DEFAULT_PLUGINS = "plugins"; + private static final String FILE_PREFIX = "file://"; + private static final String DEFAULT_CONFIG_FILE = "flink-sort-plugin.properties"; /** * print exception @@ -127,13 +124,6 @@ public static List listFiles(String baseDirName, String pattern, int lim return result; } - /** - * get value - */ - public static String getValue(String key, String defaultValue) { - return StringUtils.isNotEmpty(key) ? key : defaultValue; - } - /** * getConfigDirectory * @@ -171,23 +161,46 @@ public static boolean writeConfigToFile(String configJobDirectory, String config return true; } - /** - * Delete configuration file - * - * @param name file config info - * @return whether sucess - */ - public static boolean deleteConfigFile(String name) { - String configDirectory = getConfigDirectory(name); - File file = new File(configDirectory); - if (file.exists()) { - try { - FileUtils.deleteDirectory(file); - } catch (IOException e) { - log.error("delete {} failed", configDirectory, e); - return false; - } + public static Object getFlinkClientService(Configuration configuration, FlinkConfig flinkConfig) { + log.info("Flink version {}", flinkConfig.getVersion()); + + Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath(); + String flinkJarName = String.format(Constants.FLINK_JAR_NAME, flinkConfig.getVersion()); + String flinkClientPath = FILE_PREFIX + pluginPath + File.separator + flinkJarName; + log.info("Start to load Flink jar: {}", flinkClientPath); + + try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new URL(flinkClientPath)}, Thread.currentThread() + .getContextClassLoader())) { + Class flinkClientService = classLoader.loadClass(Constants.FLINK_CLIENT_CLASS); + Object flinkService = flinkClientService.getDeclaredConstructor(Configuration.class) + .newInstance(configuration); + log.info("Successfully loaded Flink service"); + return flinkService; + } catch (Exception e) { + log.error("Failed to loaded Flink service, please check flink client jar path: {}", flinkClientPath); + throw new RuntimeException(e); } - return true; + } + + public static FlinkConfig getFlinkConfigFromFile() throws Exception { + Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath(); + String defaultConfigFilePath = pluginPath + File.separator + DEFAULT_CONFIG_FILE; + + log.info("Start to load Flink config from file: {}", defaultConfigFilePath); + + Properties properties = new Properties(); + try (BufferedReader bufferedReader = new BufferedReader(new FileReader(defaultConfigFilePath))) { + properties.load(bufferedReader); + } + FlinkConfig flinkConfig = new FlinkConfig(); + flinkConfig.setPort(Integer.valueOf(properties.getProperty(PORT))); + flinkConfig.setAddress(properties.getProperty(ADDRESS)); + flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM))); + flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY)); + flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT))); + flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN))); + flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY)); + flinkConfig.setVersion(properties.getProperty(FLINK_VERSION)); + return flinkConfig; } }