
Commit 6943cff

fix

1 parent 5a37f82

File tree: 58 files changed, +1639 -1262 lines changed

(Large commits have some content hidden by default; not all 58 changed files are shown below.)

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java

Lines changed: 2 additions & 2 deletions
@@ -48,7 +48,7 @@
 import com.datastax.driver.core.SocketOptions;
 import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
-import com.dtstack.flink.sql.sink.MetricOutputFormat;
+import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -69,7 +69,7 @@
  * @see Tuple
  * @see DriverManager
  */
-public class CassandraOutputFormat extends MetricOutputFormat {
+public class CassandraOutputFormat extends DtRichOutputFormat {
     private static final long serialVersionUID = -7994311331389155692L;

     private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);

console/console-sink/src/main/java/com/dtstack/flink/sql/sink/console/ConsoleOutputFormat.java

Lines changed: 2 additions & 2 deletions
@@ -18,7 +18,7 @@

 package com.dtstack.flink.sql.sink.console;

-import com.dtstack.flink.sql.sink.MetricOutputFormat;
+import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
 import com.dtstack.flink.sql.sink.console.table.TablePrintUtil;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,7 +37,7 @@
  *
  * @author xuqianjin
  */
-public class ConsoleOutputFormat extends MetricOutputFormat {
+public class ConsoleOutputFormat extends DtRichOutputFormat {

     private static final Logger LOG = LoggerFactory.getLogger(ConsoleOutputFormat.class);
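Note: both sink connectors above switch their base class from the removed MetricOutputFormat to DtRichOutputFormat. As a rough sketch of the contract such sinks implement, the minimal output format below is written directly against Flink's RichOutputFormat; DtRichOutputFormat itself is not shown in this commit, so the assumption is only that it layers metric bookkeeping on top of this same lifecycle, and the class and record type here are illustrative, not taken from the repo.

// Illustrative sketch only: the Flink output-format lifecycle that sinks such as
// CassandraOutputFormat and ConsoleOutputFormat build on.
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

import java.io.IOException;

public class DemoRowOutputFormat extends RichOutputFormat<Tuple2<Boolean, Row>> {

    @Override
    public void configure(Configuration parameters) {
        // read connector properties here
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // open client connections; metrics could be registered via getRuntimeContext()
    }

    @Override
    public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
        // in a retract stream, f0 == true marks an upsert, f0 == false a retraction
        if (record.f0) {
            System.out.println("write: " + record.f1);
        }
    }

    @Override
    public void close() throws IOException {
        // release resources
    }
}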

core/src/main/java/com/dtstack/flink/sql/format/AbsDeserialization.java

Lines changed: 0 additions & 110 deletions
This file was deleted.

core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ public class DeserializationMetricWrapper extends AbstractDeserializationSchema<

     private static int dataPrintFrequency = 1000;

-    private transient DeserializationSchema<Row> deserializationSchema;
+    private DeserializationSchema<Row> deserializationSchema;

     private transient RuntimeContext runtimeContext;

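Note: the only change here removes "transient" from the wrapped schema field. DeserializationSchema implementations are Serializable and the wrapper is serialized with the job graph and shipped to the task managers, so a transient delegate would arrive there as null unless re-created in an open-style hook; dropping the modifier lets the delegate travel with the wrapper. A simplified, illustrative wrapper (not the actual class) showing the delegation pattern this relies on:

// Simplified illustration of a delegating deserialization wrapper; the real
// DeserializationMetricWrapper additionally tracks metrics per record.
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.types.Row;

import java.io.IOException;

public class RowDeserializationWrapper extends AbstractDeserializationSchema<Row> {

    // non-transient: the delegate survives job-graph serialization and is
    // available on the workers without being re-created there
    private DeserializationSchema<Row> delegate;

    public RowDeserializationWrapper(DeserializationSchema<Row> delegate) {
        this.delegate = delegate;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        // metric updates would go here in the real wrapper
        return delegate.deserialize(message);
    }
}
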
core/src/main/java/com/dtstack/flink/sql/format/FormatType.java

Lines changed: 1 addition & 1 deletion
@@ -6,5 +6,5 @@
  * create: 2019/12/24
  */
 public enum FormatType {
-    JSON, AVRO, CSV
+    DT_NEST, JSON, AVRO, CSV
 }
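Note: DT_NEST joins the existing format constants. The hypothetical helper below (not from this commit) shows how such a format name might be resolved from a connector property; the fallback to DT_NEST for missing or unknown values is purely illustrative.

// Hypothetical helper, shown only to illustrate resolving the enum above.
import com.dtstack.flink.sql.format.FormatType;

import java.util.Locale;

public class FormatTypeResolver {

    public static FormatType resolve(String sourceDataType) {
        if (sourceDataType == null || sourceDataType.trim().isEmpty()) {
            return FormatType.DT_NEST; // illustrative default
        }
        try {
            return FormatType.valueOf(sourceDataType.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return FormatType.DT_NEST; // illustrative fallback for unknown names
        }
    }
}
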
JsonDataParser.java (new file in package com.dtstack.flink.sql.format; added fully commented out)

Lines changed: 173 additions & 0 deletions
@@ -0,0 +1,173 @@
///*
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// *     http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//package com.dtstack.flink.sql.format;
//
//import com.dtstack.flink.sql.table.TableInfo;
//import com.google.common.base.Strings;
//import com.google.common.collect.Maps;
//import org.apache.flink.api.common.typeinfo.TypeInformation;
//import org.apache.flink.api.common.typeinfo.Types;
//import org.apache.flink.api.java.typeutils.RowTypeInfo;
//import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
//import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
//import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
//import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
//import org.apache.flink.types.Row;
//
//import java.io.IOException;
//import java.io.Serializable;
//import java.sql.Date;
//import java.sql.Time;
//import java.sql.Timestamp;
//import java.util.Iterator;
//import java.util.List;
//import java.util.Map;
//
///**
// * source data parse to json format
// *
// * Date: 2019/12/12
// * Company: www.dtstack.com
// * @author maqi
// */
//public class JsonDataParser implements Serializable {
//
//    private final ObjectMapper objectMapper = new ObjectMapper();
//
//    private Map<String, String> rowAndFieldMapping;
//    private Map<String, JsonNode> nodeAndJsonNodeMapping = Maps.newHashMap();
//
//    private final String[] fieldNames;
//    private final TypeInformation<?>[] fieldTypes;
//    private List<TableInfo.FieldExtraInfo> fieldExtraInfos;
//
//    public JsonDataParser(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping, List<TableInfo.FieldExtraInfo> fieldExtraInfos) {
//        this.fieldNames = ((RowTypeInfo) typeInfo).getFieldNames();
//        this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes();
//        this.rowAndFieldMapping = rowAndFieldMapping;
//        this.fieldExtraInfos = fieldExtraInfos;
//    }
//
//
//    public Row parseData(byte[] data) throws IOException {
//        JsonNode root = objectMapper.readTree(data);
//        parseTree(root, null);
//        Row row = new Row(fieldNames.length);
//
//        try {
//            for (int i = 0; i < fieldNames.length; i++) {
//                JsonNode node = getIgnoreCase(fieldNames[i]);
//                TableInfo.FieldExtraInfo fieldExtraInfo = fieldExtraInfos.get(i);
//
//                if (node == null) {
//                    if (fieldExtraInfo != null && fieldExtraInfo.getNotNull()) {
//                        throw new IllegalStateException("Failed to find field with name '"
//                                + fieldNames[i] + "'.");
//                    } else {
//                        row.setField(i, null);
//                    }
//                } else {
//                    // Read the value as specified type
//                    Object value = convert(node, fieldTypes[i]);
//                    row.setField(i, value);
//                }
//            }
//            return row;
//        } finally {
//            nodeAndJsonNodeMapping.clear();
//        }
//    }
//
//    private void parseTree(JsonNode jsonNode, String prefix) {
//        if (jsonNode.isArray()) {
//            ArrayNode array = (ArrayNode) jsonNode;
//            for (int i = 0; i < array.size(); i++) {
//                JsonNode child = array.get(i);
//                String nodeKey = getNodeKey(prefix, i);
//
//                if (child.isValueNode()) {
//                    nodeAndJsonNodeMapping.put(nodeKey, child);
//                } else {
//                    if (rowAndFieldMapping.containsValue(nodeKey)) {
//                        nodeAndJsonNodeMapping.put(nodeKey, child);
//                    }
//                    parseTree(child, nodeKey);
//                }
//            }
//            return;
//        }
//        Iterator<String> iterator = jsonNode.fieldNames();
//        while (iterator.hasNext()) {
//            String next = iterator.next();
//            JsonNode child = jsonNode.get(next);
//            String nodeKey = getNodeKey(prefix, next);
//
//            if (child.isValueNode()) {
//                nodeAndJsonNodeMapping.put(nodeKey, child);
//            } else if (child.isArray()) {
//                parseTree(child, nodeKey);
//            } else {
//                parseTree(child, nodeKey);
//            }
//        }
//    }
//
//    private JsonNode getIgnoreCase(String key) {
//        String nodeMappingKey = rowAndFieldMapping.getOrDefault(key, key);
//        return nodeAndJsonNodeMapping.get(nodeMappingKey);
//    }
//
//    private String getNodeKey(String prefix, String nodeName) {
//        if (Strings.isNullOrEmpty(prefix)) {
//            return nodeName;
//        }
//        return prefix + "." + nodeName;
//    }
//
//    private String getNodeKey(String prefix, int i) {
//        if (Strings.isNullOrEmpty(prefix)) {
//            return "[" + i + "]";
//        }
//        return prefix + "[" + i + "]";
//    }
//
//    private Object convert(JsonNode node, TypeInformation<?> info) {
//        if (info.getTypeClass().equals(Types.BOOLEAN.getTypeClass())) {
//            return node.asBoolean();
//        } else if (info.getTypeClass().equals(Types.STRING.getTypeClass())) {
//            return node.asText();
//        } else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
//            return Date.valueOf(node.asText());
//        } else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
//            // local zone
//            return Time.valueOf(node.asText());
//        } else if (info.getTypeClass().equals(Types.SQL_TIMESTAMP.getTypeClass())) {
//            // local zone
//            return Timestamp.valueOf(node.asText());
//        } else {
//            // for types that were specified without JSON schema
//            // e.g. POJOs
//            try {
//                return objectMapper.treeToValue(node, info.getTypeClass());
//            } catch (JsonProcessingException e) {
//                throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
//            }
//        }
//    }
//}
