Skip to content

Commit

Permalink
[TUBEMQ-451]Replace ConsumeTupleInfo with Tuple2 (apache#349)
Browse files Browse the repository at this point in the history
Co-authored-by: gosonzhang <gosonzhang@tencent.com>
  • Loading branch information
2 people authored and EMsnap committed Jan 7, 2021
1 parent 15ceeb3 commit 6b74100
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.tubemq.corebase.utils.DataConverterUtil;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corerpc.RpcConfig;
import org.apache.tubemq.corerpc.RpcConstants;
import org.apache.tubemq.corerpc.RpcServiceFactory;
Expand Down Expand Up @@ -1883,14 +1884,14 @@ private void balance() {
if (consumerId == null) {
continue;
}
ConsumerInfoHolder.ConsumeTupleInfo tupleInfo =
Tuple2<String, ConsumerInfo> tupleInfo =
consumerHolder.getConsumeTupleInfo(consumerId);
if (tupleInfo == null
|| tupleInfo.groupName == null
|| tupleInfo.consumerInfo == null) {
|| tupleInfo.f0 == null
|| tupleInfo.f1 == null) {
continue;
}
List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
Map<String, List<Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
Expand All @@ -1907,7 +1908,7 @@ private void balance() {
currentPartMap = new HashMap<>();
}
}
if (tupleInfo.consumerInfo.isOverTLS()) {
if (tupleInfo.f1.isOverTLS()) {
for (Partition currentPart : currentPartMap.values()) {
if (!blackTopicList.contains(currentPart.getTopic())) {
boolean found = false;
Expand All @@ -1923,8 +1924,8 @@ private void balance() {
}
}
deletedSubInfoList
.add(new SubscribeInfo(consumerId, tupleInfo.groupName,
tupleInfo.consumerInfo.isOverTLS(), currentPart));
.add(new SubscribeInfo(consumerId, tupleInfo.f0,
tupleInfo.f1.isOverTLS(), currentPart));
}
for (Partition finalPart : finalPartList) {
if (!blackTopicList.contains(finalPart.getTopic())) {
Expand All @@ -1940,22 +1941,22 @@ private void balance() {
continue;
}
addedSubInfoList.add(new SubscribeInfo(consumerId,
tupleInfo.groupName, true, finalPart));
tupleInfo.f0, true, finalPart));
}
}
} else {
for (Partition currentPart : currentPartMap.values()) {
if ((blackTopicList.contains(currentPart.getTopic()))
|| (!finalPartList.contains(currentPart))) {
deletedSubInfoList
.add(new SubscribeInfo(consumerId, tupleInfo.groupName, false, currentPart));
.add(new SubscribeInfo(consumerId, tupleInfo.f0, false, currentPart));
}
}
for (Partition finalPart : finalPartList) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicList.contains(finalPart.getTopic()))) {
addedSubInfoList.add(new SubscribeInfo(consumerId,
tupleInfo.groupName, false, finalPart));
tupleInfo.f0, false, finalPart));
}
}
}
Expand Down Expand Up @@ -2033,16 +2034,16 @@ private void resetBalance() {
if (consumerId == null) {
continue;
}
ConsumerInfoHolder.ConsumeTupleInfo tupleInfo =
Tuple2<String, ConsumerInfo> tupleInfo =
consumerHolder.getConsumeTupleInfo(consumerId);
if (tupleInfo == null
|| tupleInfo.groupName == null
|| tupleInfo.consumerInfo == null) {
|| tupleInfo.f0 == null
|| tupleInfo.f1 == null) {
continue;
}

List<String> blackTopicList =
this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
Expand All @@ -2066,15 +2067,15 @@ private void resetBalance() {
if ((blackTopicList.contains(currentPart.getTopic()))
|| (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
deletedSubInfoList
.add(new SubscribeInfo(consumerId, tupleInfo.groupName,
tupleInfo.consumerInfo.isOverTLS(), currentPart));
.add(new SubscribeInfo(consumerId, tupleInfo.f0,
tupleInfo.f1.isOverTLS(), currentPart));
}
}
for (Partition finalPart : finalPartMap.values()) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicList.contains(finalPart.getTopic()))) {
addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.groupName,
tupleInfo.consumerInfo.isOverTLS(), finalPart));
addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.f0,
tupleInfo.f1.isOverTLS(), finalPart));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.tubemq.corebase.cluster.ConsumerInfo;
import org.apache.tubemq.corebase.utils.Tuple2;


public class ConsumerInfoHolder {
Expand Down Expand Up @@ -369,7 +370,7 @@ public ConsumerInfo removeConsumer(String group,
return consumer;
}

public ConsumeTupleInfo getConsumeTupleInfo(String consumerId) {
public Tuple2<String, ConsumerInfo> getConsumeTupleInfo(String consumerId) {
try {
rwLock.readLock().lock();
ConsumerInfo consumerInfo = null;
Expand All @@ -378,7 +379,7 @@ public ConsumeTupleInfo getConsumeTupleInfo(String consumerId) {
if (consumeBandInfo != null) {
consumerInfo = consumeBandInfo.getConsumerInfo(consumerId);
}
return new ConsumeTupleInfo(groupName, consumerInfo);
return new Tuple2<String, ConsumerInfo>(groupName, consumerInfo);
} finally {
rwLock.readLock().unlock();
}
Expand Down Expand Up @@ -424,13 +425,4 @@ public void clear() {
groupInfoMap.clear();
}

public class ConsumeTupleInfo {
public String groupName;
public ConsumerInfo consumerInfo;

public ConsumeTupleInfo(String groupName, ConsumerInfo consumerInfo) {
this.groupName = groupName;
this.consumerInfo = consumerInfo;
}
}
}

0 comments on commit 6b74100

Please sign in to comment.