Skip to content

Commit 5553ee5

Browse files
committed
Merge branch '1.8_release_3.10.x' into 1.10_release_4.0.x
# Conflicts: # cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java # cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java # core/pom.xml # core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java # core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java # core/src/main/java/com/dtstack/flink/sql/side/BaseAllReqRow.java # hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java # hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java # kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java # kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java # kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java # mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java # mongo/mongo-side/mongo-async-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAsyncReqRow.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/AbstractRdbAllReqRow.java # rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java # redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideReqRow.java
2 parents 22a819a + b929fcc commit 5553ee5

File tree

39 files changed

+860
-486
lines changed

39 files changed

+860
-486
lines changed

cassandra/cassandra-side/cassandra-all-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAllReqRow.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.calcite.sql.JoinType;
3333
import org.apache.commons.collections.CollectionUtils;
3434
import org.apache.commons.lang3.StringUtils;
35-
import org.apache.flink.api.java.tuple.Tuple2;
3635
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3736
import org.apache.flink.table.dataformat.BaseRow;
3837
import org.apache.flink.types.Row;

core/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,43 @@
105105
<version>${guava.version}</version>
106106
</dependency>
107107

108+
<dependency>
109+
<groupId>org.apache.flink</groupId>
110+
<artifactId>flink-cep-scala_2.11</artifactId>
111+
<version>${flink.version}</version>
112+
</dependency>
113+
114+
<dependency>
115+
<groupId>org.apache.flink</groupId>
116+
<artifactId>flink-scala_2.11</artifactId>
117+
<version>${flink.version}</version>
118+
</dependency>
119+
120+
<dependency>
121+
<groupId>org.apache.flink</groupId>
122+
<artifactId>flink-yarn_2.11</artifactId>
123+
<version>${flink.version}</version>
124+
<exclusions>
125+
<exclusion>
126+
<groupId>org.apache.flink</groupId>
127+
<artifactId>flink-shaded-hadoop2</artifactId>
128+
</exclusion>
129+
</exclusions>
130+
</dependency>
131+
132+
<dependency>
133+
<groupId>org.apache.flink</groupId>
134+
<artifactId>flink-shaded-hadoop2</artifactId>
135+
<version>2.7.5-1.8.1</version>
136+
</dependency>
137+
138+
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-statebackend-rocksdb -->
139+
<dependency>
140+
<groupId>org.apache.flink</groupId>
141+
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
142+
<version>${flink.version}</version>
143+
</dependency>
144+
108145
<dependency>
109146
<groupId>junit</groupId>
110147
<artifactId>junit</artifactId>
@@ -117,6 +154,7 @@
117154
<artifactId>joda-time</artifactId>
118155
<version>2.5</version>
119156
</dependency>
157+
120158
</dependencies>
121159

122160
<build>

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 77 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
2525
import org.apache.flink.api.common.typeinfo.TypeInformation;
2626
import org.apache.flink.api.common.typeinfo.Types;
27+
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
2728
import org.apache.flink.api.java.typeutils.RowTypeInfo;
2829
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
2930
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -34,6 +35,7 @@
3435
import org.apache.flink.types.Row;
3536

3637
import java.io.IOException;
38+
import java.lang.reflect.Array;
3739
import java.sql.Date;
3840
import java.sql.Time;
3941
import java.sql.Timestamp;
@@ -43,7 +45,7 @@
4345

