Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others #90

Merged
merged 1 commit into from
May 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -57,8 +55,8 @@ public class RmtDataCache implements Closeable {
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
new ConcurrentHashMap<>();
private final BlockingQueue<String> indexPartition =
new LinkedBlockingQueue<>();
private final ConcurrentLinkedQueue<String> indexPartition =
new ConcurrentLinkedQueue<String>();
private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
Expand Down Expand Up @@ -251,7 +249,23 @@ public PartitionSelectResult pushSelect() {
if (this.isClosed.get()) {
return null;
}
String key = indexPartition.take();
String key = null;
do {
key = indexPartition.poll();
if (key != null) {
break;
}
if (this.isClosed.get()) {
break;
}
if (!partitionMap.isEmpty()) {
break;
}
ThreadUtils.sleep(200);
} while(true);
if (key == null) {
return null;
}
PartitionExt partitionExt = partitionMap.get(key);
if (partitionExt == null) {
return null;
Expand All @@ -271,9 +285,12 @@ public PartitionSelectResult pushSelect() {
}

protected boolean isPartitionInUse(String partitionKey, long usedToken) {
if (partitionMap.containsKey(partitionKey)) {
PartitionExt partitionExt = partitionMap.get(partitionKey);
if (partitionExt != null) {
Long curToken = partitionUsedMap.get(partitionKey);
return curToken != null && curToken == usedToken;
if (curToken != null && curToken == usedToken) {
return true;
}
}
return false;
}
Expand Down Expand Up @@ -322,7 +339,7 @@ protected void errReqRelease(String partitionKey, long usedToken, boolean isLast
partitionUsedMap.remove(partitionKey);
partitionExt.setLastPackConsumed(isLastPackConsumed);
try {
indexPartition.put(partitionKey);
indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -351,7 +368,7 @@ protected void succRspRelease(String partitionKey, String topicName,
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
indexPartition.put(partitionKey);
indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -384,7 +401,7 @@ public void errRspRelease(String partitionKey, String topicName,
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
indexPartition.put(partitionKey);
indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
Expand All @@ -409,7 +426,7 @@ public void close() {
}
for (int i = this.waitCont.get() + 1; i > 0; i--) {
try {
indexPartition.put("------");
indexPartition.offer("------");
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -629,12 +646,12 @@ public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) {
Long oldTime = partitionUsedMap.get(keyId);
if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
partitionUsedMap.remove(keyId);
if (partitionMap.containsKey(keyId)) {
PartitionExt partitionExt = partitionMap.get(keyId);
PartitionExt partitionExt = partitionMap.get(keyId);
if (partitionExt != null) {
partitionExt.setLastPackConsumed(false);
if (!indexPartition.contains(keyId)) {
try {
indexPartition.put(keyId);
indexPartition.offer(keyId);
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -715,7 +732,7 @@ private void addPartitionsInfo(Map<Partition, Long> partOffsetMap) {
partitionUsedMap.remove(partition.getPartitionKey());
if (!indexPartition.contains(partition.getPartitionKey())) {
try {
indexPartition.put(partition.getPartitionKey());
indexPartition.offer(partition.getPartitionKey());
} catch (Throwable e) {
//
}
Expand Down Expand Up @@ -757,7 +774,7 @@ private boolean cancelTimeTask(String indexId) {
}

private boolean isTimeWait(String indexId) {
return this.timeouts.containsKey(indexId);
return (timeouts.get(indexId) != null);
}

private boolean hasPartitionWait() {
Expand All @@ -776,10 +793,11 @@ public TimeoutTask(final String indexId) {
public void run(Timeout timeout) throws Exception {
Timeout timeout1 = timeouts.remove(indexId);
if (timeout1 != null) {
if (partitionMap.containsKey(indexId)) {
PartitionExt partitionExt = partitionMap.get(indexId);
if (partitionExt != null) {
if (!indexPartition.contains(this.indexId)) {
try {
indexPartition.put(this.indexId);
indexPartition.offer(this.indexId);
} catch (Throwable e) {
//
}
Expand Down