Skip to content

Commit e052b0b

Browse files
committed
[hotfix-36409] 维表设置允许错误数限制不生效问题
1 parent 7c2555f commit e052b0b

File tree

10 files changed

+41
-9
lines changed

10 files changed

+41
-9
lines changed

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
9191
cassandraSideTableInfo.setConnectTimeoutMillis(MathUtil.getIntegerVal(props.get(CONNECT_TIMEOUT_MILLIS_KEY.toLowerCase())));
9292
cassandraSideTableInfo.setPoolTimeoutMillis(MathUtil.getIntegerVal(props.get(POOL_TIMEOUT_MILLIS_KEY.toLowerCase())));
9393

94+
if (MathUtil.getLongVal(props.get(cassandraSideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
95+
cassandraSideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(cassandraSideTableInfo.ERROR_LIMIT.toLowerCase())));
96+
}
97+
9498
return cassandraSideTableInfo;
9599
}
96100

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public abstract class AbstractTableInfo implements Serializable {
6868
* error data limit. Task will failed once {@link AbstractDtRichOutputFormat#outDirtyRecords}
6969
* count over limit. Default 1000L;
7070
*/
71-
private Long errorLimit = 0L;
71+
private Long errorLimit = Long.MAX_VALUE;
7272

7373
public String[] getFieldTypes() {
7474
return fieldTypes;

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/table/Elasticsearch6SideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
7171
elasticsearch6SideTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES6_USERNAME.toLowerCase())));
7272
elasticsearch6SideTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES6_PASSWORD.toLowerCase())));
7373
}
74+
75+
if (MathUtil.getLongVal(props.get(elasticsearch6SideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
76+
elasticsearch6SideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(elasticsearch6SideTableInfo.ERROR_LIMIT.toLowerCase())));
77+
}
78+
7479
elasticsearch6SideTableInfo.check();
7580
return elasticsearch6SideTableInfo;
7681
}

elasticsearch7/elasticsearch7-side/elasticsearch7-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/table/Elasticsearch7SideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
7171
elasticsearch7SideTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES7_USERNAME.toLowerCase())));
7272
elasticsearch7SideTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES7_PASSWORD.toLowerCase())));
7373
}
74+
75+
if (MathUtil.getLongVal(props.get(elasticsearch7SideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
76+
elasticsearch7SideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(elasticsearch7SideTableInfo.ERROR_LIMIT.toLowerCase())));
77+
}
78+
7479
elasticsearch7SideTableInfo.check();
7580
return elasticsearch7SideTableInfo;
7681
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
7272
hbaseTableInfo.setPreRowKey(MathUtil.getBoolean(props.get(PRE_ROW_KEY.toLowerCase()), false));
7373
hbaseTableInfo.setCacheType((String) props.get(CACHE));
7474
hbaseTableInfo.setKerberosAuthEnable(MathUtil.getBoolean(props.get(KERBEROS_ENABLE), false));
75+
76+
if (MathUtil.getLongVal(props.get(hbaseTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
77+
hbaseTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(hbaseTableInfo.ERROR_LIMIT.toLowerCase())));
78+
}
79+
7580
props.entrySet().stream()
7681
.filter(entity -> entity.getKey().contains("."))
7782
.map(entity -> hbaseTableInfo.getHbaseConfig().put(entity.getKey(), String.valueOf(entity.getValue())))

impala/impala-side/impala-side-core/src/main/java/com/dtstack/flink/sql/side/impala/table/ImpalaSideParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6565
impalaSideTableInfo.setFastCheck(MathUtil.getBoolean(props.getOrDefault(RdbSideTableInfo.FAST_CHECK.toLowerCase(), true)));
6666
impalaSideTableInfo.setCheckProperties();
6767

68+
if (MathUtil.getLongVal(props.get(impalaSideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
69+
impalaSideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(impalaSideTableInfo.ERROR_LIMIT.toLowerCase())));
70+
}
6871

6972
//set authmech params
7073
Integer authMech = MathUtil.getIntegerVal(props.get(ImpalaSideTableInfo.AUTHMECH_KEY.toLowerCase()));

kudu/kudu-side/kudu-side-core/src/main/java/com/dtstack/flink/sql/side/kudu/table/KuduSideParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
8080
);
8181
kuduSideTableInfo.judgeKrbEnable();
8282

83+
if (MathUtil.getLongVal(props.get(kuduSideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
84+
kuduSideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(kuduSideTableInfo.ERROR_LIMIT.toLowerCase())));
85+
}
86+
8387
return kuduSideTableInfo;
8488

8589
}

mongo/mongo-side/mongo-side-core/src/main/java/com/dtstack/flink/sql/side/mongo/table/MongoSideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5959
mongoSideTableInfo.setDatabase(MathUtil.getString(props.get(DATABASE_KEY.toLowerCase())));
6060
mongoSideTableInfo.setUserName(MathUtil.getString(props.get(USER_NAME_KEY.toLowerCase())));
6161
mongoSideTableInfo.setPassword(MathUtil.getString(props.get(PASSWORD_KEY.toLowerCase())));
62+
63+
if (MathUtil.getLongVal(props.get(mongoSideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
64+
mongoSideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(mongoSideTableInfo.ERROR_LIMIT.toLowerCase())));
65+
}
66+
6267
mongoSideTableInfo.check();
6368
return mongoSideTableInfo;
6469
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/table/RdbSideParser.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5252
rdbTableInfo.setSchema(MathUtil.getString(props.get(RdbSideTableInfo.SCHEMA_KEY.toLowerCase())));
5353
rdbTableInfo.setDriverName(MathUtil.getString(props.get(RdbSideTableInfo.DRIVER_NAME)));
5454
rdbTableInfo.setFastCheck(MathUtil.getBoolean(props.getOrDefault(RdbSideTableInfo.FAST_CHECK.toLowerCase(), false)));
55-
rdbTableInfo.setErrorLimit(
56-
MathUtil.getLongVal(
57-
props.getOrDefault(
58-
RdbSideTableInfo.ERROR_LIMIT.toLowerCase(),
59-
1000L
60-
)
61-
)
62-
);
55+
56+
if (MathUtil.getLongVal(props.get(RdbSideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
57+
rdbTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(RdbSideTableInfo.ERROR_LIMIT.toLowerCase())));
58+
}
6359

6460
rdbTableInfo.setCheckProperties();
6561

redis5/redis5-side/redis-side-core/src/main/java/com/dtstack/flink/sql/side/redis/table/RedisSideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5151
redisSideTableInfo.setMasterName(MathUtil.getString(props.get(RedisSideTableInfo.MASTER_NAME.toLowerCase())));
5252
redisSideTableInfo.setRedisType(MathUtil.getString(props.get(RedisSideTableInfo.REDIS_TYPE.toLowerCase())));
5353

54+
if (MathUtil.getLongVal(props.get(RedisSideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
55+
redisSideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(RedisSideTableInfo.ERROR_LIMIT.toLowerCase())));
56+
}
57+
58+
redisSideTableInfo.check();
5459
return redisSideTableInfo;
5560
}
5661
}

0 commit comments

Comments
 (0)