Commit e6ed398 (merge of parents 006236b and 1e56647)

Merge branch '1.10_release_4.0.x' into hotfix_1.10_4.0.x_30935

# Conflicts:
#   core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java
#   elasticsearch6/elasticsearch6-side/elasticsearch6-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AllReqRow.java
#   rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java

File tree: 77 files changed (+1897 / -1541 lines)


.gitignore

Lines changed: 3 additions & 1 deletion

@@ -14,4 +14,6 @@ lib/
 .DS_Store
 bin/nohup.out
 .DS_Store
-bin/sideSql.txt
+bin/sideSql.txt
+*.keytab
+krb5.conf

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 12 additions & 25 deletions

@@ -18,7 +18,18 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
-import com.datastax.driver.core.*;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
@@ -32,10 +43,7 @@
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,27 +80,6 @@ public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<Field
         super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
     }
 
-    @Override
-    public Row fillData(Row input, Object sideInput) {
-        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
-            obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
-            row.setField(entry.getKey(), obj);
-        }
-
-        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
-            if (cacheInfo == null) {
-                row.setField(entry.getKey(), null);
-            } else {
-                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
-            }
-        }
-
-        return row;
-    }
-
     @Override
     protected void initCache() throws SQLException {
         Map<String, List<Map<String, Object>>> newCache = Maps.newConcurrentMap();

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncSideInfo.java

Lines changed: 0 additions & 53 deletions

@@ -72,57 +72,4 @@ public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInf
         LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
     }
 
-
-    @Override
-    public void dealOneEqualCon(SqlNode sqlNode, String sideTableName) {
-        if (sqlNode.getKind() != SqlKind.EQUALS) {
-            throw new RuntimeException("not equal operator.");
-        }
-
-        SqlIdentifier left = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[0];
-        SqlIdentifier right = (SqlIdentifier) ((SqlBasicCall) sqlNode).getOperands()[1];
-
-        String leftTableName = left.getComponent(0).getSimple();
-        String leftField = left.getComponent(1).getSimple();
-
-        String rightTableName = right.getComponent(0).getSimple();
-        String rightField = right.getComponent(1).getSimple();
-
-        if (leftTableName.equalsIgnoreCase(sideTableName)) {
-            equalFieldList.add(leftField);
-            int equalFieldIndex = -1;
-            for (int i = 0; i < getFieldNames().length; i++) {
-                String fieldName = getFieldNames()[i];
-                if (fieldName.equalsIgnoreCase(rightField)) {
-                    equalFieldIndex = i;
-                }
-            }
-            if (equalFieldIndex == -1) {
-                throw new RuntimeException("can't deal equal field: " + sqlNode);
-            }
-
-            equalValIndex.add(equalFieldIndex);
-
-        } else if (rightTableName.equalsIgnoreCase(sideTableName)) {
-
-            equalFieldList.add(rightField);
-            int equalFieldIndex = -1;
-            for (int i = 0; i < getFieldNames().length; i++) {
-                String fieldName = getFieldNames()[i];
-                if (fieldName.equalsIgnoreCase(leftField)) {
-                    equalFieldIndex = i;
-                }
-            }
-            if (equalFieldIndex == -1) {
-                throw new RuntimeException("can't deal equal field: " + sqlNode.toString());
-            }
-
-            equalValIndex.add(equalFieldIndex);
-
-        } else {
-            throw new RuntimeException("resolve equalFieldList error:" + sqlNode.toString());
-        }
-
-    }
-
 }
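
The deleted override matches the duplicated-logic cleanup elsewhere in this commit; presumably the shared side-info base class now supplies dealOneEqualCon (the base implementation is not shown in this diff). A hedged worked example of what this equal-condition resolution records, with hypothetical table and field names:

    // Hypothetical join: SELECT ... FROM main m JOIN side s ON m.user_id = s.id
    // with sideTableName = "s" and getFieldNames() = {"user_id", "amount"}.
    //
    // For the EQUALS node m.user_id = s.id the method records:
    //   equalFieldList += "id"  -- the side table's column in the condition
    //   equalValIndex  += 0     -- index of "user_id" among the main-table fields
    //
    // Any non-EQUALS operator, or a field absent from getFieldNames(),
    // fails fast with a RuntimeException, as in the removed code above.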

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 7 additions & 1 deletion

@@ -58,6 +58,8 @@ public class CassandraSink implements RetractStreamTableSink<Row>, IStreamSinkGe
     protected Integer readTimeoutMillis;
     protected Integer connectTimeoutMillis;
     protected Integer poolTimeoutMillis;
+    protected Integer parallelism = 1;
+    protected String registerTableName;
 
     public CassandraSink() {
         // TO DO NOTHING
@@ -78,6 +80,8 @@ public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
         this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
         this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
         this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
+        this.parallelism = cassandraTableInfo.getParallelism();
+        this.registerTableName = cassandraTableInfo.getTableName();
         return this;
     }
 
@@ -106,7 +110,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
 
         CassandraOutputFormat outputFormat = builder.finish();
         RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
-        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
+        DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
+                .setParallelism(parallelism)
+                .name(registerTableName);
        return dataStreamSink;
    }
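
The sink now honors a per-table parallelism and shows up under its registered table name in the Flink web UI. For context, a minimal runnable sketch of the same chaining on a toy stream (the stream contents and sink name are hypothetical; setParallelism and name are standard Flink 1.10 DataStreamSink methods):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

    public class SinkNamingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> stream = env.fromElements("a", "b", "c");
            // Same chaining as consumeDataStream() above: parallelism and the
            // display name are pinned on the DataStreamSink itself, overriding
            // the environment-wide default.
            DataStreamSink<String> sink = stream
                    .addSink(new PrintSinkFunction<>())
                    .setParallelism(2)
                    .name("my_cassandra_table"); // hypothetical table name
            env.execute("sink-naming-sketch");
        }
    }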

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 6 additions & 0 deletions

@@ -23,6 +23,8 @@
 import com.dtstack.flink.sql.exec.ParamsInfo;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Obtains the execution plan of a SQL job in local mode
@@ -32,15 +34,19 @@
  */
 public class GetPlan {
 
+    private static final Logger LOG = LoggerFactory.getLogger(GetPlan.class);
+
     public static String getExecutionPlan(String[] args) {
         try {
             long start = System.currentTimeMillis();
             ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
+            paramsInfo.setGetPlan(true);
             StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
             String executionPlan = env.getExecutionPlan();
             long end = System.currentTimeMillis();
             return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
         } catch (Exception e) {
+            LOG.error("Get plan error", e);
             return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
         }
     }
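
How this entry point is invoked lies outside the diff; the sketch below assumes launcher-style arguments, and the option names shown are hypothetical (the real ones are defined by this repo's OptionParser):

    import com.dtstack.flink.sql.GetPlan;

    public class GetPlanSketch {
        public static void main(String[] args) {
            // Hypothetical launcher-style options; consult OptionParser for the real names.
            String[] jobArgs = {"-sql", "/tmp/job.sql", "-name", "planOnly"};
            // setGetPlan(true) is applied internally, so UDF jars load in an
            // isolated URLClassLoader (see ExecuteProcessHelper below); failures
            // are now both logged and returned as an error-JSON string.
            String resultJson = GetPlan.getExecutionPlan(jobArgs);
            System.out.println(resultJson);
        }
    }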

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 27 additions & 19 deletions

@@ -18,21 +18,6 @@
 
 package com.dtstack.flink.sql.exec;
 
-import com.dtstack.flink.sql.parser.CreateFuncParser;
-import com.dtstack.flink.sql.parser.CreateTmpTableParser;
-import com.dtstack.flink.sql.parser.FlinkPlanner;
-import com.dtstack.flink.sql.parser.InsertSqlParser;
-import com.dtstack.flink.sql.parser.SqlParser;
-import com.dtstack.flink.sql.parser.SqlTree;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.*;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.sinks.TableSink;
-
 import com.dtstack.flink.sql.classloader.ClassLoaderManager;
 import com.dtstack.flink.sql.enums.ClusterMode;
 import com.dtstack.flink.sql.enums.ECacheType;
@@ -42,8 +27,14 @@
 import com.dtstack.flink.sql.function.FunctionManager;
 import com.dtstack.flink.sql.option.OptionParser;
 import com.dtstack.flink.sql.option.Options;
-import com.dtstack.flink.sql.side.SideSqlExec;
+import com.dtstack.flink.sql.parser.CreateFuncParser;
+import com.dtstack.flink.sql.parser.CreateTmpTableParser;
+import com.dtstack.flink.sql.parser.FlinkPlanner;
+import com.dtstack.flink.sql.parser.InsertSqlParser;
+import com.dtstack.flink.sql.parser.SqlParser;
+import com.dtstack.flink.sql.parser.SqlTree;
 import com.dtstack.flink.sql.side.AbstractSideTableInfo;
+import com.dtstack.flink.sql.side.SideSqlExec;
 import com.dtstack.flink.sql.sink.StreamSinkFactory;
 import com.dtstack.flink.sql.source.StreamSourceFactory;
 import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
@@ -62,6 +53,17 @@
 import org.apache.calcite.sql.SqlNode;
 import org.apache.commons.io.Charsets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.sinks.TableSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,13 +73,13 @@
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
-import java.util.ArrayList;
 
 /**
  * Process-flow methods for job execution
@@ -158,7 +160,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
         Map<String, Table> registerTableCache = Maps.newHashMap();
 
         //register udf
-        ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
+        ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv, paramsInfo.isGetPlan());
         //register table schema
         Set<URL> classPathSets = ExecuteProcessHelper.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
                 paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
@@ -243,13 +245,19 @@ private static void sqlTranslation(String localSqlPluginPath,
         }
     }
 
-    public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
+    public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
             throws IllegalAccessException, InvocationTargetException {
         // the UDF and tableEnv must be loaded by the same classloader
         ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
         URLClassLoader classLoader = null;
         List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
         for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
+            // when only building the plan, the UDF and tableEnv need not share a classloader
+            if (getPlan) {
+                URL[] urls = jarUrlList.toArray(new URL[0]);
+                classLoader = URLClassLoader.newInstance(urls);
+            }
+
             //classloader
             if (classLoader == null) {
                 classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);
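
The getPlan branch is the functional core of this commit: when only a plan is needed, UDF jars go into a fresh URLClassLoader instead of being grafted onto tableEnv's own loader. A minimal side-by-side sketch of the two strategies, using only standard JDK APIs (the second method is a simplified stand-in for ClassLoaderManager.loadExtraJar, which may mutate the existing loader rather than wrap it):

    import java.net.URL;
    import java.net.URLClassLoader;
    import java.util.List;

    public class LoaderStrategySketch {

        // Plan-only path: an isolated loader suffices, because the computed plan
        // is returned as JSON and nothing later resolves UDF classes again.
        static ClassLoader planLoader(List<URL> jarUrlList) {
            return URLClassLoader.newInstance(jarUrlList.toArray(new URL[0]));
        }

        // Execution path: UDFs registered on tableEnv are resolved through
        // tableEnv's own classloader at runtime, so the jars must be reachable
        // from that loader's hierarchy.
        static ClassLoader execLoader(List<URL> jarUrlList, URLClassLoader tableEnvLoader) {
            return new URLClassLoader(jarUrlList.toArray(new URL[0]), tableEnvLoader);
        }
    }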

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 10 additions & 0 deletions

@@ -21,6 +21,7 @@
 
 import java.net.URL;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 
 /**
@@ -39,6 +40,7 @@ public class ParamsInfo {
     private String pluginLoadMode;
     private String deployMode;
     private Properties confProp;
+    private boolean getPlan = false;
 
     public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSqlPluginPath,
                       String remoteSqlPluginPath, String pluginLoadMode, String deployMode, Properties confProp) {
@@ -52,6 +54,14 @@ public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSql
         this.confProp = confProp;
     }
 
+    public boolean isGetPlan() {
+        return getPlan;
+    }
+
+    public void setGetPlan(boolean getPlan) {
+        this.getPlan = getPlan;
+    }
+
     public String getSql() {
         return sql;
     }

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 42 additions & 0 deletions

@@ -23,6 +23,7 @@
 import com.dtstack.flink.sql.util.RowDataComplete;
 import org.apache.calcite.sql.JoinType;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
@@ -34,6 +35,8 @@
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.TimeZone;
 import java.util.TimeZone;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -99,6 +102,45 @@ protected void sendOutputRow(Row value, Object sideInput, Collector<BaseRow> out
         RowDataComplete.collectRow(out, row);
     }
 
+    @Override
+    public Row fillData(Row input, Object sideInput) {
+        Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
+        Row row = new Row(sideInfo.getOutFieldInfoList().size());
+
+        for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
+            // origin value
+            Object obj = input.getField(entry.getValue());
+            obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
+            row.setField(entry.getKey(), obj);
+        }
+
+        for (Map.Entry<Integer, String> entry : sideInfo.getSideFieldNameIndex().entrySet()) {
+            if (cacheInfo == null) {
+                row.setField(entry.getKey(), null);
+            } else {
+                row.setField(entry.getKey(), cacheInfo.get(entry.getValue()));
+            }
+        }
+        return row;
+    }
+
+    /**
+     * Converts a Flink time attribute. TimeIndicatorTypeInfo marks event or
+     * processing time; it behaves like a regular SQL timestamp but is
+     * serialized as a Long.
+     *
+     * @param entry type information class of the input field
+     * @param obj   original field value
+     * @return the value with the time-attribute adjustment applied
+     */
+    protected Object dealTimeAttributeType(Class<? extends TypeInformation> entry, Object obj) {
+        boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(entry);
+        if (obj instanceof Timestamp && isTimeIndicatorTypeInfo) {
+            // undo the timezone shift introduced by the upstream OutputRowtimeProcessFunction call
+            obj = ((Timestamp) obj).getTime() + (long) LOCAL_TZ.getOffset(((Timestamp) obj).getTime());
+        }
+        return obj;
+    }
+
     @Override
     public void close() throws Exception {
         if (null != es && !es.isShutdown()) {
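
The fillData logic removed from the Cassandra (and, per the merge conflicts, Elasticsearch and RDB) subclasses now lives here once. Below is a self-contained illustration of the dealTimeAttributeType offset arithmetic; the zone and timestamp are hypothetical, and the reading of the shift's direction follows the in-code comment about OutputRowtimeProcessFunction:

    import java.sql.Timestamp;
    import java.util.TimeZone;

    public class TimeAttributeSketch {
        public static void main(String[] args) {
            TimeZone tz = TimeZone.getTimeZone("GMT+8"); // stand-in for LOCAL_TZ
            Timestamp ts = Timestamp.valueOf("2020-01-01 08:00:00");
            // Only values whose declared type is a TimeIndicatorTypeInfo are
            // adjusted; plain TIMESTAMP columns pass through unchanged.
            long adjusted = ts.getTime() + (long) tz.getOffset(ts.getTime());
            System.out.println(adjusted - ts.getTime()); // 28800000 ms, i.e. +8 h
        }
    }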

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion

@@ -197,7 +197,7 @@ public void asyncInvoke(Row row, ResultFuture<BaseRow> resultFuture) throws Exce
     }
 
     private Map<String, Object> parseInputParam(Row input) {
-        Map<String, Object> inputParams = Maps.newHashMap();
+        Map<String, Object> inputParams = Maps.newLinkedHashMap();
         for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
             Object equalObj = input.getField(conValIndex);
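
Why LinkedHashMap rather than HashMap: a hedged reading is that something downstream iterates inputParams in join-key order (the diff only shows the construction). A minimal sketch, assuming Guava on the classpath as in this repo:

    import com.google.common.collect.Maps;
    import java.util.Map;

    public class JoinKeyOrderSketch {
        public static void main(String[] args) {
            Map<String, Object> inputParams = Maps.newLinkedHashMap();
            inputParams.put("user_id", 42);  // hypothetical equal-condition fields,
            inputParams.put("city", "SH");   // inserted in equalValIndex order
            // Insertion order is preserved: always [user_id, city].
            System.out.println(inputParams.keySet());
            // Maps.newHashMap() makes no such promise, so any composite cache key
            // derived by iterating the map could vary between otherwise equal rows.
        }
    }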