4446
/**
4547
* source data parse to json format
46-
*
48+
* <p>
4749
* Date: 2019/12/12
4850
* Company: www.dtstack.com
4951
*
@@ -53,53 +55,35 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
5355

5456
private final ObjectMapper objectMapper = new ObjectMapper();
5557

56-
private Map<String, String> rowAndFieldMapping;
57-
private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
58+
private final Map<String, String> rowAndFieldMapping;
59+
private final Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
5860

5961
private final String[] fieldNames;
6062
private final TypeInformation<?>[] fieldTypes;
61-
private List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
6263
private TypeInformation<Row> typeInfo;
64+
private final List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos;
65+
private final String charsetName;
6366

64-
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos) {
67+
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
68+
List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
69+
String charsetName) {
6570
this.typeInfo = typeInfo;
6671
this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
6772
this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
6873
this.rowAndFieldMapping = rowAndFieldMapping;
6974
this.fieldExtraInfos = fieldExtraInfos;
75+
this.charsetName = charsetName;
7076
}
7177

7278
@Override
7379
public Row deserialize(byte[] message) throws IOException {
74-
JsonNode root = objectMapper.readTree(message);
80+
String decoderStr = new String(message, charsetName);
81+
JsonNode root = objectMapper.readTree(decoderStr);
7582
this.parseTree(root, null);
76-
Row row = new Row(fieldNames.length);
77-
78-
try {
79-
for (int i = 0; i < fieldNames.length; i++) {
80-
JsonNode node = getIgnoreCase(fieldNames[i]);
81-
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
82-
83-
if (node == null) {
84-
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
85-
throw new IllegalStateException("Failed to find field with name '"
86-
+ fieldNames[i] + "'.");
87-
} else {
88-
row.setField(i, null);
89-
}
90-
} else {
91-
// Read the value as specified type
92-
Object value = convert(node, fieldTypes[i]);
93-
row.setField(i, value);
94-
}
95-
}
96-
return row;
97-
} finally {
98-
nodeAndJsonNodeMapping.clear();
99-
}
83+
return convertTopRow();
10084
}
10185

102-
private void parseTree(JsonNode jsonNode, String prefix){
86+
private void parseTree(JsonNode jsonNode, String prefix) {
10387
if (jsonNode.isArray()) {
10488
ArrayNode array = (ArrayNode) jsonNode;
10589
for (int i = 0; i < array.size(); i++) {
@@ -118,15 +102,15 @@ private void parseTree(JsonNode jsonNode, String prefix){
118102
return;
119103
}
120104
Iterator<String> iterator = jsonNode.fieldNames();
121-
while (iterator.hasNext()){
105+
while (iterator.hasNext()) {
122106
String next = iterator.next();
123107
JsonNode child = jsonNode.get(next);
124108
String nodeKey = getNodeKey(prefix, next);
125109

126110
nodeAndJsonNodeMapping.put(nodeKey, child);
127-
if(child.isArray()){
111+
if (child.isArray()) {
128112
parseTree(child, nodeKey);
129-
}else {
113+
} else {
130114
parseTree(child, nodeKey);
131115
}
132116
}
@@ -137,8 +121,8 @@ private JsonNode getIgnoreCase(String key) {
137121
return nodeAndJsonNodeMapping.get(nodeMappingKey);
138122
}
139123

140-
private String getNodeKey(String prefix, String nodeName){
141-
if(Strings.isNullOrEmpty(prefix)){
124+
private String getNodeKey(String prefix, String nodeName) {
125+
if (Strings.isNullOrEmpty(prefix)) {
142126
return nodeName;
143127
}
144128
return prefix + "." + nodeName;
@@ -162,15 +146,19 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
162146
} else {
163147
return node.asText();
164148
}
165-
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
149+
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
166150
return Date.valueOf(node.asText());
167151
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
168152
// local zone
169153
return Time.valueOf(node.asText());
170154
} else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
171155
// local zone
172156
return Timestamp.valueOf(node.asText());
173-
} else {
157+
} else if (info instanceof RowTypeInfo) {
158+
return convertRow(node, (RowTypeInfo) info);
159+
} else if (info instanceof ObjectArrayTypeInfo) {
160+
return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
161+
} else {
174162
// for types that were specified without JSON schema
175163
// e.g. POJOs
176164
try {
@@ -181,6 +169,57 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
181169
}
182170
}
183171

172+
private Row convertTopRow() {
173+
Row row = new Row(fieldNames.length);
174+
try {
175+
for (int i = 0; i < fieldNames.length; i++) {
176+
JsonNode node = getIgnoreCase(fieldNames[i]);
177+
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
178+
179+
if (node == null) {
180+
if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
181+
throw new IllegalStateException("Failed to find field with name '"
182+
+ fieldNames[i] + "'.");
183+
} else {
184+
row.setField(i, null);
185+
}
186+
} else {
187+
// Read the value as specified type
188+
Object value = convert(node, fieldTypes[i]);
189+
row.setField(i, value);
190+
}
191+
}
192+
return row;
193+
} finally {
194+
nodeAndJsonNodeMapping.clear();
195+
}
196+
}
197+
198+
private Row convertRow(JsonNode node, RowTypeInfo info) {
199+
final String[] names = info.getFieldNames();
200+
final TypeInformation<?>[] types = info.getFieldTypes();
201+
202+
final Row row = new Row(names.length);
203+
for (int i = 0; i < names.length; i++) {
204+
final String name = names[i];
205+
final JsonNode subNode = node.get(name);
206+
if (subNode == null) {
207+
row.setField(i, null);
208+
} else {
209+
row.setField(i, convert(subNode, types[i]));
210+
}
211+
}
212+
213+
return row;
214+
}
215+
216+
private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType) {
217+
final Object[] array = (Object[]) Array.newInstance(elementType.getTypeClass(), node.size());
218+
for (int i = 0; i < node.size(); i++) {
219+
array[i] = convert(node.get(i), elementType);
220+
}
221+
return array;
222+
}
184223
@Override
185224
public TypeInformation<Row> getProducedType() {
186225
return typeInfo;

core/src/main/java/com/dtstack/flink/sql/parser/CreateTmpTableParser.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,7 @@
2121
package com.dtstack.flink.sql.parser;
2222

2323
import com.dtstack.flink.sql.util.DtStringUtil;
24-
import org.apache.calcite.sql.SqlBasicCall;
25-
import org.apache.calcite.sql.SqlJoin;
26-
import org.apache.calcite.sql.SqlKind;
27-
import org.apache.calcite.sql.SqlNode;
28-
import org.apache.calcite.sql.SqlSelect;
24+
import org.apache.calcite.sql.*;
2925
import com.google.common.collect.Lists;
3026
import java.util.List;
3127
import java.util.regex.Matcher;
@@ -160,6 +156,10 @@ private static void parseNode(SqlNode sqlNode, CreateTmpTableParser.SqlParserRes
160156
parseNode(unionRight, sqlParseResult);
161157
}
162158
break;
159+
case MATCH_RECOGNIZE:
160+
SqlMatchRecognize node = (SqlMatchRecognize) sqlNode;
161+
sqlParseResult.addSourceTable(node.getTableRef().toString());
162+
break;
163163
default:
164164
//do nothing
165165
break;

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void dealOneEqualCon(SqlNode sqlNode, String sideTableName){
164164
}
165165
}
166166
if(equalFieldIndex == -1){
167-
throw new RuntimeException("can't find equal field " + rightField);
167+
throw new RuntimeException("can't find equal field " + leftField);
168168
}
169169

170170
equalValIndex.add(equalFieldIndex);

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableParser.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
* limitations under the License.
1717
*/
1818

