Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ public class FlinkOperation {
private static final String CONNECTOR_JAR_PATTERN = "^sort-connector-(?i)(%s).*jar$";
private static final String ALL_CONNECTOR_JAR_PATTERN = "^sort-connector-.*jar$";
private static Properties properties;
private final FlinkService flinkService;

public FlinkOperation(FlinkService flinkService) {
this.flinkService = flinkService;
private static class FlinkOperationHolder {

private static final FlinkOperation INSTANCE = new FlinkOperation();
}

public static FlinkOperation getInstance() {
return FlinkOperationHolder.INSTANCE;
}

/**
Expand Down Expand Up @@ -103,41 +107,41 @@ private String getConnectorJarPattern(String dataSourceType) {
* Restart the Flink job.
*/
public void restart(FlinkInfo flinkInfo) throws Exception {
String jobId = flinkInfo.getJobId();
boolean terminated = isNullOrTerminated(jobId);
boolean terminated = isNullOrTerminated(flinkInfo);
if (terminated) {
String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, jobId);
String message = String.format("restart job failed, as " + JOB_TERMINATED_MSG, flinkInfo.getJobId());
log.error(message);
throw new Exception(message);
}

Future<?> future = TaskRunService.submit(
new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.RESTART.getCode()));
new IntegrationTaskRunner(flinkInfo, TaskCommitType.RESTART.getCode()));
future.get();
}

