-
Notifications
You must be signed in to change notification settings - Fork 7.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
建议更正一下数据同步ES的Wiki #2160
Comments
看你研究得比较深,目前投产了吗,有没有发现其它问题? |
|
OK,感谢! |
tks |
感谢~ |
这个错误在1.1.6中已经修复,可以参考1.1.6更新的内容,对没有配置key的OuterAdapter,会根据destination+groupId+序号生成一个key,保证这些OuterAdapter不共享 |
👌 |
先附官网数据同步ES的地址:
https://github.com/alibaba/canal/wiki/Sync-ES
实际上 配置es的时候 遗漏了 key的属性, 应该在ES的文档声明中 加上这个
标准姿势应该如下:
application.yml配置
key: exampleKey #配置es数据源的key 必须唯一 (这个属性超级无敌重要)
hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
properties:
cluster.name: elasticsearch # es cluster name
适配器映射配置:
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # cannal的instance或者MQ的topic
outerAdapterKey: exampleKey #对应上文中的key 这个key很重要 再强调一次
groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据
如果按照官网现在给的那个例子,有概率出现以下问题:
由于es没有配置key 会导致多个 instance 共享 一个 EsAdapater,且EsAdapter 会间接关联到一个非线程安全的final List requests = new ArrayList<>()
1.在多个instance同时写入es的时候 有可能会数据丢失(requests.add操作) 【我自己遇到过,而且极难排查】
2.抛出java.lang.RuntimeException: java.lang.RuntimeException: java.util.ConcurrentModificationException 这是因为 当其中某个 instance在提交的时候 会进行for循环 遍历requests 假如恰好另外一个instance中间 进行了 add操作 就会触发。【有人遇到过这个问题】
顺便附一段 初始化 EsAdapter的代码:
public T getExtension(String name, String key) {
if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
String extKey = name + "-" + StringUtils.trimToEmpty(key);
Holder holder = cachedInstances.get(extKey);
if (holder == null) {
cachedInstances.putIfAbsent(extKey, new Holder<>());
holder = cachedInstances.get(extKey);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name, key);
holder.set(instance);
}
}
}
return (T) instance;
}
从这段代码可以看出 不配置key 引用指向的都是同一个EsAdapter.
The text was updated successfully, but these errors were encountered: