Skip to content

Commit

Permalink
[TUBEMQ-149]Some of the consumers stop consuming their corresponding …
Browse files Browse the repository at this point in the history
…partitions and never release the partition to others (#90)

Co-authored-by: gosonzhang <gosonzhang@tencent.com>
  • Loading branch information
gosonzhang and gosonzhang committed May 22, 2020
1 parent bccceba commit 4893e28
Showing 1 changed file with 36 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -57,8 +55,8 @@ public class RmtDataCache implements Closeable {
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
new ConcurrentHashMap<>();
private final BlockingQueue<String> indexPartition =
new LinkedBlockingQueue<>();
private final ConcurrentLinkedQueue<String> indexPartition =
new ConcurrentLinkedQueue<String>();
private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
Expand Down Expand Up @@ -251,7 +249,23 @@ public PartitionSelectResult pushSelect() {
if (this.isClosed.get()) {
return null;
}
String key = indexPartition.take();
String key = null;
do {
key = indexPartition.poll();
if (key != null) {
break;
}
if (this.isClosed.get()) {
break;
}
if (!partitionMap.isEmpty()) {
break;
}
ThreadUtils.sleep(200);
} while(true);
if (key == null) {
return null;
}
PartitionExt partitionExt = partitionMap.get(key);
if (partitionExt == null) {
return null;
Expand All @@ -271,9 +285,12 @@ public PartitionSelectResult pushSelect() {
}

protected boolean isPartitionInUse(String partitionKey, long usedToken) {
if (partitionMap.containsKey(partitionKey)) {
PartitionExt partitionExt = partitionMap.get(partitionKey);
if (partitionExt != null) {
Long curToken = partitionUsedMap.get(partitionKey);
return curToken != null && curToken == usedToken;
if (curToken != null && curToken == usedToken) {
return true;
}
}
return false;
}
Expand Down Expand Up @@ -322,7 +339,7 @@ protected void errReqRelease(String partitionKey, long usedToken, boolean isLast
partitionUsedMap.remove(partitionKey);
partitionExt.setLastPackConsumed(isLastPackConsumed);
try {
indexPartition.put(partitionKey);
indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -351,7 +368,7 @@ protected void succRspRelease(String partitionKey, String topicName,
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
indexPartition.put(partitionKey);
indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -384,7 +401,7 @@ public void errRspRelease(String partitionKey, String topicName,
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
indexPartition.put(partitionKey);
indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
Expand All @@ -409,7 +426,7 @@ public void close() {
}
for (int i = this.waitCont.get() + 1; i > 0; i--) {
try {
indexPartition.put("------");
indexPartition.offer("------");
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -629,12 +646,12 @@ public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) {
Long oldTime = partitionUsedMap.get(keyId);
if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
partitionUsedMap.remove(keyId);
if (partitionMap.containsKey(keyId)) {
PartitionExt partitionExt = partitionMap.get(keyId);
PartitionExt partitionExt = partitionMap.get(keyId);
if (partitionExt != null) {
partitionExt.setLastPackConsumed(false);
if (!indexPartition.contains(keyId)) {
try {
indexPartition.put(keyId);
indexPartition.offer(keyId);
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -715,7 +732,7 @@ private void addPartitionsInfo(Map<Partition, Long> partOffsetMap) {
partitionUsedMap.remove(partition.getPartitionKey());
if (!indexPartition.contains(partition.getPartitionKey())) {
try {
indexPartition.put(partition.getPartitionKey());
indexPartition.offer(partition.getPartitionKey());
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -757,7 +774,7 @@ private boolean cancelTimeTask(String indexId) {
}

private boolean isTimeWait(String indexId) {
return this.timeouts.containsKey(indexId);
return (timeouts.get(indexId) != null);
}

private boolean hasPartitionWait() {
Expand All @@ -776,10 +793,11 @@ public TimeoutTask(final String indexId) {
public void run(Timeout timeout) throws Exception {
Timeout timeout1 = timeouts.remove(indexId);
if (timeout1 != null) {
if (partitionMap.containsKey(indexId)) {
PartitionExt partitionExt = partitionMap.get(indexId);
if (partitionExt != null) {
if (!indexPartition.contains(this.indexId)) {
try {
indexPartition.put(this.indexId);
indexPartition.offer(this.indexId);
} catch (Throwable e) {
//
}
Expand Down

0 comments on commit 4893e28

Please sign in to comment.