Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@
* limitations under the License.
*/

package com.dtstack.flink.sql.launcher;
package com.dtstack.flink.sql;

/**
* This class defines three running mode of FlinkX
*
* Company: www.dtstack.com
* @author huyifan.zju@163.com
*/
public class ClusterMode {
public enum ClusterMode {

public static final String MODE_LOCAL = "local";
local(0),standalone(1),yarn(2),yarnPer(3);

public static final String MODE_STANDALONE = "standalone";
private int type;

public static final String MODE_YARN = "yarn";
ClusterMode(int type){
this.type = type;
}

}
6 changes: 2 additions & 4 deletions core/src/main/java/com/dtstack/flink/sql/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ public class Main {

private static final Logger LOG = LoggerFactory.getLogger(Main.class);

private static final String LOCAL_MODE = "local";

private static final int failureRate = 3;

private static final int failureInterval = 6; //min
Expand Down Expand Up @@ -135,7 +133,7 @@ public static void main(String[] args) throws Exception {
Thread.currentThread().setContextClassLoader(dtClassLoader);

URLClassLoader parentClassloader;
if(!LOCAL_MODE.equals(deployMode)){
if(!ClusterMode.local.name().equals(deployMode)){
parentClassloader = (URLClassLoader) threadClassLoader.getParent();
}else{
parentClassloader = dtClassLoader;
Expand Down Expand Up @@ -286,7 +284,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
}

private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws IOException {
StreamExecutionEnvironment env = !LOCAL_MODE.equals(deployMode) ?
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
StreamExecutionEnvironment.getExecutionEnvironment() :
new MyLocalStreamEnvironment();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.dtstack.flink.sql.ClusterMode;
import java.io.File;
import java.io.FilenameFilter;
import java.lang.reflect.Field;
Expand All @@ -42,10 +42,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static com.dtstack.flink.sql.launcher.LauncherOptions.*;

/**
* The Factory of ClusterClient
Expand All @@ -55,29 +53,29 @@
*/
public class ClusterClientFactory {

public static ClusterClient createClusterClient(Properties props) {
String clientType = props.getProperty(OPTION_MODE);
if(clientType.equals(ClusterMode.MODE_STANDALONE)) {
return createStandaloneClient(props);
} else if(clientType.equals(ClusterMode.MODE_YARN)) {
return createYarnClient(props);
public static ClusterClient createClusterClient(LauncherOptions launcherOptions) {
String mode = launcherOptions.getMode();
if(mode.equals(ClusterMode.standalone.name())) {
return createStandaloneClient(launcherOptions);
} else if(mode.equals(ClusterMode.yarn.name())) {
return createYarnClient(launcherOptions);
}
throw new IllegalArgumentException("Unsupported cluster client type: ");
}

public static StandaloneClusterClient createStandaloneClient(Properties props) {
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
public static StandaloneClusterClient createStandaloneClient(LauncherOptions launcherOptions) {
String flinkConfDir = launcherOptions.getFlinkconf();
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
StandaloneClusterClient clusterClient = descriptor.retrieve(null);
clusterClient.setDetached(true);
return clusterClient;
}

public static YarnClusterClient createYarnClient(Properties props) {
String flinkConfDir = props.getProperty(LauncherOptions.OPTION_FLINK_CONF_DIR);
public static YarnClusterClient createYarnClient(LauncherOptions launcherOptions) {
String flinkConfDir = launcherOptions.getFlinkconf();
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
String yarnConfDir = props.getProperty(LauncherOptions.OPTION_YARN_CONF_DIR);
String yarnConfDir =launcherOptions.getYarnconf();
org.apache.hadoop.conf.Configuration yarnConf = new YarnConfiguration();
if(StringUtils.isNotBlank(yarnConfDir)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@

import avro.shaded.com.google.common.collect.Lists;
import com.dtstack.flink.sql.Main;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;

import java.io.File;
import java.util.List;

import static com.dtstack.flink.sql.launcher.ClusterMode.MODE_LOCAL;
import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_LOCAL_SQL_PLUGIN_PATH;
import static com.dtstack.flink.sql.launcher.LauncherOptions.OPTION_MODE;
import com.dtstack.flink.sql.ClusterMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

/**
* Date: 2017/2/20
Expand All @@ -51,18 +50,21 @@ private static String getLocalCoreJarPath(String localSqlRootJar){

public static void main(String[] args) throws Exception {
LauncherOptionParser optionParser = new LauncherOptionParser(args);
String mode = (String) optionParser.getVal(OPTION_MODE);
LauncherOptions launcherOptions = optionParser.getLauncherOptions();
String mode = launcherOptions.getMode();
List<String> argList = optionParser.getProgramExeArgList();

if(mode.equals(MODE_LOCAL)) {
if(mode.equals(ClusterMode.local.name())) {
String[] localArgs = argList.toArray(new String[argList.size()]);
Main.main(localArgs);
} else {
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(optionParser.getProperties());
String pluginRoot = (String) optionParser.getVal(OPTION_LOCAL_SQL_PLUGIN_PATH);
ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
String pluginRoot = launcherOptions.getLocalSqlPluginPath();
File jarFile = new File(getLocalCoreJarPath(pluginRoot));
String[] remoteArgs = argList.toArray(new String[argList.size()]);
PackagedProgram program = new PackagedProgram(jarFile, Lists.newArrayList(), remoteArgs);
if(StringUtils.isNotBlank(launcherOptions.getSavePointPath())){
program.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(launcherOptions.getSavePointPath(), BooleanUtils.toBoolean(launcherOptions.getAllowNonRestoredState())));
}
clusterClient.run(program, 1);
clusterClient.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,19 @@
package com.dtstack.flink.sql.launcher;

import avro.shaded.com.google.common.collect.Lists;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.hadoop.shaded.com.google.common.base.Charsets;
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions;

import java.io.File;
import java.io.FileInputStream;
import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static com.dtstack.flink.sql.launcher.LauncherOptions.*;
import static com.dtstack.flink.sql.launcher.ClusterMode.*;

import com.dtstack.flink.sql.ClusterMode;

/**
* The Parser of Launcher commandline options
Expand All @@ -45,14 +41,36 @@
*/
public class LauncherOptionParser {

public static final String OPTION_MODE = "mode";

public static final String OPTION_NAME = "name";

public static final String OPTION_SQL = "sql";

public static final String OPTION_FLINK_CONF_DIR = "flinkconf";

public static final String OPTION_YARN_CONF_DIR = "yarnconf";

public static final String OPTION_LOCAL_SQL_PLUGIN_PATH = "localSqlPluginPath";

public static final String OPTION_REMOTE_SQL_PLUGIN_PATH = "remoteSqlPluginPath";

public static final String OPTION_ADDJAR = "addjar";

public static final String OPTION_CONF_PROP = "confProp";

public static final String OPTION_SAVE_POINT_PATH = "savePointPath";

public static final String OPTION_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState";

private Options options = new Options();

private BasicParser parser = new BasicParser();

private Properties properties = new Properties();
private LauncherOptions properties = new LauncherOptions();

public LauncherOptionParser(String[] args) {
options.addOption(LauncherOptions.OPTION_MODE, true, "Running mode");
options.addOption(OPTION_MODE, true, "Running mode");
options.addOption(OPTION_SQL, true, "Job sql file");
options.addOption(OPTION_NAME, true, "Job name");
options.addOption(OPTION_FLINK_CONF_DIR, true, "Flink configuration directory");
Expand All @@ -62,11 +80,14 @@ public LauncherOptionParser(String[] args) {
options.addOption(OPTION_CONF_PROP, true, "sql ref prop,eg specify event time");
options.addOption(OPTION_YARN_CONF_DIR, true, "Yarn and hadoop configuration directory");

options.addOption(OPTION_SAVE_POINT_PATH, true, "Savepoint restore path");
options.addOption(OPTION_ALLOW_NON_RESTORED_STATE, true, "Flag indicating whether non restored state is allowed if the savepoint");

try {
CommandLine cl = parser.parse(options, args);
String mode = cl.getOptionValue(OPTION_MODE, MODE_LOCAL);
String mode = cl.getOptionValue(OPTION_MODE, ClusterMode.local.name());
//check mode
properties.put(OPTION_MODE, mode);
properties.setMode(mode);

String job = Preconditions.checkNotNull(cl.getOptionValue(OPTION_SQL),
"Must specify job file using option '" + OPTION_SQL + "'");
Expand All @@ -76,78 +97,65 @@ public LauncherOptionParser(String[] args) {
in.read(filecontent);
String content = new String(filecontent, "UTF-8");
String sql = URLEncoder.encode(content, Charsets.UTF_8.name());
properties.put(OPTION_SQL, sql);

properties.setSql(sql);
String localPlugin = Preconditions.checkNotNull(cl.getOptionValue(OPTION_LOCAL_SQL_PLUGIN_PATH));
properties.put(OPTION_LOCAL_SQL_PLUGIN_PATH, localPlugin);

properties.setLocalSqlPluginPath(localPlugin);
String remotePlugin = cl.getOptionValue(OPTION_REMOTE_SQL_PLUGIN_PATH);
if(!mode.equalsIgnoreCase(ClusterMode.MODE_LOCAL)){
if(!ClusterMode.local.name().equals(mode)){
Preconditions.checkNotNull(remotePlugin);
properties.put(OPTION_REMOTE_SQL_PLUGIN_PATH, remotePlugin);
properties.setRemoteSqlPluginPath(remotePlugin);
}

String name = Preconditions.checkNotNull(cl.getOptionValue(OPTION_NAME));
properties.put(OPTION_NAME, name);

properties.setName(name);
String addJar = cl.getOptionValue(OPTION_ADDJAR);
if(StringUtils.isNotBlank(addJar)){
properties.put(OPTION_ADDJAR, addJar);
properties.setAddjar(addJar);
}

String confProp = cl.getOptionValue(OPTION_CONF_PROP);
if(StringUtils.isNotBlank(confProp)){
properties.put(OPTION_CONF_PROP, confProp);
properties.setConfProp(confProp);
}

String flinkConfDir = cl.getOptionValue(OPTION_FLINK_CONF_DIR);
if(StringUtils.isNotBlank(flinkConfDir)) {
properties.put(OPTION_FLINK_CONF_DIR, flinkConfDir);
properties.setFlinkconf(flinkConfDir);
}

String yarnConfDir = cl.getOptionValue(OPTION_YARN_CONF_DIR);
if(StringUtils.isNotBlank(yarnConfDir)) {
properties.put(OPTION_YARN_CONF_DIR, yarnConfDir);
properties.setYarnconf(yarnConfDir);
}

String savePointPath = cl.getOptionValue(OPTION_SAVE_POINT_PATH);
if(StringUtils.isNotBlank(savePointPath)) {
properties.setSavePointPath(savePointPath);
}

String allow_non = cl.getOptionValue(OPTION_ALLOW_NON_RESTORED_STATE);
if(StringUtils.isNotBlank(allow_non)) {
properties.setAllowNonRestoredState(allow_non);
}

} catch (Exception e) {
throw new RuntimeException(e);
}

}

public Properties getProperties(){
public LauncherOptions getLauncherOptions(){
return properties;
}

public Object getVal(String key){
return properties.get(key);
}

public List<String> getAllArgList(){
public List<String> getProgramExeArgList() throws Exception {
Map<String,Object> mapConf = PluginUtil.ObjectToMap(properties);
List<String> args = Lists.newArrayList();
for(Map.Entry<Object, Object> one : properties.entrySet()){
args.add("-" + one.getKey().toString());
args.add(one.getValue().toString());
}

return args;
}

public List<String> getProgramExeArgList(){
List<String> args = Lists.newArrayList();
for(Map.Entry<Object, Object> one : properties.entrySet()){
String key = one.getKey().toString();
for(Map.Entry<String, Object> one : mapConf.entrySet()){
String key = one.getKey();
if(OPTION_FLINK_CONF_DIR.equalsIgnoreCase(key)
|| OPTION_YARN_CONF_DIR.equalsIgnoreCase(key)){
continue;
}

args.add("-" + key);
args.add(one.getValue().toString());
}

return args;
}

}
Loading