Skip to content

Commit

Permalink
use singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
todd5167 committed Apr 14, 2020
1 parent 716cad5 commit 0a7f3f9
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 19 deletions.
Expand Up @@ -39,13 +39,10 @@
* @author maqi
*/
public class StandaloneExecutor {

StandaloneClientFactory standaloneClientFactory;
JobParamsInfo jobParamsInfo;

// Stores the job parameters consumed later by exec().
public StandaloneExecutor(JobParamsInfo jobParamsInfo) {
this.jobParamsInfo = jobParamsInfo;
// NOTE(review): this diff shows the line below being removed by the commit —
// the per-instance factory is replaced by StandaloneClientFactory.INSTANCE
// (enum singleton) at the call site in exec().
standaloneClientFactory = new StandaloneClientFactory();
}

public void exec() throws Exception {
Expand All @@ -62,7 +59,7 @@ public void exec() throws Exception {

JobGraphBuildUtil.fillJobGraphClassPath(jobGraph);

ClusterDescriptor clusterDescriptor = standaloneClientFactory.createClusterDescriptor("", flinkConfiguration);
ClusterDescriptor clusterDescriptor = StandaloneClientFactory.INSTANCE.createClusterDescriptor("", flinkConfiguration);
ClusterClientProvider clusterClientProvider = clusterDescriptor.retrieve(StandaloneClusterId.getInstance());
ClusterClient clusterClient = clusterClientProvider.getClusterClient();

Expand Down
Expand Up @@ -23,7 +23,6 @@
import com.dtstack.flink.sql.launcher.factory.YarnClusterClientFactory;
import com.dtstack.flink.sql.launcher.utils.JobGraphBuildUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
Expand All @@ -39,10 +38,8 @@

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;


Expand All @@ -58,12 +55,10 @@ public class YarnJobClusterExecutor {
private static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";

YarnClusterClientFactory yarnClusterClientFactory;
JobParamsInfo jobParamsInfo;

// Stores the job parameters consumed later by exec().
public YarnJobClusterExecutor(JobParamsInfo jobParamsInfo) {
this.jobParamsInfo = jobParamsInfo;
// NOTE(review): this diff shows the line below being removed by the commit —
// the per-instance factory is replaced by YarnClusterClientFactory.INSTANCE
// (enum singleton) at the call sites in exec().
yarnClusterClientFactory = new YarnClusterClientFactory();
}

public void exec() throws Exception {
Expand All @@ -75,12 +70,13 @@ public void exec() throws Exception {
Configuration flinkConfiguration = JobGraphBuildUtil.getFlinkConfiguration(jobParamsInfo.getFlinkConfDir(), jobParamsInfo.getConfProperties());
appendApplicationConfig(flinkConfiguration, jobParamsInfo);

YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor) yarnClusterClientFactory.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
YarnClusterDescriptor clusterDescriptor = (YarnClusterDescriptor) YarnClusterClientFactory.INSTANCE
.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);

List<File> shipFiles = getShipFiles(jobParamsInfo.getFlinkJarPath(), jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor);
clusterDescriptor.addShipFiles(shipFiles);

ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(flinkConfiguration);
ClusterSpecification clusterSpecification = YarnClusterClientFactory.INSTANCE.getClusterSpecification(flinkConfiguration);
ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);

String applicationId = applicationIdClusterClientProvider.getClusterClient().getClusterId().toString();
Expand Down
Expand Up @@ -34,26 +34,23 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;

import java.io.UnsupportedEncodingException;

/**
* Date: 2020/3/4
* Company: www.dtstack.com
* @author maqi
*/
public class YarnSessionClusterExecutor {
YarnClusterClientFactory yarnClusterClientFactory;
JobParamsInfo jobParamsInfo;

// Stores the job parameters consumed later by exec().
public YarnSessionClusterExecutor(JobParamsInfo jobParamsInfo) {
this.jobParamsInfo = jobParamsInfo;
// NOTE(review): this diff shows the line below being removed by the commit —
// the per-instance factory is replaced by YarnClusterClientFactory.INSTANCE
// (enum singleton) at the call site in exec().
yarnClusterClientFactory = new YarnClusterClientFactory();
}

public void exec() throws Exception {
JobGraph jobGraph = JobGraphBuildUtil.buildJobGraph(jobParamsInfo);
Configuration flinkConfiguration = JobGraphBuildUtil.getFlinkConfiguration(jobParamsInfo.getFlinkConfDir(), jobParamsInfo.getConfProperties());
ClusterDescriptor clusterDescriptor = yarnClusterClientFactory.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
ClusterDescriptor clusterDescriptor = YarnClusterClientFactory.INSTANCE.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);

Object yid = jobParamsInfo.getYarnSessionConfProperties().get("yid");
if (null == yid) {
Expand Down
Expand Up @@ -33,9 +33,9 @@
* Company: www.dtstack.com
* @author maqi
*/
public abstract class AbstractClusterClientFactory {
public interface AbstractClusterClientFactory {

public ClusterSpecification getClusterSpecification(Configuration configuration) {
default ClusterSpecification getClusterSpecification(Configuration configuration) {
checkNotNull(configuration);

final int jobManagerMemoryMb = ConfigurationUtils
Expand Down
Expand Up @@ -29,7 +29,9 @@
* Company: www.dtstack.com
* @author maqi
*/
public class StandaloneClientFactory extends AbstractClusterClientFactory {
public enum StandaloneClientFactory implements AbstractClusterClientFactory {
INSTANCE;

@Override
public ClusterDescriptor createClusterDescriptor(String clusterConfPath, Configuration flinkConfig) {
checkNotNull(flinkConfig);
Expand Down
Expand Up @@ -41,8 +41,11 @@
* Company: www.dtstack.com
* @author maqi
*/
public class YarnClusterClientFactory extends AbstractClusterClientFactory {
public enum YarnClusterClientFactory implements AbstractClusterClientFactory {
INSTANCE;

private static final String XML_FILE_EXTENSION = "xml";

@Override
public ClusterDescriptor createClusterDescriptor(String yarnConfDir, Configuration flinkConfig) {

Expand Down

0 comments on commit 0a7f3f9

Please sign in to comment.