Skip to content
Permalink
Browse files

fixed issue #1374 , support queryLogEvent partition

  • Loading branch information...
agapple committed Jan 7, 2019
1 parent 670b783 commit de501c85a15b13f1f76d434aa92b6fcacbebbc61
Showing with 40 additions and 44 deletions.
  1. +40 −44 server/src/main/java/com/alibaba/otter/canal/common/MQMessageUtils.java
@@ -1,6 +1,12 @@
package com.alibaba.otter.canal.common;

import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;

@@ -24,46 +30,36 @@
public class MQMessageUtils {

@SuppressWarnings("deprecation")
private static Map<String, List<PartitionData>> partitionDatas = MigrateMap
.makeComputingMap(new MapMaker().softValues(), new Function<String, List<PartitionData>>() {
private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
new Function<String, List<PartitionData>>() {

public List<PartitionData> apply(String pkHashConfigs) {
List<PartitionData> datas = Lists
.newArrayList();
String[] pkHashConfigArray = StringUtils
.split(pkHashConfigs, ",");
List<PartitionData> datas = Lists.newArrayList();
String[] pkHashConfigArray = StringUtils.split(pkHashConfigs,
",");
// schema.table:id^name
for (String pkHashConfig : pkHashConfigArray) {
PartitionData data = new PartitionData();
int i = pkHashConfig
.lastIndexOf(":");
int i = pkHashConfig.lastIndexOf(":");
if (i > 0) {
String pkStr = pkHashConfig
.substring(i + 1);
if (pkStr.equalsIgnoreCase(
"$pk$")) {
String pkStr = pkHashConfig.substring(i + 1);
if (pkStr.equalsIgnoreCase("$pk$")) {
data.hashMode.autoPkHash = true;
} else {
data.hashMode.pkNames = Lists
.newArrayList(
StringUtils
.split(
pkStr,
'^'));
data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
'^'));
}

pkHashConfig = pkHashConfig
.substring(0, i);
pkHashConfig = pkHashConfig.substring(0,
i);
} else {
data.hashMode.tableHash = true;
}

if (!isWildCard(
pkHashConfig)) {
if (!isWildCard(pkHashConfig)) {
data.simpleName = pkHashConfig;
} else {
data.regexFilter = new AviaterRegexFilter(
pkHashConfig);
data.regexFilter = new AviaterRegexFilter(pkHashConfig);
}
datas.add(data);
}
@@ -73,29 +69,24 @@
});

@SuppressWarnings("deprecation")
private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap
.makeComputingMap(new MapMaker().softValues(), new Function<String, List<DynamicTopicData>>() {
private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(new MapMaker().softValues(),
new Function<String, List<DynamicTopicData>>() {

public List<DynamicTopicData> apply(String pkHashConfigs) {
List<DynamicTopicData> datas = Lists
.newArrayList();
String[] dynamicTopicArray = StringUtils
.split(pkHashConfigs, ",");
List<DynamicTopicData> datas = Lists.newArrayList();
String[] dynamicTopicArray = StringUtils.split(pkHashConfigs,
",");
// schema.table
for (String dynamicTopic : dynamicTopicArray) {
DynamicTopicData data = new DynamicTopicData();

if (!isWildCard(
dynamicTopic)) {
if (!isWildCard(dynamicTopic)) {
data.simpleName = dynamicTopic;
} else {
if (dynamicTopic
.contains("\\.")) {
data.tableRegexFilter = new AviaterRegexFilter(
dynamicTopic);
if (dynamicTopic.contains("\\.")) {
data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
} else {
data.schemaRegexFilter = new AviaterRegexFilter(
dynamicTopic);
data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
}
}
datas.add(data);
@@ -232,6 +223,9 @@
partitionEntries[pkHash].add(entry);
}
}
} else {
// 针对stmt/mixed binlog格式的query事件
partitionEntries[0].add(entry);
}
}
}
@@ -281,9 +275,8 @@
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException(
"ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+ entry.toString(), e);
}

CanalEntry.EventType eventType = rowChange.getEventType();
@@ -455,6 +448,9 @@
idx++;
}
}
} else {
// 针对stmt/mixed binlog格式的query事件
partitionMessages[0] = flatMessage;
}
}
return partitionMessages;
@@ -524,8 +520,8 @@ public static boolean checkPkNamesHasContain(List<String> pkNames, String name)

private static boolean isWildCard(String value) {
// not contaiins '.' ?
return StringUtils.containsAny(value,
new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$', '^' });
return StringUtils.containsAny(value, new char[] { '*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
'^' });
}

private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,

0 comments on commit de501c8

Please sign in to comment.
You can’t perform that action at this time.