Skip to content

Commit b74452d

Browse files
committed
[hotfix-32959][core][launcher]修复双层group by缺少撤回数据,导致结算结果错误,主要是将维表中的row类型改成baserow,将baserow再次进行转换。
1 parent d886e68 commit b74452d

File tree

19 files changed

+88
-40
lines changed

19 files changed

+88
-40
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
110110
if (equalObj == null) {
111111
if (sideInfo.getJoinType() == JoinType.LEFT) {
112112
BaseRow row = fillData(input, null);
113-
RowDataComplete.collectRow(out, row);
113+
RowDataComplete.collectBaseRow(out, row);
114114
}
115115
return;
116116
}
@@ -123,7 +123,7 @@ public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
123123
if (CollectionUtils.isEmpty(cacheList)) {
124124
if (sideInfo.getJoinType() == JoinType.LEFT) {
125125
BaseRow row = fillData(input, null);
126-
RowDataComplete.collectRow(out, row);
126+
RowDataComplete.collectBaseRow(out, row);
127127
} else {
128128
return;
129129
}
@@ -133,7 +133,7 @@ public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
133133

134134
for (Map<String, Object> one : cacheList) {
135135
BaseRow row = fillData(input, one);
136-
RowDataComplete.collectRow(out, row);
136+
RowDataComplete.collectBaseRow(out, row);
137137
}
138138

139139
}

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
199199
}
200200
rowList.add(row);
201201
}
202-
RowDataComplete.completeRow(resultFuture, rowList);
202+
RowDataComplete.completeBaseRow(resultFuture, rowList);
203203
if (openCache()) {
204204
putCache(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
205205
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ protected void sendOutputRow(BaseRow value, Object sideInput, Collector<BaseRow>
9898
return;
9999
}
100100
BaseRow row = fillData(value, sideInput);
101-
RowDataComplete.collectRow(out, row);
101+
RowDataComplete.collectBaseRow(out, row);
102102
}
103103

104104
@Override

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ protected void dealMissKey(BaseRow input, ResultFuture<BaseRow> resultFuture) {
141141
//Reserved left table data
142142
try {
143143
BaseRow row = fillData(input, null);
144-
RowDataComplete.completeRow(resultFuture, row);
144+
RowDataComplete.completeBaseRow(resultFuture, row);
145145
} catch (Exception e) {
146146
dealFillDataError(input, resultFuture, e);
147147
}
@@ -223,7 +223,7 @@ private void invokeWithCache(Map<String, Object> inputParams, BaseRow input, Res
223223
} else if (ECacheContentType.SingleLine == val.getType()) {
224224
try {
225225
BaseRow row = fillData(input, val.getContent());
226-
RowDataComplete.completeRow(resultFuture, row);
226+
RowDataComplete.completeBaseRow(resultFuture, row);
227227
} catch (Exception e) {
228228
dealFillDataError(input, resultFuture, e);
229229
}
@@ -234,7 +234,7 @@ private void invokeWithCache(Map<String, Object> inputParams, BaseRow input, Res
234234
BaseRow row = fillData(input, one);
235235
rowList.add(row);
236236
}
237-
RowDataComplete.completeRow(resultFuture,rowList);
237+
RowDataComplete.completeBaseRow(resultFuture,rowList);
238238
} catch (Exception e) {
239239
dealFillDataError(input, resultFuture, e);
240240
}

core/src/main/java/com/dtstack/flink/sql/util/RowDataComplete.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,19 @@
3434
*/
3535
public class RowDataComplete {
3636

37-
public static void completeRow(ResultFuture<BaseRow> resultFuture, BaseRow row) {
38-
resultFuture.complete(Collections.singleton(row));
37+
public static void completeBaseRow(ResultFuture<BaseRow> resultFuture, BaseRow row) {
38+
resultFuture.complete(Collections.singleton(RowDataConvert.convertToBaseRow(row)));
3939
}
4040

41-
public static void completeRow(ResultFuture<BaseRow> resultFuture, List<BaseRow> rowList) {
42-
41+
public static void completeBaseRow(ResultFuture<BaseRow> resultFuture, List<BaseRow> rowList) {
4342
List<BaseRow> baseRowList = Lists.newArrayList();
4443
for (BaseRow baseRow : rowList) {
45-
baseRowList.add(baseRow);
44+
baseRowList.add(RowDataConvert.convertToBaseRow(baseRow));
4645
}
47-
4846
resultFuture.complete(baseRowList);
4947
}
5048

51-
public static void collectRow(Collector<BaseRow> out, BaseRow row) {
52-
out.collect(row);
49+
public static void collectBaseRow(Collector<BaseRow> out, BaseRow row) {
50+
out.collect(RowDataConvert.convertToBaseRow(row));
5351
}
54-
5552
}

core/src/main/java/com/dtstack/flink/sql/util/RowDataConvert.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,63 @@ public static BaseRow convertToBaseRow(Tuple2<Boolean, Row> input) {
9191
}
9292
}
9393

94-
if(input.f0){
94+
if (input.f0) {
9595
BaseRowUtil.setAccumulate(genericRow);
9696
} else {
9797
BaseRowUtil.setRetract(genericRow);
9898
}
9999

100100
return genericRow;
101101
}
102+
103+
104+
public static BaseRow convertToBaseRow(BaseRow input) {
105+
GenericRow row = (GenericRow) input;
106+
int length = row.getArity();
107+
GenericRow genericRow = new GenericRow(length);
108+
genericRow.setHeader(row.getHeader());
109+
for (int i = 0; i < length; i++) {
110+
if (row.getField(i) == null) {
111+
genericRow.setField(i, row.getField(i));
112+
} else if (row.getField(i) instanceof String) {
113+
genericRow.setField(i, BinaryString.fromString((String) row.getField(i)));
114+
} else if (row.getField(i) instanceof Timestamp) {
115+
SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp) row.getField(i)));
116+
genericRow.setField(i, newTimestamp);
117+
} else if (row.getField(i) instanceof LocalDateTime) {
118+
genericRow.setField(i, SqlTimestamp.fromLocalDateTime((LocalDateTime) row.getField(i)));
119+
} else if (row.getField(i) instanceof Time) {
120+
genericRow.setField(i, DataFormatConverters.TimeConverter.INSTANCE.toInternal((Time) row.getField(i)));
121+
} else if (row.getField(i) instanceof Double || row.getField(i).getClass().equals(double.class)) {
122+
genericRow.setField(i, DataFormatConverters.DoubleConverter.INSTANCE.toInternal((Double) row.getField(i)));
123+
} else if (row.getField(i) instanceof Float || row.getField(i).getClass().equals(float.class)) {
124+
genericRow.setField(i, DataFormatConverters.FloatConverter.INSTANCE.toInternal((Float) row.getField(i)));
125+
} else if (row.getField(i) instanceof Long || row.getField(i).getClass().equals(long.class)) {
126+
genericRow.setField(i, DataFormatConverters.LongConverter.INSTANCE.toInternal((Long) row.getField(i)));
127+
} else if (row.getField(i) instanceof Boolean || row.getField(i).getClass().equals(boolean.class)) {
128+
genericRow.setField(i, DataFormatConverters.BooleanConverter.INSTANCE.toInternal((Boolean) row.getField(i)));
129+
} else if (row.getField(i) instanceof Integer || row.getField(i).getClass().equals(int.class)) {
130+
genericRow.setField(i, DataFormatConverters.IntConverter.INSTANCE.toInternal((Integer) row.getField(i)));
131+
} else if (row.getField(i) instanceof Short || row.getField(i).getClass().equals(short.class)) {
132+
genericRow.setField(i, DataFormatConverters.ShortConverter.INSTANCE.toInternal((Short) row.getField(i)));
133+
} else if (row.getField(i) instanceof Byte || row.getField(i).getClass().equals(byte.class)) {
134+
genericRow.setField(i, DataFormatConverters.ByteConverter.INSTANCE.toInternal((Byte) row.getField(i)));
135+
} else if (row.getField(i) instanceof Date) {
136+
genericRow.setField(i, DataFormatConverters.DateConverter.INSTANCE.toInternal((Date) row.getField(i)));
137+
} else if (row.getField(i) instanceof LocalDate) {
138+
genericRow.setField(i, DataFormatConverters.LocalDateConverter.INSTANCE.toInternal((LocalDate) row.getField(i)));
139+
} else if (row.getField(i) instanceof LocalTime) {
140+
genericRow.setField(i, DataFormatConverters.LocalTimeConverter.INSTANCE.toInternal((LocalTime) row.getField(i)));
141+
} else if (row.getField(i) instanceof BigDecimal) {
142+
BigDecimal tempDecimal = (BigDecimal) row.getField(i);
143+
int precision = ((BigDecimal) row.getField(i)).precision();
144+
int scale = ((BigDecimal) row.getField(i)).scale();
145+
DataFormatConverters.DecimalConverter decimalConverter = new DataFormatConverters.DecimalConverter(precision, scale);
146+
genericRow.setField(i, decimalConverter.toExternal(Decimal.fromBigDecimal(tempDecimal, precision, scale)));
147+
} else {
148+
genericRow.setField(i, row.getField(i));
149+
}
150+
}
151+
return genericRow;
152+
}
102153
}