19-
20-
2119
package com.dtstack.flink.sql.table;
2220

2321
import com.dtstack.flink.sql.util.ClassUtil;
@@ -46,7 +44,7 @@ public abstract class AbstractTableParser {
4644
private static final String CHAR_TYPE_NO_LENGTH = "CHAR";
4745

4846
private static Pattern primaryKeyPattern = Pattern.compile("(?i)PRIMARY\\s+KEY\\s*\\((.*)\\)");
49-
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(\\w+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
47+
private static Pattern nestJsonFieldKeyPattern = Pattern.compile("(?i)((@*\\S+\\.)*\\S+)\\s+(.+)\\s+AS\\s+(\\w+)(\\s+NOT\\s+NULL)?$");
5048
private static Pattern physicalFieldFunPattern = Pattern.compile("\\w+\\((\\w+)\\)$");
5149
private static Pattern charTypePattern = Pattern.compile("(?i)CHAR\\((\\d*)\\)$");
5250

@@ -84,32 +82,32 @@ public boolean dealKeyPattern(String fieldRow, AbstractTableInfo tableInfo){
8482
return false;
8583
}
8684

87-
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
85+
public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) {
8886

8987
List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ',');
90-
for(String fieldRow : fieldRows){
88+
89+
for (String fieldRow : fieldRows) {
9190
fieldRow = fieldRow.trim();
9291

93-
if(StringUtils.isBlank(fieldRow)){
92+
if (StringUtils.isBlank(fieldRow)) {
9493
throw new RuntimeException(String.format("table [%s],exists field empty.", tableInfo.getName()));
9594
}
9695

97-
String[] filedInfoArr = fieldRow.split("\\s+");
98-
if(filedInfoArr.length < 2 ){
99-
throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow));
100-
}
96+
String[] fieldInfoArr = fieldRow.split("\\s+");
97+
98+
String errorMsg = String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow);
99+
Preconditions.checkState(fieldInfoArr.length >= 2, errorMsg);
101100

102101
boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo);
103-
if(isMatcherKey){
102+
if (isMatcherKey) {
104103
continue;
105104
}
106105

107106
//Compatible situation may arise in space in the fieldName
108-
String[] filedNameArr = new String[filedInfoArr.length - 1];
109-
System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1);
107+
String[] filedNameArr = new String[fieldInfoArr.length - 1];
108+
System.arraycopy(fieldInfoArr, 0, filedNameArr, 0, fieldInfoArr.length - 1);
110109
String fieldName = String.join(" ", filedNameArr);
111-
String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim();
112-
110+
String fieldType = fieldInfoArr[fieldInfoArr.length - 1 ].trim();
113111

114112
Class fieldClass = null;
115113
AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null;
@@ -123,7 +121,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
123121
fieldClass = dbTypeConvertToJavaType(fieldType);
124122
}
125123

126-
tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]);
124+
tableInfo.addPhysicalMappings(fieldInfoArr[0], fieldInfoArr[0]);
127125
tableInfo.addField(fieldName);
128126
tableInfo.addFieldClass(fieldClass);
129127
tableInfo.addFieldType(fieldType);
@@ -133,7 +131,7 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo){
133131
tableInfo.finish();
134132
}
135133

136-
public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo){
134+
public void dealPrimaryKey(Matcher matcher, AbstractTableInfo tableInfo) {
137135
String primaryFields = matcher.group(1).trim();
138136
String[] splitArry = primaryFields.split(",");
139137
List<String> primaryKes = Lists.newArrayList(splitArry);
@@ -172,4 +170,5 @@ protected void addParserHandler(String parserName, Pattern pattern, ITableFieldD
172170
patternMap.put(parserName, pattern);
173171
handlerMap.put(parserName, handler);
174172
}
173+
175174
}

0 commit comments

Comments
 (0)