From 96736ace57d375194074ea6e58779ad47d28403c Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Mon, 25 May 2020 10:53:20 +0800 Subject: [PATCH] [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others[addendum] --- .../tubemq/client/consumer/RmtDataCache.java | 79 ++++++++++++------- 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java index ded1d998a3e..e138d2ac7e5 100644 --- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java +++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java @@ -340,11 +340,7 @@ protected void errReqRelease(String partitionKey, long usedToken, boolean isLast if (oldUsedToken != null && oldUsedToken == usedToken) { partitionUsedMap.remove(partitionKey); partitionExt.setLastPackConsumed(isLastPackConsumed); - try { - indexPartition.offer(partitionKey); - } catch (Throwable e) { - // - } + releaseIdlePartition(-1, partitionKey); } } } @@ -365,16 +361,7 @@ protected void succRspRelease(String partitionKey, String topicName, partitionExt.setLastPackConsumed(isLastPackConsumed); long waitDlt = partitionExt.procConsumeResult(isFilterConsume); - if (waitDlt > 10) { - timeouts.put(partitionKey, - timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS)); - } else { - try { - indexPartition.offer(partitionKey); - } catch (Throwable e) { - // - } - } + releaseIdlePartition(waitDlt, partitionKey); } } } @@ -398,21 +385,26 @@ public void errRspRelease(String partitionKey, String topicName, long waitDlt = partitionExt.procConsumeResult(isFilterConsume, reqProcType, errCode, msgSize, isEscLimit, limitDlt, curDataDlt, false); - if (waitDlt > 10) { - timeouts.put(partitionKey, - timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS)); - } else { - try { - indexPartition.offer(partitionKey); - } catch (Throwable e) { - // - } - } + releaseIdlePartition(waitDlt, partitionKey); } } } } + private void releaseIdlePartition(long waitDlt, String partitionKey) { + if (waitDlt > 10) { + TimeoutTask timeoutTask = new TimeoutTask(partitionKey); + timeouts.put(partitionKey, + timer.newTimeout(timeoutTask, waitDlt, TimeUnit.MILLISECONDS)); + } else { + try { + indexPartition.offer(partitionKey); + } catch (Throwable e) { + // + } + } + } + /** * Close the remote data cache */ @@ -427,11 +419,7 @@ public void close() { timer = null; } for (int i = this.waitCont.get() + 1; i > 0; i--) { - try { - indexPartition.offer("------"); - } catch (Throwable e) { - // - } + releaseIdlePartition(-1, "------"); } } } @@ -663,6 +651,31 @@ public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) { } } } + // add timeout expired check + if (!timeouts.isEmpty()) { + List partKeys = new ArrayList(); + partKeys.addAll(timeouts.keySet()); + Timeout timeout1 = null; + for (String keyId : partKeys) { + timeout1 = timeouts.get(keyId); + if (timeout1 != null && timeout1.isExpired()) { + timeout1 = timeouts.remove(keyId); + if (timeout1 != null) { + PartitionExt partitionExt = partitionMap.get(keyId); + if (partitionExt != null) { + if (!indexPartition.contains(keyId)) { + try { + indexPartition.offer(keyId); + } catch (Throwable e) { + // + } + } + } + } + } + } + + } } private void waitPartitions(List partitionKeys, long inUseWaitPeriodMs) { @@ -787,9 +800,15 @@ private boolean hasPartitionWait() { public class TimeoutTask implements TimerTask { private String indexId; + private long createTime = 0L; public TimeoutTask(final String indexId) { this.indexId = indexId; + this.createTime = System.currentTimeMillis(); + } + + public long getCreateTime() { + return this.createTime; } @Override