/**
* Start the Flink job, if the job id was not empty, restore it.
*/
public void start(FlinkInfo flinkInfo) throws Exception {
String jobId = flinkInfo.getJobId();
try {
// Start a new task without savepoint
if (StringUtils.isEmpty(jobId)) {
IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkService, flinkInfo,
if (StringUtils.isEmpty(flinkInfo.getJobId())) {
IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkInfo,
TaskCommitType.START_NOW.getCode());
Future<?> future = TaskRunService.submit(taskRunner);
future.get();
} else {
// Restore an old task with savepoint
boolean noSavepoint = isNullOrTerminated(jobId) || StringUtils.isEmpty(flinkInfo.getSavepointPath());
boolean noSavepoint =
isNullOrTerminated(flinkInfo) || StringUtils.isEmpty(flinkInfo.getSavepointPath());
if (noSavepoint) {
String message = String.format("restore job failed, as " + JOB_TERMINATED_MSG, jobId);
String message =
String.format("restore job failed, as " + JOB_TERMINATED_MSG, flinkInfo.getJobId());
log.error(message);
throw new Exception(message);
}

IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkService, flinkInfo,
IntegrationTaskRunner taskRunner = new IntegrationTaskRunner(flinkInfo,
TaskCommitType.RESUME.getCode());
Future<?> future = TaskRunService.submit(taskRunner);
future.get();
Expand All @@ -148,37 +152,6 @@ public void start(FlinkInfo flinkInfo) throws Exception {
}
}

/**
* Check whether there are duplicate NodeIds in different relations.
* <p/>
* The JSON data in the dataflow is in the reverse order of the nodes in the actual dataflow.
* For example, data flow A -> B -> C, the generated topological relationship is [[B,C],[A,B]],
* then the input node B in the first relation [B,C] is the second output node B in relation [A,B].
* <p/>
* The example of dataflow:
* <blockquote><pre>
* {
* "groupId": "test_group",
* "streams": [
* {
* "streamId": "test_stream",
* "relations": [
* {
* "type": "baseRelation",
* "inputs": [ "node_3" ],
* "outputs": [ "node_4" ]
* },
* {
* "type": "innerJoin",
* "inputs": [ "node_1", "node_2" ],
* "outputs": [ "node_3" ]
* }
* ]
* }
* ]
* }
* </pre></blockquote>
*/
private void checkNodeIds(String dataflow) throws Exception {
JsonNode relations = JsonUtils.parseTree(dataflow).get(InlongConstants.STREAMS)
.get(0).get(InlongConstants.RELATIONS);
Expand Down Expand Up @@ -293,16 +266,15 @@ public void genPath(FlinkInfo flinkInfo, String dataflow) throws Exception {
* Stop the Flink job.
*/
public void stop(FlinkInfo flinkInfo) throws Exception {
String jobId = flinkInfo.getJobId();
boolean terminated = isNullOrTerminated(jobId);
boolean terminated = isNullOrTerminated(flinkInfo);
if (terminated) {
String message = String.format("stop job failed, as " + JOB_TERMINATED_MSG, jobId);
String message = String.format("stop job failed, as " + JOB_TERMINATED_MSG, flinkInfo.getJobId());
log.error(message);
throw new Exception(message);
}

Future<?> future = TaskRunService.submit(
new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.STOP.getCode()));
new IntegrationTaskRunner(flinkInfo, TaskCommitType.STOP.getCode()));
future.get();
}

Expand All @@ -311,7 +283,7 @@ public void stop(FlinkInfo flinkInfo) throws Exception {
*/
public void delete(FlinkInfo flinkInfo) throws Exception {
String jobId = flinkInfo.getJobId();
JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
JobDetailsInfo jobDetailsInfo = FlinkService.getInstance().getJobDetail(flinkInfo);
if (jobDetailsInfo == null) {
throw new Exception(String.format("delete job failed as the job [%s] not found", jobId));
}
Expand All @@ -324,7 +296,7 @@ public void delete(FlinkInfo flinkInfo) throws Exception {
}

Future<?> future = TaskRunService.submit(
new IntegrationTaskRunner(flinkService, flinkInfo, TaskCommitType.DELETE.getCode()));
new IntegrationTaskRunner(flinkInfo, TaskCommitType.DELETE.getCode()));
future.get();
}

Expand All @@ -343,7 +315,7 @@ public void pollJobStatus(FlinkInfo flinkInfo, JobStatus expectStatus) throws Ex
int retryTimes = 0;
while (retryTimes <= TRY_MAX_TIMES) {
try {
JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
JobDetailsInfo jobDetailsInfo = FlinkService.getInstance().getJobDetail(flinkInfo);
if (jobDetailsInfo == null) {
log.error("job detail not found by {}", jobId);
throw new Exception(String.format("job detail not found by %s", jobId));
Expand Down Expand Up @@ -371,11 +343,11 @@ public void pollJobStatus(FlinkInfo flinkInfo, JobStatus expectStatus) throws Ex
/**
* Check whether the job was terminated by the given job id.
*/
private boolean isNullOrTerminated(String jobId) throws Exception {
JobDetailsInfo jobDetailsInfo = flinkService.getJobDetail(jobId);
private boolean isNullOrTerminated(FlinkInfo flinkInfo) throws Exception {
JobDetailsInfo jobDetailsInfo = FlinkService.getInstance().getJobDetail(flinkInfo);
boolean terminated = jobDetailsInfo == null || jobDetailsInfo.getJobStatus() == null;
if (terminated) {
log.warn("job detail or job status was null for [{}]", jobId);
log.warn("job detail or job status was null for [{}]", flinkInfo.getJobId());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
import org.apache.inlong.manager.plugin.util.FlinkServiceUtils;
import org.apache.inlong.manager.plugin.util.FlinkUtils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -64,54 +63,47 @@ public class FlinkService {
private final FlinkConfig flinkConfig;
private final Integer parallelism;
private final String savepointDirectory;
private final Configuration configuration;
private final FlinkClientService clientService;
// map endpoint to Configuration
private final Map<String, Configuration> configurations = new HashMap<>();
// map Configuration to FlinkClientService
private final Map<Configuration, FlinkClientService> flinkClientServices = new HashMap<>();

/**
* Constructor of FlinkService.
*/
public FlinkService(String endpoint) throws Exception {
FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
flinkConfig = flinkConfiguration.getFlinkConfig();
public FlinkService() throws Exception {
flinkConfig = FlinkUtils.getFlinkConfigFromFile();
parallelism = flinkConfig.getParallelism();
savepointDirectory = flinkConfig.getSavepointDirectory();
}

private static class FlinkServiceHolder {

configuration = new Configuration();
Integer jobManagerPort = flinkConfig.getJobManagerPort();
configuration.setInteger(JobManagerOptions.PORT, jobManagerPort);

Integer port;
String address;
if (StringUtils.isEmpty(endpoint)) {
address = flinkConfig.getAddress();
port = flinkConfig.getPort();
} else {
Map<String, String> ipPort = translateFromEndpoint(endpoint);
if (ipPort.isEmpty()) {
throw new BusinessException("get address:port failed from endpoint " + endpoint);
private static final FlinkService INSTANCE;
static {
try {
INSTANCE = new FlinkService();
} catch (Exception e) {
throw new RuntimeException(e);
}
address = ipPort.get("address");
port = Integer.valueOf(ipPort.get("port"));
}
configuration.setString(JobManagerOptions.ADDRESS, address);
configuration.setInteger(RestOptions.PORT, port);
}

clientService = (FlinkClientService) FlinkServiceUtils.getFlinkClientService(configuration, flinkConfig);
public static FlinkService getInstance() {
return FlinkServiceHolder.INSTANCE;
}

/**
* Translate the Endpoint to address & port
*/
private Map<String, String> translateFromEndpoint(String endpoint) throws Exception {
private Map<String, String> translateFromEndpoint(String endpoint) {
Map<String, String> map = new HashMap<>(2);
Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
if (matcher.find()) {
map.put("address", matcher.group(1));
map.put("port", matcher.group(2));
return map;
} else {
throw new Exception("endpoint [" + endpoint + "] was not match address:port");
}
return map;
}

/**
Expand All @@ -124,15 +116,52 @@ public FlinkConfig getFlinkConfig() {
/**
* Get the job status by the given job id.
*/
public JobStatus getJobStatus(String jobId) throws Exception {
return clientService.getJobStatus(jobId);
public JobStatus getJobStatus(String endpoint, String jobId) throws Exception {
Configuration configuration = getFlinkConfiguration(endpoint);
return getFlinkClientService(configuration).getJobStatus(jobId);
}

public JobStatus getJobStatus(FlinkInfo flinkInfo) throws Exception {
Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
return getFlinkClientService(configuration).getJobStatus(flinkInfo.getJobId());
}

private FlinkClientService getFlinkClientService(Configuration configuration) {
return flinkClientServices.computeIfAbsent(configuration,
k -> (FlinkClientService) FlinkUtils.getFlinkClientService(configuration, flinkConfig));
}

private Configuration getFlinkConfiguration(String endpoint) {
return configurations.computeIfAbsent(endpoint,
k -> {
Integer port;
String address;
if (StringUtils.isEmpty(endpoint)) {
address = flinkConfig.getAddress();
port = flinkConfig.getPort();
} else {
Map<String, String> ipPort = translateFromEndpoint(endpoint);
if (ipPort.isEmpty()) {
throw new BusinessException("get address:port failed from endpoint " + endpoint);
}
address = ipPort.get("address");
port = Integer.valueOf(ipPort.get("port"));
}
// build flink configuration
Configuration configuration = new Configuration();
configuration.setInteger(JobManagerOptions.PORT, flinkConfig.getJobManagerPort());
configuration.setString(JobManagerOptions.ADDRESS, address);
configuration.setInteger(RestOptions.PORT, port);
return configuration;
});
}

/**
* Get job detail by the given job id.
*/
public JobDetailsInfo getJobDetail(String jobId) throws Exception {
return clientService.getJobDetail(jobId);
public JobDetailsInfo getJobDetail(FlinkInfo flinkInfo) throws Exception {
Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
return getFlinkClientService(configuration).getJobDetail(flinkInfo.getJobId());
}

/**
Expand Down Expand Up @@ -182,6 +211,8 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting
}
}).filter(Objects::nonNull).collect(Collectors.toList());

Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());

PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
.setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
Expand All @@ -192,23 +223,26 @@ private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSetting
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
jobGraph.addJars(connectorJars);

RestClusterClient<StandaloneClusterId> client = clientService.getFlinkClient();
RestClusterClient<StandaloneClusterId> client = getFlinkClientService(configuration).getFlinkClient();
CompletableFuture<JobID> result = client.submitJob(jobGraph);
return result.get().toString();
}

/**
* Stop the Flink job with the savepoint.
*/
public String stopJob(String jobId, StopWithSavepointRequest request) throws Exception {
return clientService.stopJob(jobId, request.isDrain(), request.getTargetDirectory());
public String stopJob(FlinkInfo flinkInfo, StopWithSavepointRequest request) throws Exception {
Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
return getFlinkClientService(configuration).stopJob(flinkInfo.getJobId(), request.isDrain(),
request.getTargetDirectory());
}

/**
* Cancel the Flink job.
*/
public void cancelJob(String jobId) throws Exception {
clientService.cancelJob(jobId);
public void cancelJob(FlinkInfo flinkInfo) throws Exception {
Configuration configuration = getFlinkConfiguration(flinkInfo.getEndpoint());
getFlinkClientService(configuration).cancelJob(flinkInfo.getJobId());
}

/**
Expand Down
Loading