
Commit d886e68

committed
[hotfix-32959][core][launcher] Fix missing retraction data in double-layer GROUP BY, which led to incorrect settlement results; the main change is switching the Row type used for dimension (side) tables to BaseRow
1 parent 42f99ab commit d886e68
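In plain terms, the change reads as follows: with the legacy `org.apache.flink.types.Row`, the accumulate/retract flag of a changelog record was dropped when it passed through the dimension (side) table join, so the outer GROUP BY of a nested aggregation never saw the retraction and its result drifted. The Blink planner's `BaseRow`/`GenericRow` carries that flag as a header byte, which the reworked `fillData` copies onto the joined row. A minimal sketch of the header semantics (the 0/1 byte values are the accumulate/retract convention assumed here, not constants taken from this project):

import org.apache.flink.table.dataformat.GenericRow;

// Minimal sketch: the header byte is what distinguishes an accumulate
// message from a retraction. 0 = accumulate, 1 = retract is the convention
// assumed here; the exact constants live in the Blink planner.
public class RetractHeaderSketch {
    public static void main(String[] args) {
        GenericRow upsert = new GenericRow(1);
        upsert.setField(0, 100L);
        upsert.setHeader((byte) 0);            // accumulate message

        GenericRow retract = new GenericRow(1);
        retract.setField(0, 100L);
        retract.setHeader((byte) 1);           // retraction of the same value

        // A side join that rebuilds its output row without copying the header
        // would turn the retraction back into an accumulate message; copying it
        // (row.setHeader(...) in the diffs below) is the core of this fix.
        System.out.println(upsert.getHeader() + " vs " + retract.getHeader());
    }
}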

File tree

30 files changed, +920 -283 lines changed


cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 9 additions & 9 deletions
@@ -18,10 +18,6 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.HostDistance;
@@ -43,7 +39,10 @@
 import org.apache.calcite.sql.JoinType;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,13 +102,14 @@ protected void reloadCache() {
 
 
     @Override
-    public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
+    public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
+        GenericRow genericRow = (GenericRow) input;
         List<Object> inputParams = Lists.newArrayList();
         for (Integer conValIndex : sideInfo.getEqualValIndex()) {
-            Object equalObj = input.getField(conValIndex);
+            Object equalObj = genericRow.getField(conValIndex);
             if (equalObj == null) {
                 if (sideInfo.getJoinType() == JoinType.LEFT) {
-                    Row row = fillData(input, null);
+                    BaseRow row = fillData(input, null);
                     RowDataComplete.collectRow(out, row);
                 }
                 return;
@@ -122,7 +122,7 @@ public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
         List<Map<String, Object>> cacheList = cacheRef.get().get(key);
         if (CollectionUtils.isEmpty(cacheList)) {
             if (sideInfo.getJoinType() == JoinType.LEFT) {
-                Row row = fillData(input, null);
+                BaseRow row = fillData(input, null);
                 RowDataComplete.collectRow(out, row);
             } else {
                 return;
@@ -132,7 +132,7 @@ public void flatMap(Row input, Collector<BaseRow> out) throws Exception {
         }
 
         for (Map<String, Object> one : cacheList) {
-            Row row = fillData(input, one);
+            BaseRow row = fillData(input, one);
             RowDataComplete.collectRow(out, row);
         }
 

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 22 additions & 9 deletions
@@ -19,11 +19,22 @@
 
 package com.dtstack.flink.sql.side.cassandra;
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.dtstack.flink.sql.enums.ECacheContentType;
-import com.dtstack.flink.sql.side.*;
+import com.dtstack.flink.sql.side.AbstractSideTableInfo;
+import com.dtstack.flink.sql.side.BaseAsyncReqRow;
+import com.dtstack.flink.sql.side.CacheMissVal;
+import com.dtstack.flink.sql.side.FieldInfo;
+import com.dtstack.flink.sql.side.JoinInfo;
 import com.dtstack.flink.sql.side.cache.CacheObj;
 import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
 import com.dtstack.flink.sql.util.RowDataComplete;
@@ -38,7 +49,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.types.Row;
+import org.apache.flink.table.dataformat.GenericRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,7 +160,7 @@ private void connCassandraDB(CassandraSideTableInfo tableInfo) {
     }
 
     @Override
-    public void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
+    public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, ResultFuture<BaseRow> resultFuture) throws Exception {
 
         String key = buildCacheKey(inputParams);
         //connect Cassandra
@@ -180,9 +191,9 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
                     cluster.closeAsync();
                     if (rows.size() > 0) {
                         List<com.datastax.driver.core.Row> cacheContent = Lists.newArrayList();
-                        List<Row> rowList = Lists.newArrayList();
+                        List<BaseRow> rowList = Lists.newArrayList();
                         for (com.datastax.driver.core.Row line : rows) {
-                            Row row = fillData(input, line);
+                            BaseRow row = fillData(input, line);
                             if (openCache()) {
                                 cacheContent.add(line);
                             }
@@ -230,11 +241,13 @@ private String buildWhereCondition(Map<String, Object> inputParams){
     }
 
     @Override
-    public Row fillData(Row input, Object line) {
+    public BaseRow fillData(BaseRow input, Object line) {
+        GenericRow genericRow = (GenericRow) input;
         com.datastax.driver.core.Row rowArray = (com.datastax.driver.core.Row) line;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
+        GenericRow row = new GenericRow(sideInfo.getOutFieldInfoList().size());
+        row.setHeader(genericRow.getHeader());
         for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
-            Object obj = input.getField(entry.getValue());
+            Object obj = genericRow.getField(entry.getValue());
             obj = convertTimeIndictorTypeInfo(entry.getValue(), obj);
             row.setField(entry.getKey(), obj);
         }

core/pom.xml

Lines changed: 0 additions & 15 deletions
@@ -17,7 +17,6 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.package.name>core</project.package.name>
-        <calcite.server.version>1.16.0</calcite.server.version>
         <jackson.version>2.7.9</jackson.version>
         <guava.version>19.0</guava.version>
         <logger.tool.version>1.0.0-SNAPSHOT</logger.tool.version>
@@ -79,20 +78,6 @@
             <version>${flink.version}</version>
         </dependency>
 
-
-        <dependency>
-            <groupId>org.apache.calcite</groupId>
-            <artifactId>calcite-server</artifactId>
-            <!-- When updating the Calcite version, make sure to update the dependency exclusions -->
-            <version>${calcite.server.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>jackson-databind</artifactId>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 1 deletion
@@ -323,7 +323,7 @@ public static Set<URL> registerTable(SqlTree sqlTree,
                 }
 
                 Table regTable = tableEnv.fromDataStream(adaptStream, fields);
-                tableEnv.registerTable(tableInfo.getName(), regTable);
+                tableEnv.createTemporaryView(tableInfo.getName(), regTable);
                 if (LOG.isInfoEnabled()) {
                     LOG.info("registe table {} success.", tableInfo.getName());
                 }
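For context, `registerTable` is deprecated in newer Flink releases in favour of `createTemporaryView`. A minimal, self-contained usage sketch assuming a Flink 1.10-style setup (table name, source and field list are illustrative, not taken from this project):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RegisterViewSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        DataStream<String> source = env.fromElements("a", "b", "c"); // illustrative source
        Table table = tableEnv.fromDataStream(source, "word");       // expression-string field list

        // Replaces the deprecated tableEnv.registerTable("words", table)
        tableEnv.createTemporaryView("words", table);
    }
}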

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 9 additions & 9 deletions
@@ -26,8 +26,8 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,6 @@
 import java.time.LocalDateTime;
 import java.util.Map;
 import java.util.TimeZone;
-import java.util.TimeZone;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -50,7 +49,7 @@
  * @author xuchao
  */
 
-public abstract class BaseAllReqRow extends RichFlatMapFunction<Row, BaseRow> implements ISideReqRow {
+public abstract class BaseAllReqRow extends RichFlatMapFunction<BaseRow, BaseRow> implements ISideReqRow {
 
     private static final Logger LOG = LoggerFactory.getLogger(BaseAllReqRow.class);
 
@@ -94,22 +93,23 @@ protected Object convertTimeIndictorTypeInfo(Integer index, Object obj) {
         return obj;
     }
 
-    protected void sendOutputRow(Row value, Object sideInput, Collector<BaseRow> out) {
+    protected void sendOutputRow(BaseRow value, Object sideInput, Collector<BaseRow> out) {
         if (sideInput == null && sideInfo.getJoinType() != JoinType.LEFT) {
             return;
         }
-        Row row = fillData(value, sideInput);
+        BaseRow row = fillData(value, sideInput);
         RowDataComplete.collectRow(out, row);
     }
 
     @Override
-    public Row fillData(Row input, Object sideInput) {
+    public BaseRow fillData(BaseRow input, Object sideInput) {
+        GenericRow genericRow = (GenericRow) input;
         Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
-        Row row = new Row(sideInfo.getOutFieldInfoList().size());
-
+        GenericRow row = new GenericRow(sideInfo.getOutFieldInfoList().size());
+        row.setHeader(genericRow.getHeader());
         for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {
             // origin value
-            Object obj = input.getField(entry.getValue());
+            Object obj = genericRow.getField(entry.getValue());
             obj = dealTimeAttributeType(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass(), obj);
             row.setField(entry.getKey(), obj);
         }
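To see how the reworked base class is meant to be used, here is a standalone sketch of the ALL-cache lookup flow that its subclasses implement; the cache, key index and two-column output layout are illustrative and not taken from the project, while the header copy mirrors the `fillData` shown above:

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.util.Collector;

// Standalone sketch of the ALL-cache lookup pattern used by BaseAllReqRow
// subclasses after this commit; error handling and cache refresh are omitted.
public class AllCacheLookupSketch {

    public static void lookup(BaseRow input,
                              int keyIndex,
                              Map<String, List<Object>> cache,
                              boolean leftJoin,
                              Collector<BaseRow> out) {
        GenericRow genericRow = (GenericRow) input;
        Object key = genericRow.getField(keyIndex);
        List<Object> matches = key == null
                ? Collections.emptyList()
                : cache.getOrDefault(String.valueOf(key), Collections.emptyList());

        if (matches.isEmpty()) {
            if (leftJoin) {
                out.collect(fill(genericRow, null));   // null-padded row for LEFT JOIN
            }
            return;                                    // INNER JOIN drops the record
        }
        for (Object match : matches) {
            out.collect(fill(genericRow, match));
        }
    }

    // Mirrors fillData: copy the probe field, append the side value and,
    // crucially, keep the retraction header of the incoming row.
    private static GenericRow fill(GenericRow input, Object sideValue) {
        GenericRow result = new GenericRow(2);
        result.setHeader(input.getHeader());
        result.setField(0, input.getField(0));
        result.setField(1, sideValue);
        return result;
    }
}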

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 23 additions & 24 deletions
@@ -38,10 +38,9 @@
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
-import org.apache.flink.types.Row;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +63,7 @@
  * @author xuchao
 */

-public abstract class BaseAsyncReqRow extends RichAsyncFunction<Row, BaseRow> implements ISideReqRow {
+public abstract class BaseAsyncReqRow extends RichAsyncFunction<BaseRow, BaseRow> implements ISideReqRow {
     private static final Logger LOG = LoggerFactory.getLogger(BaseAsyncReqRow.class);
     private static final long serialVersionUID = 2098635244857937717L;
     private RuntimeContext runtimeContext;
@@ -137,11 +136,11 @@ protected boolean openCache() {
         return sideInfo.getSideCache() != null;
     }
 
-    protected void dealMissKey(Row input, ResultFuture<BaseRow> resultFuture) {
+    protected void dealMissKey(BaseRow input, ResultFuture<BaseRow> resultFuture) {
         if (sideInfo.getJoinType() == JoinType.LEFT) {
             //Reserved left table data
             try {
-                Row row = fillData(input, null);
+                BaseRow row = fillData(input, null);
                 RowDataComplete.completeRow(resultFuture, row);
             } catch (Exception e) {
                 dealFillDataError(input, resultFuture, e);
@@ -158,7 +157,7 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
     }
 
     @Override
-    public void timeout(Row input, ResultFuture<BaseRow> resultFuture) throws Exception {
+    public void timeout(BaseRow input, ResultFuture<BaseRow> resultFuture) throws Exception {
 
         if (timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0) {
             LOG.info("Async function call has timed out. input:{}, timeOutNum:{}", input.toString(), timeOutNum);
@@ -175,32 +174,32 @@ public void timeout(Row input, ResultFuture<BaseRow> resultFuture) throws Except
         resultFuture.complete(Collections.EMPTY_LIST);
     }
 
-    protected void preInvoke(Row input, ResultFuture<BaseRow> resultFuture)
+    protected void preInvoke(BaseRow input, ResultFuture<BaseRow> resultFuture)
             throws InvocationTargetException, IllegalAccessException {
         registerTimerAndAddToHandler(input, resultFuture);
     }
 
     @Override
-    public void asyncInvoke(Row row, ResultFuture<BaseRow> resultFuture) throws Exception {
-        Row input = Row.copy(row);
-        preInvoke(input, resultFuture);
-        Map<String, Object> inputParams = parseInputParam(input);
+    public void asyncInvoke(BaseRow row, ResultFuture<BaseRow> resultFuture) throws Exception {
+        preInvoke(row, resultFuture);
+        Map<String, Object> inputParams = parseInputParam(row);
         if (MapUtils.isEmpty(inputParams)) {
-            dealMissKey(input, resultFuture);
+            dealMissKey(row, resultFuture);
            return;
        }
        if (isUseCache(inputParams)) {
-            invokeWithCache(inputParams, input, resultFuture);
+            invokeWithCache(inputParams, row, resultFuture);
            return;
        }
-        handleAsyncInvoke(inputParams, input, resultFuture);
+        handleAsyncInvoke(inputParams, row, resultFuture);
     }
 
-    private Map<String, Object> parseInputParam(Row input) {
+    private Map<String, Object> parseInputParam(BaseRow input) {
+        GenericRow genericRow = (GenericRow) input;
         Map<String, Object> inputParams = Maps.newLinkedHashMap();
         for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
-            Object equalObj = input.getField(conValIndex);
+            Object equalObj = genericRow.getField(conValIndex);
             if (equalObj == null) {
                 return inputParams;
             }
@@ -214,7 +213,7 @@ protected boolean isUseCache(Map<String, Object> inputParams) {
         return openCache() && getFromCache(buildCacheKey(inputParams)) != null;
     }
 
-    private void invokeWithCache(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) {
+    private void invokeWithCache(Map<String, Object> inputParams, BaseRow input, ResultFuture<BaseRow> resultFuture) {
         if (openCache()) {
             CacheObj val = getFromCache(buildCacheKey(inputParams));
             if (val != null) {
@@ -223,16 +222,16 @@ private void invokeWithCache(Map<String, Object> inputParams, Row input, ResultF
                 return;
             } else if (ECacheContentType.SingleLine == val.getType()) {
                 try {
-                    Row row = fillData(input, val.getContent());
+                    BaseRow row = fillData(input, val.getContent());
                     RowDataComplete.completeRow(resultFuture, row);
                 } catch (Exception e) {
                     dealFillDataError(input, resultFuture, e);
                 }
             } else if (ECacheContentType.MultiLine == val.getType()) {
                 try {
-                    List<Row> rowList = Lists.newArrayList();
+                    List<BaseRow> rowList = Lists.newArrayList();
                     for (Object one : (List) val.getContent()) {
-                        Row row = fillData(input, one);
+                        BaseRow row = fillData(input, one);
                         rowList.add(row);
                     }
                     RowDataComplete.completeRow(resultFuture,rowList);
@@ -247,22 +246,22 @@ private void invokeWithCache(Map<String, Object> inputParams, Row input, ResultF
         }
     }
 
-    public abstract void handleAsyncInvoke(Map<String, Object> inputParams, Row input, ResultFuture<BaseRow> resultFuture) throws Exception;
+    public abstract void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, ResultFuture<BaseRow> resultFuture) throws Exception;
 
     public abstract String buildCacheKey(Map<String, Object> inputParams);
 
     private ProcessingTimeService getProcessingTimeService() {
         return ((StreamingRuntimeContext) this.runtimeContext).getProcessingTimeService();
     }
 
-    protected ScheduledFuture<?> registerTimer(Row input, ResultFuture<BaseRow> resultFuture) {
+    protected ScheduledFuture<?> registerTimer(BaseRow input, ResultFuture<BaseRow> resultFuture) {
         long timeoutTimestamp = sideInfo.getSideTableInfo().getAsyncTimeout() + getProcessingTimeService().getCurrentProcessingTime();
         return getProcessingTimeService().registerTimer(
                 timeoutTimestamp,
                 timestamp -> timeout(input, resultFuture));
     }
 
-    protected void registerTimerAndAddToHandler(Row input, ResultFuture<BaseRow> resultFuture)
+    protected void registerTimerAndAddToHandler(BaseRow input, ResultFuture<BaseRow> resultFuture)
             throws InvocationTargetException, IllegalAccessException {
         ScheduledFuture<?> timeFuture = registerTimer(input, resultFuture);
         // resultFuture is an instance of ResultHandler
@@ -272,7 +271,7 @@ protected void registerTimerAndAddToHandler(Row input, ResultFuture<BaseRow> res
     }
 
 
-    protected void dealFillDataError(Row input, ResultFuture<BaseRow> resultFuture, Throwable e) {
+    protected void dealFillDataError(BaseRow input, ResultFuture<BaseRow> resultFuture, Throwable e) {
         parseErrorRecords.inc();
         if (parseErrorRecords.getCount() > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)) {
             LOG.info("dealFillDataError", e);
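On the async side, the same idea carries over to `handleAsyncInvoke`. Below is a standalone sketch of the completion pattern that the reworked subclasses follow, with a placeholder `lookupAsync` standing in for a connector's real client call and a two-column output layout chosen for illustration:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;

// Standalone sketch: look the key up asynchronously, map each hit to a BaseRow
// that keeps the probe row's header, and complete the result future.
public class AsyncLookupSketch {

    public static void handleAsyncInvoke(GenericRow input,
                                         Object key,
                                         ResultFuture<BaseRow> resultFuture) {
        lookupAsync(key).whenComplete((values, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
                return;
            }
            if (values.isEmpty()) {
                // Equivalent of dealMissKey for an INNER JOIN: emit nothing.
                resultFuture.complete(Collections.emptyList());
                return;
            }
            List<BaseRow> rows = new ArrayList<>();
            for (Object value : values) {
                GenericRow row = new GenericRow(2);
                row.setHeader(input.getHeader());   // keep the retraction flag
                row.setField(0, input.getField(0));
                row.setField(1, value);
                rows.add(row);
            }
            resultFuture.complete(rows);
        });
    }

    // Placeholder async client call; a real connector would query its store here.
    private static CompletableFuture<List<Object>> lookupAsync(Object key) {
        return CompletableFuture.completedFuture(Collections.singletonList("side-value"));
    }
}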

core/src/main/java/com/dtstack/flink/sql/side/ISideReqRow.java

Lines changed: 2 additions & 2 deletions
@@ -18,7 +18,7 @@
 
 package com.dtstack.flink.sql.side;
 
-import org.apache.flink.types.Row;
+import org.apache.flink.table.dataformat.BaseRow;
 
 /**
  *
@@ -28,6 +28,6 @@
  */
 public interface ISideReqRow {
 
-    Row fillData(Row input, Object sideInput);
+    BaseRow fillData(BaseRow input, Object sideInput);
 
 }