elasticsearch6/elasticsearch6-side/elasticsearch6-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/Elasticsearch6AsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void onResponse(SearchResponse searchResponse) {
125125
searchHits = searchResponse.getHits().getHits();
126126
}
127127
dealCacheData(key, CacheObj.buildCacheObj(ECacheContentType.MultiLine, cacheContent));
128-
RowDataComplete.completeRow(resultFuture, rowList);
128+
RowDataComplete.completeBaseRow(resultFuture, rowList);
129129
} catch (Exception e) {
130130
dealFillDataError(input, resultFuture, e);
131131
} finally {

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
153153
if (equalObj == null) {
154154
if (sideInfo.getJoinType() == JoinType.LEFT) {
155155
BaseRow data = fillData(input, null);
156-
RowDataComplete.collectRow(out, data);
156+
RowDataComplete.collectBaseRow(out, data);
157157
}
158158
return;
159159
}
@@ -171,13 +171,13 @@ public void flatMap(BaseRow input, Collector<BaseRow> out) throws Exception {
171171
if (entry.getKey().startsWith(rowKeyStr)) {
172172
cacheList = cacheRef.get().get(entry.getKey());
173173
BaseRow row = fillData(input, cacheList);
174-
RowDataComplete.collectRow(out, row);
174+
RowDataComplete.collectBaseRow(out, row);
175175
}
176176
}
177177
} else {
178178
cacheList = cacheRef.get().get(rowKeyStr);
179179
BaseRow row = fillData(input, cacheList);
180-
RowDataComplete.collectRow(out, row);
180+
RowDataComplete.collectBaseRow(out, row);
181181
}
182182

183183
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbstractRowKeyModeDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ protected void dealMissKey(BaseRow input, ResultFuture<BaseRow> resultFuture) {
7878
try {
7979
//保留left 表数据
8080
BaseRow row = fillData(input, null);
81-
RowDataComplete.completeRow(resultFuture, row);
81+
RowDataComplete.completeBaseRow(resultFuture, row);
8282
} catch (Exception e) {
8383
resultFuture.completeExceptionally(e);
8484
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
136136
}
137137

138138
if (rowList.size() > 0){
139-
RowDataComplete.completeRow(resultFuture, rowList);
139+
RowDataComplete.completeBaseRow(resultFuture, rowList);
140140
}
141141

142142
if(openCache){

0 commit comments

Comments
 (0)