Skip to content

correct one wrong spelling #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 25, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
* @param pluginLoadMode
* @return
*/
public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, String deployMode, String pluginLoadMode) {
private static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, String deployMode, String pluginLoadMode) {
if (StringUtils.isEmpty(remoteSqlPluginPath)) {
return StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.SHIPFILE.name())
|| StringUtils.equalsIgnoreCase(deployMode, ClusterMode.local.name());
Expand Down Expand Up @@ -176,7 +176,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
}


public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.IOException {
private static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.IOException {
List<URL> jarUrlList = Lists.newArrayList();
if (Strings.isNullOrEmpty(addJarListStr)) {
return jarUrlList;
Expand Down Expand Up @@ -240,7 +240,7 @@ private static void sqlTranslation(String localSqlPluginPath,
}
}

public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
private static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
throws IllegalAccessException, InvocationTargetException {
// udf和tableEnv须由同一个类加载器加载
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
Expand Down Expand Up @@ -269,9 +269,9 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
* @return
* @throws Exception
*/
public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
Set<URL> pluginClassPatshSets = Sets.newHashSet();
Set<URL> pluginClassPathSets = Sets.newHashSet();
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {

Expand Down Expand Up @@ -325,26 +325,26 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
registerTableCache.put(tableInfo.getName(), regTable);

URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
pluginClassPatshSets.add(sourceTablePathUrl);
pluginClassPathSets.add(sourceTablePathUrl);
} else if (tableInfo instanceof TargetTableInfo) {

TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);

URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
pluginClassPatshSets.add(sinkTablePathUrl);
pluginClassPathSets.add(sinkTablePathUrl);
} else if (tableInfo instanceof SideTableInfo) {
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);

URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
pluginClassPatshSets.add(sideTablePathUrl);
pluginClassPathSets.add(sideTablePathUrl);
} else {
throw new RuntimeException("not support table type:" + tableInfo.getType());
}
}
return pluginClassPatshSets;
return pluginClassPathSets;
}

/**
Expand All @@ -353,7 +353,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
* @param env
* @param classPathSet
*/
public static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
private static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
int i = 0;
for (URL url : classPathSet) {
String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
Expand All @@ -362,7 +362,7 @@ public static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env,
}
}

public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
StreamExecutionEnvironment.getExecutionEnvironment() :
new MyLocalStreamEnvironment();
Expand Down