Skip to content

Commit d65c07d

Browse files
committed
[feat] add dirty-data manager to Kafka source and HBase sink.
1 parent 5063d24 commit d65c07d

File tree

25 files changed

+482
-246
lines changed

25 files changed

+482
-246
lines changed

core/src/main/java/com/dtstack/flink/sql/dirtyManager/consumer/AbstractDirtyDataConsumer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
import org.slf4j.LoggerFactory;
2424

2525
import java.io.Serializable;
26-
import java.util.Map;
26+
import java.util.Properties;
2727
import java.util.concurrent.LinkedBlockingQueue;
28+
import java.util.concurrent.TimeUnit;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.atomic.AtomicLong;
3031

@@ -65,7 +66,7 @@ public abstract class AbstractDirtyDataConsumer implements Runnable, Serializabl
6566
* @param properties 任务参数
6667
* @throws Exception throw exception
6768
*/
68-
public abstract void init(Map<String, String> properties) throws Exception;
69+
public abstract void init(Properties properties) throws Exception;
6970

7071
/**
7172
* 检验consumer是否正在执行
@@ -77,15 +78,13 @@ public boolean isRunning() {
7778
@Override
7879
public void run() {
7980
try {
80-
LOG.info("start to consume dirty data");
8181
while (isRunning.get()) {
8282
consume();
8383
}
84-
LOG.info("consume dirty data end");
8584
} catch (Exception e) {
8685
LOG.error("consume dirtyData error", e);
8786
errorCount.incrementAndGet();
88-
if (errorCount.get() == errorLimit) {
87+
if (errorCount.get() > errorLimit) {
8988
throw new RuntimeException("The task failed due to the number of dirty data consume failed reached the limit " + errorLimit);
9089
}
9190
}
@@ -95,4 +94,8 @@ public AbstractDirtyDataConsumer setQueue(LinkedBlockingQueue<DirtyDataEntity> q
9594
this.queue = queue;
9695
return this;
9796
}
97+
98+
public void collectDirtyData(DirtyDataEntity dataEntity, long blockingInterval) throws InterruptedException {
99+
queue.offer(dataEntity, blockingInterval, TimeUnit.MILLISECONDS);
100+
}
98101
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.dirtyManager.consumer;
20+
21+
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
22+
import com.dtstack.flink.sql.util.PluginUtil;
23+
24+
import java.io.File;
25+
import java.lang.reflect.Constructor;
26+
import java.util.Objects;
27+
28+
import static com.dtstack.flink.sql.util.PluginUtil.upperCaseFirstChar;
29+
30+
/**
31+
* @author tiezhu
32+
* Company dtstack
33+
* Date 2020/12/21 星期一
34+
*/
35+
public class DirtyConsumerFactory {
36+
private static final String DEFAULT_DIRTY_TYPE = "console";
37+
private static final String DIRTY_CONSUMER_PATH = "dirtyData";
38+
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql.dirty";
39+
private static final String CLASS_POST_STR = "DirtyDataConsumer";
40+
41+
/**
42+
* 通过动态方式去加载脏数据插件
43+
*
44+
* @param dirtyType 脏数据插件类型
45+
* @param pluginPath 脏数据插件直地址
46+
* @param pluginLoadMode 插件加载方式
47+
* @return 脏数据消费者
48+
* @throws Exception exception
49+
*/
50+
public static AbstractDirtyDataConsumer getDirtyConsumer(
51+
String dirtyType
52+
, String pluginPath
53+
, String pluginLoadMode) throws Exception {
54+
if (Objects.isNull(dirtyType)) {
55+
dirtyType = DEFAULT_DIRTY_TYPE;
56+
}
57+
String consumerType = DIRTY_CONSUMER_PATH + File.separator + dirtyType;
58+
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, pluginPath, pluginLoadMode);
59+
String className = CLASS_PRE_STR + "." + dirtyType.toLowerCase() + "." + upperCaseFirstChar(dirtyType + CLASS_POST_STR);
60+
return ClassLoaderManager.newInstance(consumerJar, cl -> {
61+
Class<?> clazz = cl.loadClass(className);
62+
Constructor<?> constructor = clazz.getConstructor();
63+
return (AbstractDirtyDataConsumer) constructor.newInstance();
64+
});
65+
}
66+
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/entity/DirtyDataEntity.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,18 @@ public DirtyDataEntity(String dirtyData, Long processDate, String cause, String
8787
this.field = field;
8888
}
8989

90+
public DirtyDataEntity(String dirtyData, Long processDate, String cause) {
91+
this.dirtyData = dirtyData;
92+
this.processDate = timeFormat.format(processDate);
93+
this.cause = cause;
94+
}
95+
9096
@Override
9197
public String toString() {
9298
return "DirtyDataEntity{" +
9399
"dirtyData='" + dirtyData + '\'' +
94100
", processDate=" + processDate +
95101
", cause='" + cause + '\'' +
96-
", field='" + field + '\'' +
97102
'}';
98103
}
99104

@@ -102,6 +107,6 @@ public String toString() {
102107
* @return 脏数据信息字符数组
103108
*/
104109
public String[] get() {
105-
return new String[] {dirtyData, String.valueOf(processDate), cause, field};
110+
return new String[] {dirtyData, String.valueOf(processDate), cause};
106111
}
107112
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 49 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,16 @@
1818

1919
package com.dtstack.flink.sql.dirtyManager.manager;
2020

21-
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
21+
import com.alibaba.fastjson.JSONObject;
2222
import com.dtstack.flink.sql.dirtyManager.consumer.AbstractDirtyDataConsumer;
23+
import com.dtstack.flink.sql.dirtyManager.consumer.DirtyConsumerFactory;
2324
import com.dtstack.flink.sql.dirtyManager.entity.DirtyDataEntity;
2425
import com.dtstack.flink.sql.factory.DTThreadFactory;
25-
import com.dtstack.flink.sql.util.PluginUtil;
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

29-
import java.io.File;
3029
import java.io.Serializable;
31-
import java.lang.reflect.Constructor;
32-
import java.util.Map;
30+
import java.util.Properties;
3331
import java.util.concurrent.LinkedBlockingQueue;
3432
import java.util.concurrent.ThreadPoolExecutor;
3533
import java.util.concurrent.TimeUnit;
@@ -42,83 +40,68 @@
4240
*/
4341
public class DirtyDataManager implements Serializable {
4442

43+
public final static int MAX_POOL_SIZE_LIMIT = 5;
4544
private static final long serialVersionUID = 7190970299538893497L;
46-
4745
private static final Logger LOG = LoggerFactory.getLogger(DirtyDataManager.class);
46+
private static final String DIRTY_BLOCK_STR = "blockingInterval";
47+
private static final String DIRTY_LIMIT_RATE_STR = "errorLimitRate";
48+
private final static int MAX_TASK_QUEUE_SIZE = 100;
49+
private final static String DEFAULT_ERROR_LIMIT_RATE = "0.8";
50+
private final static String DEFAULT_BLOCKING_INTERVAL = "60";
51+
public static AbstractDirtyDataConsumer consumer;
4852

49-
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql.dirty";
50-
51-
private static final String CLASS_POST_STR = "DirtyDataConsumer";
52-
53-
private static final String DIRTY_CONSUMER_PATH = "dirtyData";
54-
55-
/**
56-
* 写入队列阻塞时间
57-
*/
58-
private long blockingInterval;
59-
60-
/**
61-
* 缓存脏数据信息队列
62-
*/
63-
public final LinkedBlockingQueue<DirtyDataEntity> queue = new LinkedBlockingQueue<>();
64-
53+
private static ThreadPoolExecutor dirtyDataConsumer;
6554
/**
6655
* 统计manager收集到的脏数据条数
6756
*/
6857
private final AtomicLong count = new AtomicLong(0);
69-
7058
/**
7159
* 脏数据写入队列失败条数
7260
*/
7361
private final AtomicLong errorCount = new AtomicLong(0);
74-
62+
/**
63+
* 写入队列阻塞时间
64+
*/
65+
private long blockingInterval;
66+
/**
67+
* 任务失败的脏数据比例
68+
*/
7569
private double errorLimitRate;
7670

77-
public static AbstractDirtyDataConsumer consumer;
78-
79-
private static ThreadPoolExecutor dirtyDataConsumer;
80-
81-
public final static int MAX_POOL_SIZE_LIMIT = 5;
82-
83-
private final static int MAX_TASK_QUEUE_SIZE = 100;
84-
85-
private final static String DEFAULT_TYPE = "console";
86-
87-
private final static String DEFAULT_ERROR_LIMIT_RATE = "0.8";
88-
89-
private final static String DEFAULT_BLOCKING_INTERVAL = "60";
90-
9171
/**
9272
* 通过参数生成manager实例,并同时将consumer实例化
9373
*/
94-
public static DirtyDataManager newInstance(Map<String, String> properties) throws Exception {
95-
DirtyDataManager manager = new DirtyDataManager();
96-
manager.blockingInterval = Long.parseLong(properties.getOrDefault("blockingInterval", DEFAULT_BLOCKING_INTERVAL));
97-
manager.errorLimitRate = Double.parseDouble(properties.getOrDefault("errorLimitRate", DEFAULT_ERROR_LIMIT_RATE));
98-
consumer = createConsumer(properties);
99-
consumer.init(properties);
100-
consumer.setQueue(manager.queue);
101-
dirtyDataConsumer = new ThreadPoolExecutor(MAX_POOL_SIZE_LIMIT, MAX_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
102-
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("dirtyDataConsumer"), new ThreadPoolExecutor.CallerRunsPolicy());
103-
dirtyDataConsumer.execute(consumer);
104-
105-
return manager;
74+
public static DirtyDataManager newInstance(Properties properties) {
75+
try {
76+
DirtyDataManager manager = new DirtyDataManager();
77+
manager.blockingInterval = Long.parseLong(String.valueOf(properties.getOrDefault(DIRTY_BLOCK_STR, DEFAULT_BLOCKING_INTERVAL)));
78+
manager.errorLimitRate = Double.parseDouble(String.valueOf(properties.getOrDefault(DIRTY_LIMIT_RATE_STR, DEFAULT_ERROR_LIMIT_RATE)));
79+
consumer = DirtyConsumerFactory.getDirtyConsumer(
80+
properties.getProperty("type")
81+
, properties.getProperty("pluginPath")
82+
, properties.getProperty("pluginLoadMode")
83+
);
84+
consumer.init(properties);
85+
consumer.setQueue(new LinkedBlockingQueue<>());
86+
dirtyDataConsumer = new ThreadPoolExecutor(MAX_POOL_SIZE_LIMIT, MAX_POOL_SIZE_LIMIT, 0, TimeUnit.MILLISECONDS,
87+
new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE), new DTThreadFactory("dirtyDataConsumer"), new ThreadPoolExecutor.CallerRunsPolicy());
88+
dirtyDataConsumer.execute(consumer);
89+
return manager;
90+
} catch (Exception e) {
91+
throw new RuntimeException("create dirtyManager error!", e);
92+
}
10693
}
10794

10895
/**
109-
* 通过动态加载的方式加载Consumer
96+
* 设置脏数据插件默认配置
97+
*
98+
* @return console的默认配置
11099
*/
111-
private static AbstractDirtyDataConsumer createConsumer(Map<String, String> properties) throws Exception {
112-
String type = properties.getOrDefault("type", DEFAULT_TYPE);
113-
String consumerType = DIRTY_CONSUMER_PATH + File.separator + type;
114-
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, properties.getOrDefault("pluginPath", null), "shipfile");
115-
String className = CLASS_PRE_STR + "." + type.toLowerCase() + "." + upperCaseFirstChar(type + CLASS_POST_STR);
116-
117-
return ClassLoaderManager.newInstance(consumerJar, cl -> {
118-
Class<?> clazz = cl.loadClass(className);
119-
Constructor<?> constructor = clazz.getConstructor();
120-
return (AbstractDirtyDataConsumer) constructor.newInstance();
121-
});
100+
public static String buildDefaultDirty() {
101+
JSONObject jsonObject = new JSONObject();
102+
jsonObject.put("type", "console");
103+
jsonObject.put("printLimit", "1000");
104+
return jsonObject.toJSONString();
122105
}
123106

124107
/**
@@ -136,16 +119,16 @@ public void close() {
136119
/**
137120
* 收集脏数据放入队列缓存中,记录放入失败的数目和存入队列中的总数目,如果放入失败的数目超过一定比例,那么manager任务失败
138121
*/
139-
public void collectDirtyData(String dataInfo, String cause, String field) {
140-
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause, field);
122+
public void collectDirtyData(String dataInfo, String cause) {
123+
DirtyDataEntity dirtyDataEntity = new DirtyDataEntity(dataInfo, System.currentTimeMillis(), cause);
141124
try {
142-
queue.offer(dirtyDataEntity, blockingInterval, TimeUnit.MILLISECONDS);
125+
consumer.collectDirtyData(dirtyDataEntity, blockingInterval);
143126
count.incrementAndGet();
144127
} catch (Exception ignored) {
145128
LOG.warn("dirty Data insert error ... Failed number: " + errorCount.incrementAndGet());
146129
LOG.warn("error dirty data:" + dirtyDataEntity.toString());
147130
if (errorCount.get() > Math.ceil(count.longValue() * errorLimitRate)) {
148-
throw new RuntimeException("The number of failed number reaches the limit, manager fails");
131+
throw new RuntimeException(String.format("The number of failed number 【%s】 reaches the limit, manager fails", errorCount.get()));
149132
}
150133
}
151134
}
@@ -156,11 +139,4 @@ public void collectDirtyData(String dataInfo, String cause, String field) {
156139
public boolean checkConsumer() {
157140
return consumer.isRunning();
158141
}
159-
160-
/**
161-
* 首字母大写
162-
*/
163-
private static String upperCaseFirstChar(String str) {
164-
return str.substring(0, 1).toUpperCase() + str.substring(1);
165-
}
166142
}

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/TestMain.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.dtstack.flink.sql.dirtyManager.manager;
22

3-
import java.util.HashMap;
4-
import java.util.Map;
3+
import java.util.Properties;
54

65
/**
76
* @author tiezhu
@@ -12,11 +11,11 @@ public class TestMain {
1211
private static final Integer DATA_NUMBER = 1000;
1312

1413
public static void main(String[] args) throws Exception {
15-
Map<String, String> properties = new HashMap<>(8);
16-
properties.put("type", "mysql");
17-
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQLTemp/sqlplugins");
18-
properties.put("url", "jdbc:mysql://kerberos01:3306/tiezhu");
19-
properties.put("userName", "dtstack");
14+
Properties properties = new Properties();
15+
properties.put("type", "console");
16+
properties.put("pluginPath", "/Users/wtz/IdeaProjects/flinkStreamSQLDemoThree/sqlplugins");
17+
properties.put("url", "jdbc:mysql://localhost:3306/tiezhu");
18+
properties.put("userName", "root");
2019
properties.put("password", "abc123");
2120
properties.put("isCreatedTable", "false");
2221
properties.put("batchSize", "1");
@@ -26,7 +25,7 @@ public static void main(String[] args) throws Exception {
2625
for (int i = 0; i < DATA_NUMBER; i++) {
2726
Thread.sleep(100);
2827
manager.collectDirtyData("testDirtyData" + i,
29-
new Exception("testException" + i).getMessage(), "testField");
28+
new Exception("testException" + i).getMessage());
3029
if (i == 50) {
3130
manager.close();
3231
}

0 commit comments

Comments
 (0)